You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by zh...@apache.org on 2017/10/11 05:44:30 UTC

[bookkeeper] branch master updated: ISSUE #520: Add more http endpoint

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9537323  ISSUE #520: Add more http endpoint
9537323 is described below

commit 95373234a273920fadc06742479ddcc4e603fa1b
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Wed Oct 11 13:43:46 2017 +0800

    ISSUE #520: Add more http endpoint
    
    Descriptions of the changes in this PR:
    
    #278 introduced the BookKeeper HTTP endpoint module. However, it provides only two endpoints: “/heartbeat” and “/api/config/serverConfig”. To fully leverage the HTTP module, the goal of this change is to add more endpoints to it.
    Please refer to BP-17 for more details.
    
    Author: Jia Zhai <zh...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #521 from zhaijack/http_endpoint, closes #520
---
 ...actory.java => AbstractHttpHandlerFactory.java} |  32 +-
 .../org/apache/bookkeeper/http/HttpRouter.java     |  51 +-
 .../org/apache/bookkeeper/http/HttpServer.java     |  32 +-
 ...rviceProvider.java => HttpServiceProvider.java} |  17 +-
 ...eProvider.java => NullHttpServiceProvider.java} |  29 +-
 .../{ErrorService.java => ErrorHttpService.java}   |  12 +-
 .../bookkeeper/http/service/HeartbeatService.java  |   8 +-
 .../{Service.java => HttpEndpointService.java}     |   6 +-
 ...ServiceRequest.java => HttpServiceRequest.java} |  12 +-
 ...rviceResponse.java => HttpServiceResponse.java} |  10 +-
 .../{NullService.java => NullHttpService.java}     |   8 +-
 .../http/twitter/TwitterAbstractHandler.java       |  23 +-
 ...Factory.java => TwitterHttpHandlerFactory.java} |  30 +-
 .../bookkeeper/http/twitter/TwitterHttpServer.java |  16 +-
 .../http/twitter/TestTwitterHttpServer.java        |   8 +-
 .../http/vertx/VertxAbstractHandler.java           |  20 +-
 ...erFactory.java => VertxHttpHandlerFactory.java} |  27 +-
 .../bookkeeper/http/vertx/VertxHttpServer.java     |  16 +-
 .../bookkeeper/http/vertx/TestVertxHttpServer.java |   8 +-
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   2 +-
 .../java/org/apache/bookkeeper/bookie/Cookie.java  |  16 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java |   8 +-
 .../java/org/apache/bookkeeper/bookie/LogMark.java |   2 +-
 .../org/apache/bookkeeper/client/LedgerHandle.java |   2 +-
 .../bookkeeper/http/BKHttpServiceProvider.java     | 195 ++++++
 .../apache/bookkeeper/http/BKServiceProvider.java  | 110 ----
 .../bookkeeper/http/ConfigurationService.java      |  50 +-
 .../bookkeeper/http/DecommissionService.java       | 110 ++++
 .../bookkeeper/http/DeleteLedgerService.java       |  81 +++
 .../bookkeeper/http/ExpandStorageService.java      | 103 +++
 .../bookkeeper/http/GetLastLogMarkService.java     | 109 +++
 .../bookkeeper/http/GetLedgerMetaService.java      |  90 +++
 .../bookkeeper/http/ListBookieInfoService.java     | 131 ++++
 .../apache/bookkeeper/http/ListBookiesService.java |  96 +++
 .../bookkeeper/http/ListDiskFilesService.java      | 132 ++++
 .../apache/bookkeeper/http/ListLedgerService.java  | 176 +++++
 .../http/ListUnderReplicatedLedgerService.java     | 124 ++++
 .../http/LostBookieRecoveryDelayService.java       | 103 +++
 .../bookkeeper/http/ReadLedgerEntryService.java    | 120 ++++
 .../bookkeeper/http/RecoveryBookieService.java     | 143 ++++
 .../bookkeeper/http/TriggerAuditService.java       |  76 +++
 .../bookkeeper/http/WhoIsAuditorService.java       |  89 +++
 .../org/apache/bookkeeper/proto/BookieServer.java  |   6 +-
 .../bookkeeper/replication/AutoRecoveryMain.java   |   4 +-
 .../java/org/apache/bookkeeper/server/Main.java    |   4 +-
 .../bookkeeper/server/service/BookieService.java   |   3 +-
 .../bookkeeper/server/service/HttpService.java     |   4 +-
 .../bookkeeper/client/BookieRecoveryTest.java      |  68 +-
 .../apache/bookkeeper/http/TestHttpService.java    | 727 ++++++++++++++++++++-
 49 files changed, 2901 insertions(+), 348 deletions(-)

diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/AbstractHandlerFactory.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/AbstractHttpHandlerFactory.java
similarity index 57%
rename from bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/AbstractHandlerFactory.java
rename to bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/AbstractHttpHandlerFactory.java
index c9896a7..480a47a 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/AbstractHandlerFactory.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/AbstractHttpHandlerFactory.java
@@ -24,25 +24,33 @@ package org.apache.bookkeeper.http;
  * Abstract handler factory which provide interface
  * to create handlers for bookkeeper http endpoints.
  */
-public abstract class AbstractHandlerFactory<Handler> {
-    private ServiceProvider serviceProvider;
+public abstract class AbstractHttpHandlerFactory<Handler> {
+    private HttpServiceProvider httpServiceProvider;
 
-    public AbstractHandlerFactory(ServiceProvider serviceProvider) {
-        this.serviceProvider = serviceProvider;
-    }
-
-    public ServiceProvider getServiceProvider() {
-        return serviceProvider;
+    /**
+     * Instantiates a new Abstract http handler factory.
+     *
+     * @param httpServiceProvider the http service provider
+     */
+    public AbstractHttpHandlerFactory(HttpServiceProvider httpServiceProvider) {
+        this.httpServiceProvider = httpServiceProvider;
     }
 
     /**
-     * Create a handler for heartbeat api.
+     * Gets http service provider.
+     *
+     * @return the http service provider
      */
-    public abstract Handler newHeartbeatHandler();
+    public HttpServiceProvider getHttpServiceProvider() {
+        return httpServiceProvider;
+    }
 
     /**
-     * Create a handler for server configuration api.
+     * Create a handler for the given ApiType.
+     *
+     * @param type the api type
+     * @return the handler
      */
-    public abstract Handler newConfigurationHandler();
+    public abstract Handler newHandler(HttpServer.ApiType type);
 
 }
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpRouter.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpRouter.java
index b506cc7..531e86a 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpRouter.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpRouter.java
@@ -31,13 +31,56 @@ public abstract class HttpRouter<Handler> {
 
   // Define endpoints here.
   public static final String HEARTBEAT                    = "/heartbeat";
-  public static final String SERVER_CONFIG                = "/api/config/serverConfig";
+  public static final String SERVER_CONFIG                = "/api/v1/config/server_config";
+
+  // ledger
+  public static final String DELETE_LEDGER                = "/api/v1/ledger/delete";
+  public static final String LIST_LEDGER                  = "/api/v1/ledger/list";
+  public static final String GET_LEDGER_META              = "/api/v1/ledger/metadata";
+  public static final String READ_LEDGER_ENTRY            = "/api/v1/ledger/read";
+  // bookie
+  public static final String LIST_BOOKIES                 = "/api/v1/bookie/list_bookies";
+  public static final String LIST_BOOKIE_INFO             = "/api/v1/bookie/list_bookie_info";
+  public static final String LAST_LOG_MARK                = "/api/v1/bookie/last_log_mark";
+  public static final String LIST_DISK_FILE               = "/api/v1/bookie/list_disk_file";
+  public static final String EXPAND_STORAGE               = "/api/v1/bookie/expand_storage";
+  // autorecovery
+  public static final String RECOVERY_BOOKIE              = "/api/v1/autorecovery/bookie";
+  public static final String LIST_UNDER_REPLICATED_LEDGER = "/api/v1/autorecovery/list_under_replicated_ledger";
+  public static final String WHO_IS_AUDITOR               = "/api/v1/autorecovery/who_is_auditor";
+  public static final String TRIGGER_AUDIT                = "/api/v1/autorecovery/trigger_audit";
+  public static final String LOST_BOOKIE_RECOVERY_DELAY   = "/api/v1/autorecovery/lost_bookie_recovery_delay";
+  public static final String DECOMMISSION                 = "/api/v1/autorecovery/decommission";
+
 
   private final Map<String, Handler> endpointHandlers = new HashMap<>();
 
-  public HttpRouter(AbstractHandlerFactory<Handler> handlerFactory) {
-    this.endpointHandlers.put(HEARTBEAT, handlerFactory.newHeartbeatHandler());
-    this.endpointHandlers.put(SERVER_CONFIG, handlerFactory.newConfigurationHandler());
+  public HttpRouter(AbstractHttpHandlerFactory<Handler> handlerFactory) {
+    this.endpointHandlers.put(HEARTBEAT, handlerFactory.newHandler(HttpServer.ApiType.HEARTBEAT));
+    this.endpointHandlers.put(SERVER_CONFIG, handlerFactory.newHandler(HttpServer.ApiType.SERVER_CONFIG));
+
+    // ledger
+    this.endpointHandlers.put(DELETE_LEDGER, handlerFactory.newHandler(HttpServer.ApiType.DELETE_LEDGER));
+    this.endpointHandlers.put(LIST_LEDGER, handlerFactory.newHandler(HttpServer.ApiType.LIST_LEDGER));
+    this.endpointHandlers.put(GET_LEDGER_META, handlerFactory.newHandler(HttpServer.ApiType.GET_LEDGER_META));
+    this.endpointHandlers.put(READ_LEDGER_ENTRY, handlerFactory.newHandler(HttpServer.ApiType.READ_LEDGER_ENTRY));
+
+    // bookie
+    this.endpointHandlers.put(LIST_BOOKIES, handlerFactory.newHandler(HttpServer.ApiType.LIST_BOOKIES));
+    this.endpointHandlers.put(LIST_BOOKIE_INFO, handlerFactory.newHandler(HttpServer.ApiType.LIST_BOOKIE_INFO));
+    this.endpointHandlers.put(LAST_LOG_MARK, handlerFactory.newHandler(HttpServer.ApiType.LAST_LOG_MARK));
+    this.endpointHandlers.put(LIST_DISK_FILE, handlerFactory.newHandler(HttpServer.ApiType.LIST_DISK_FILE));
+    this.endpointHandlers.put(EXPAND_STORAGE, handlerFactory.newHandler(HttpServer.ApiType.EXPAND_STORAGE));
+
+    // autorecovery
+    this.endpointHandlers.put(RECOVERY_BOOKIE, handlerFactory.newHandler(HttpServer.ApiType.RECOVERY_BOOKIE));
+    this.endpointHandlers.put(LIST_UNDER_REPLICATED_LEDGER,
+      handlerFactory.newHandler(HttpServer.ApiType.LIST_UNDER_REPLICATED_LEDGER));
+    this.endpointHandlers.put(WHO_IS_AUDITOR, handlerFactory.newHandler(HttpServer.ApiType.WHO_IS_AUDITOR));
+    this.endpointHandlers.put(TRIGGER_AUDIT, handlerFactory.newHandler(HttpServer.ApiType.TRIGGER_AUDIT));
+    this.endpointHandlers.put(LOST_BOOKIE_RECOVERY_DELAY,
+      handlerFactory.newHandler(HttpServer.ApiType.LOST_BOOKIE_RECOVERY_DELAY));
+    this.endpointHandlers.put(DECOMMISSION, handlerFactory.newHandler(HttpServer.ApiType.DECOMMISSION));
   }
 
   /**
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpServer.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpServer.java
index d077013..d2c4e81 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpServer.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpServer.java
@@ -33,6 +33,7 @@ public interface HttpServer {
     enum StatusCode {
         OK(200),
         REDIRECT(302),
+        FORBIDDEN(403),
         NOT_FOUND(404),
         INTERNAL_ERROR(500);
 
@@ -54,13 +55,40 @@ public interface HttpServer {
         GET,
         POST,
         PUT,
-        DELETE;
+        DELETE
+    }
+
+    /**
+     * Http ApiTypes.
+     */
+    enum ApiType {
+        HEARTBEAT,
+        SERVER_CONFIG,
+
+        // ledger
+        DELETE_LEDGER,
+        LIST_LEDGER,
+        GET_LEDGER_META,
+        READ_LEDGER_ENTRY,
+        // bookie
+        LIST_BOOKIES,
+        LIST_BOOKIE_INFO,
+        LAST_LOG_MARK,
+        LIST_DISK_FILE,
+        EXPAND_STORAGE,
+        // autorecovery
+        RECOVERY_BOOKIE,
+        LIST_UNDER_REPLICATED_LEDGER,
+        WHO_IS_AUDITOR,
+        TRIGGER_AUDIT,
+        LOST_BOOKIE_RECOVERY_DELAY,
+        DECOMMISSION
     }
 
     /**
      * Initialize the HTTP server with underline service provider.
      */
-    void initialize(ServiceProvider serviceProvider);
+    void initialize(HttpServiceProvider httpServiceProvider);
 
     /**
      * Start the HTTP server on given port.
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/ServiceProvider.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpServiceProvider.java
similarity index 73%
rename from bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/ServiceProvider.java
rename to bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpServiceProvider.java
index af8e980..5d1ceca 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/ServiceProvider.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/HttpServiceProvider.java
@@ -20,21 +20,20 @@
  */
 package org.apache.bookkeeper.http;
 
-import org.apache.bookkeeper.http.service.Service;
+import java.io.Closeable;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
 
 /**
  * Provider to provide services for different http endpoint handlers.
  */
-public interface ServiceProvider {
+public interface HttpServiceProvider extends Closeable{
 
     /**
-     * Provide heartbeat service for heartbeat api.
+     * Provide http endpoint service.
+     *
+     * @param type the service type
+     * @return the http endpoint service
      */
-    Service provideHeartbeatService();
-
-    /**
-     * Provide service for configuration api.
-     */
-    Service provideConfigurationService();
+    HttpEndpointService provideHttpEndpointService(HttpServer.ApiType type);
 
 }
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/NullServiceProvider.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/NullHttpServiceProvider.java
similarity index 53%
rename from bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/NullServiceProvider.java
rename to bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/NullHttpServiceProvider.java
index de36b2d..4bff10e 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/NullServiceProvider.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/NullHttpServiceProvider.java
@@ -20,30 +20,33 @@
  */
 package org.apache.bookkeeper.http;
 
+import java.io.IOException;
 import org.apache.bookkeeper.http.service.HeartbeatService;
-import org.apache.bookkeeper.http.service.NullService;
-import org.apache.bookkeeper.http.service.Service;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.NullHttpService;
 
 /**
- * Service provider which provide service that do nothing.
+ * HttpEndpointService provider which provide service that do nothing.
  */
-public class NullServiceProvider implements ServiceProvider {
+public class NullHttpServiceProvider implements HttpServiceProvider {
 
-    private static final NullServiceProvider NULL_SERVICE_PROVIDER = new NullServiceProvider();
+    private static final NullHttpServiceProvider NULL_HTTP_SERVICE_PROVIDER = new NullHttpServiceProvider();
 
-    static final Service NULL_SERVICE = new NullService();
+    static final HttpEndpointService NULL_HTTP_SERVICE = new NullHttpService();
 
-    @Override
-    public Service provideHeartbeatService() {
-        return new HeartbeatService();
+    public static NullHttpServiceProvider getInstance() {
+        return NULL_HTTP_SERVICE_PROVIDER;
     }
 
     @Override
-    public Service provideConfigurationService() {
-        return NULL_SERVICE;
+    public HttpEndpointService provideHttpEndpointService(HttpServer.ApiType type) {
+        if (type == HttpServer.ApiType.HEARTBEAT) {
+            return new HeartbeatService();
+        }
+        return NULL_HTTP_SERVICE;
     }
 
-    public static NullServiceProvider getInstance() {
-        return NULL_SERVICE_PROVIDER;
+    @Override
+    public void close() throws IOException {
     }
 }
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ErrorService.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ErrorHttpService.java
similarity index 75%
rename from bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ErrorService.java
rename to bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ErrorHttpService.java
index f655fc6..30bdff7 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ErrorService.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ErrorHttpService.java
@@ -23,20 +23,20 @@ package org.apache.bookkeeper.http.service;
 import org.apache.bookkeeper.http.HttpServer;
 
 /**
- * Service that return internal server error.
+ * HttpEndpointService that return internal server error.
  */
-public class ErrorService implements Service {
+public class ErrorHttpService implements HttpEndpointService {
 
     private HttpServer.StatusCode statusCode = HttpServer.StatusCode.INTERNAL_ERROR;
 
-    public ErrorService() {}
+    public ErrorHttpService() {}
 
-    public ErrorService(HttpServer.StatusCode statusCode) {
+    public ErrorHttpService(HttpServer.StatusCode statusCode) {
         this.statusCode = statusCode;
     }
 
     @Override
-    public ServiceResponse handle(ServiceRequest request) {
-        return new ServiceResponse().setCode(statusCode);
+    public HttpServiceResponse handle(HttpServiceRequest request) {
+        return new HttpServiceResponse().setCode(statusCode);
     }
 }
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HeartbeatService.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HeartbeatService.java
index c64b53e..bb13497 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HeartbeatService.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HeartbeatService.java
@@ -23,14 +23,14 @@ package org.apache.bookkeeper.http.service;
 import org.apache.bookkeeper.http.HttpServer;
 
 /**
- * Service that serve heartbeat request.
+ * HttpEndpointService that serve heartbeat request.
  */
-public class HeartbeatService implements Service {
+public class HeartbeatService implements HttpEndpointService {
 
     public static final String HEARTBEAT = "OK\n";
 
     @Override
-    public ServiceResponse handle(ServiceRequest request) {
-        return new ServiceResponse(HEARTBEAT, HttpServer.StatusCode.OK);
+    public HttpServiceResponse handle(HttpServiceRequest request) {
+        return new HttpServiceResponse(HEARTBEAT, HttpServer.StatusCode.OK);
     }
 }
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/Service.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HttpEndpointService.java
similarity index 82%
rename from bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/Service.java
rename to bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HttpEndpointService.java
index 503ad4c..20f45c2 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/Service.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HttpEndpointService.java
@@ -21,10 +21,10 @@
 package org.apache.bookkeeper.http.service;
 
 /**
- * Service provides the abstraction for how to handle the logic
+ * HttpEndpointService provides the abstraction for how to handle the logic
  * for a specific endpoint.
  */
-public interface Service {
+public interface HttpEndpointService {
 
-    ServiceResponse handle(ServiceRequest request) throws Exception;
+    HttpServiceResponse handle(HttpServiceRequest request) throws Exception;
 }
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ServiceRequest.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HttpServiceRequest.java
similarity index 82%
rename from bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ServiceRequest.java
rename to bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HttpServiceRequest.java
index 733f1ee..cce7981 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ServiceRequest.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HttpServiceRequest.java
@@ -29,14 +29,14 @@ import org.apache.bookkeeper.http.HttpServer;
  * A wrapper class that wrap a http request into a class which
  * can then be passed into the service.
  */
-public class ServiceRequest {
+public class HttpServiceRequest {
     private String body;
     private HttpServer.Method method = HttpServer.Method.GET;
     private Map<String, String> params = new HashMap<>();
 
-    public ServiceRequest() {}
+    public HttpServiceRequest() {}
 
-    public ServiceRequest(String body, HttpServer.Method method, Map<String, String> params) {
+    public HttpServiceRequest(String body, HttpServer.Method method, Map<String, String> params) {
         this.body = body;
         this.method = method;
         this.params = params;
@@ -46,7 +46,7 @@ public class ServiceRequest {
         return body;
     }
 
-    public ServiceRequest setBody(String body) {
+    public HttpServiceRequest setBody(String body) {
         this.body = body;
         return this;
     }
@@ -55,7 +55,7 @@ public class ServiceRequest {
         return method;
     }
 
-    public ServiceRequest setMethod(HttpServer.Method method) {
+    public HttpServiceRequest setMethod(HttpServer.Method method) {
         this.method = method;
         return this;
     }
@@ -64,7 +64,7 @@ public class ServiceRequest {
         return params;
     }
 
-    public ServiceRequest setParams(Map<String, String> params) {
+    public HttpServiceRequest setParams(Map<String, String> params) {
         this.params = params;
         return this;
     }
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ServiceResponse.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HttpServiceResponse.java
similarity index 84%
rename from bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ServiceResponse.java
rename to bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HttpServiceResponse.java
index c30981b..9224d1e 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/ServiceResponse.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/HttpServiceResponse.java
@@ -26,13 +26,13 @@ import org.apache.bookkeeper.http.HttpServer;
  * A wrapper class that wrap the result from service, which
  * can then be parsed into http response.
  */
-public class ServiceResponse {
+public class HttpServiceResponse {
     private String body;
     private HttpServer.StatusCode code = HttpServer.StatusCode.OK;
 
-    public ServiceResponse() {}
+    public HttpServiceResponse() {}
 
-    public ServiceResponse(String body, HttpServer.StatusCode code) {
+    public HttpServiceResponse(String body, HttpServer.StatusCode code) {
         this.body = body;
         this.code = code;
     }
@@ -45,12 +45,12 @@ public class ServiceResponse {
         return code.getValue();
     }
 
-    public ServiceResponse setBody(String body) {
+    public HttpServiceResponse setBody(String body) {
         this.body = body;
         return this;
     }
 
-    public ServiceResponse setCode(HttpServer.StatusCode code) {
+    public HttpServiceResponse setCode(HttpServer.StatusCode code) {
         this.code = code;
         return this;
     }
diff --git a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/NullService.java b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/NullHttpService.java
similarity index 80%
rename from bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/NullService.java
rename to bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/NullHttpService.java
index b0c1cab..13c607d 100644
--- a/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/NullService.java
+++ b/bookkeeper-http/http-server/src/main/java/org/apache/bookkeeper/http/service/NullHttpService.java
@@ -21,11 +21,11 @@
 package org.apache.bookkeeper.http.service;
 
 /**
- * Service that return empty content.
+ * HttpEndpointService that return empty content.
  */
-public class NullService implements Service {
+public class NullHttpService implements HttpEndpointService {
     @Override
-    public ServiceResponse handle(ServiceRequest request) {
-        return new ServiceResponse();
+    public HttpServiceResponse handle(HttpServiceRequest request) {
+        return new HttpServiceResponse();
     }
 }
diff --git a/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterAbstractHandler.java b/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterAbstractHandler.java
index c8b1c61..87e48df 100644
--- a/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterAbstractHandler.java
+++ b/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterAbstractHandler.java
@@ -29,9 +29,10 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.service.ErrorService;
-import org.apache.bookkeeper.http.service.ServiceRequest;
-import org.apache.bookkeeper.http.service.ServiceResponse;
+import org.apache.bookkeeper.http.service.ErrorHttpService;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
 
 /**
  * Http handler for TwitterServer.
@@ -39,22 +40,22 @@ import org.apache.bookkeeper.http.service.ServiceResponse;
 public abstract class TwitterAbstractHandler extends Service<Request, Response> {
 
     /**
-     * Process the request using the given service.
+     * Process the request using the given httpEndpointService.
      */
-    Future<Response> processRequest(org.apache.bookkeeper.http.service.Service service, Request request) {
-        ServiceRequest serviceRequest = new ServiceRequest()
+    Future<Response> processRequest(HttpEndpointService httpEndpointService, Request request) {
+        HttpServiceRequest httpServiceRequest = new HttpServiceRequest()
             .setMethod(convertMethod(request))
             .setParams(convertParams(request))
             .setBody(request.contentString());
-        ServiceResponse serviceResponse = null;
+        HttpServiceResponse httpServiceResponse = null;
         try {
-            serviceResponse = service.handle(serviceRequest);
+            httpServiceResponse = httpEndpointService.handle(httpServiceRequest);
         } catch (Exception e) {
-            serviceResponse = new ErrorService().handle(serviceRequest);
+            httpServiceResponse = new ErrorHttpService().handle(httpServiceRequest);
         }
         Response response = Response.apply();
-        response.setContentString(serviceResponse.getBody());
-        response.statusCode(serviceResponse.getStatusCode());
+        response.setContentString(httpServiceResponse.getBody());
+        response.statusCode(httpServiceResponse.getStatusCode());
         return Future.value(response);
     }
 
diff --git a/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHandlerFactory.java b/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHttpHandlerFactory.java
similarity index 61%
rename from bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHandlerFactory.java
rename to bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHttpHandlerFactory.java
index e97725d..569235b 100644
--- a/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHandlerFactory.java
+++ b/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHttpHandlerFactory.java
@@ -23,38 +23,30 @@ package org.apache.bookkeeper.http.twitter;
 import com.twitter.finagle.http.Request;
 import com.twitter.finagle.http.Response;
 import com.twitter.util.Future;
-
-import org.apache.bookkeeper.http.AbstractHandlerFactory;
-import org.apache.bookkeeper.http.ServiceProvider;
-
+import org.apache.bookkeeper.http.AbstractHttpHandlerFactory;
+import org.apache.bookkeeper.http.HttpServer;
+import org.apache.bookkeeper.http.HttpServiceProvider;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
 
 
 /**
  * Factory which provide http handlers for TwitterServer based Http Server.
  */
-public class TwitterHandlerFactory extends AbstractHandlerFactory<TwitterAbstractHandler> {
+public class TwitterHttpHandlerFactory extends AbstractHttpHandlerFactory<TwitterAbstractHandler> {
 
-    public TwitterHandlerFactory(ServiceProvider serviceProvider) {
-        super(serviceProvider);
+    public TwitterHttpHandlerFactory(HttpServiceProvider httpServiceProvider) {
+        super(httpServiceProvider);
     }
 
-    @Override
-    public TwitterAbstractHandler newHeartbeatHandler() {
-        return new TwitterAbstractHandler() {
-            @Override
-            public Future<Response> apply(Request request) {
-                return processRequest(getServiceProvider().provideHeartbeatService(), request);
-            }
-        };
-    }
 
-    public TwitterAbstractHandler newConfigurationHandler() {
+    @Override
+    public TwitterAbstractHandler newHandler(HttpServer.ApiType type) {
         return new TwitterAbstractHandler() {
             @Override
             public Future<Response> apply(Request request) {
-                return processRequest(getServiceProvider().provideConfigurationService(), request);
+                HttpEndpointService service = getHttpServiceProvider().provideHttpEndpointService(type);
+                return processRequest(service, request);
             }
         };
     }
-
 }
diff --git a/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHttpServer.java b/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHttpServer.java
index 6d80104..ad42f46 100644
--- a/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHttpServer.java
+++ b/bookkeeper-http/twitter-http-server/src/main/java/org/apache/bookkeeper/http/twitter/TwitterHttpServer.java
@@ -27,11 +27,12 @@ import com.twitter.finagle.http.HttpMuxer;
 import com.twitter.finagle.http.HttpMuxer$;
 import com.twitter.server.AbstractTwitterServer;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.bookkeeper.http.HttpRouter;
 import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.ServiceProvider;
+import org.apache.bookkeeper.http.HttpServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,11 +46,11 @@ public class TwitterHttpServer extends AbstractTwitterServer implements HttpServ
     private ListeningServer server;
     private boolean isRunning;
     private int port;
-    private ServiceProvider serviceProvider;
+    private HttpServiceProvider httpServiceProvider;
 
     @Override
-    public void initialize(ServiceProvider serviceProvider) {
-        this.serviceProvider = serviceProvider;
+    public void initialize(HttpServiceProvider httpServiceProvider) {
+        this.httpServiceProvider = httpServiceProvider;
     }
 
     @Override
@@ -68,6 +69,11 @@ public class TwitterHttpServer extends AbstractTwitterServer implements HttpServ
 
     @Override
     public void stopServer() {
+        try {
+            httpServiceProvider.close();
+        } catch (IOException ioe) {
+            LOG.error("Error while close httpServiceProvider", ioe);
+        }
         if (server != null) {
             server.close();
             isRunning = false;
@@ -82,7 +88,7 @@ public class TwitterHttpServer extends AbstractTwitterServer implements HttpServ
     @Override
     public void main() throws Throwable {
         LOG.info("Starting Twitter HTTP server on port {}", port);
-        TwitterHandlerFactory handlerFactory = new TwitterHandlerFactory(serviceProvider);
+        TwitterHttpHandlerFactory handlerFactory = new TwitterHttpHandlerFactory(httpServiceProvider);
         HttpRouter<TwitterAbstractHandler> requestRouter = new HttpRouter<TwitterAbstractHandler>(handlerFactory) {
             @Override
             public void bindHandler(String endpoint, TwitterAbstractHandler handler) {
diff --git a/bookkeeper-http/twitter-http-server/src/test/java/org/apache/bookkeeper/http/twitter/TestTwitterHttpServer.java b/bookkeeper-http/twitter-http-server/src/test/java/org/apache/bookkeeper/http/twitter/TestTwitterHttpServer.java
index d7598db..8487d65 100644
--- a/bookkeeper-http/twitter-http-server/src/test/java/org/apache/bookkeeper/http/twitter/TestTwitterHttpServer.java
+++ b/bookkeeper-http/twitter-http-server/src/test/java/org/apache/bookkeeper/http/twitter/TestTwitterHttpServer.java
@@ -28,8 +28,8 @@ import java.net.URL;
 
 import org.apache.bookkeeper.http.HttpRouter;
 import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.NullServiceProvider;
-import org.apache.bookkeeper.http.ServiceProvider;
+import org.apache.bookkeeper.http.NullHttpServiceProvider;
+import org.apache.bookkeeper.http.HttpServiceProvider;
 import org.apache.bookkeeper.http.service.HeartbeatService;
 import org.junit.Test;
 
@@ -42,8 +42,8 @@ public class TestTwitterHttpServer {
     @Test
     public void testStartBasicHttpServer() throws Exception {
         TwitterHttpServer httpServer = new TwitterHttpServer();
-        ServiceProvider serviceProvider = NullServiceProvider.getInstance();
-        httpServer.initialize(serviceProvider);
+        HttpServiceProvider httpServiceProvider = NullHttpServiceProvider.getInstance();
+        httpServer.initialize(httpServiceProvider);
         int port = getNextPort();
         while (!httpServer.startServer(port)) {
             httpServer.stopServer();
diff --git a/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxAbstractHandler.java b/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxAbstractHandler.java
index 8aa2c60..c433376 100644
--- a/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxAbstractHandler.java
+++ b/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxAbstractHandler.java
@@ -30,10 +30,10 @@ import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.service.ErrorService;
-import org.apache.bookkeeper.http.service.Service;
-import org.apache.bookkeeper.http.service.ServiceRequest;
-import org.apache.bookkeeper.http.service.ServiceResponse;
+import org.apache.bookkeeper.http.service.ErrorHttpService;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
 
 /**
  * Http Handler for Vertx based Http Server.
@@ -41,20 +41,20 @@ import org.apache.bookkeeper.http.service.ServiceResponse;
 public abstract class VertxAbstractHandler implements Handler<RoutingContext> {
 
     /**
-     * Process the request using the given service.
+     * Process the request using the given httpEndpointService.
      */
-    void processRequest(Service service, RoutingContext context) {
+    void processRequest(HttpEndpointService httpEndpointService, RoutingContext context) {
         HttpServerRequest httpRequest = context.request();
         HttpServerResponse httpResponse = context.response();
-        ServiceRequest request = new ServiceRequest()
+        HttpServiceRequest request = new HttpServiceRequest()
             .setMethod(convertMethod(httpRequest))
             .setParams(convertParams(httpRequest))
             .setBody(context.getBodyAsString());
-        ServiceResponse response = null;
+        HttpServiceResponse response = null;
         try {
-            response = service.handle(request);
+            response = httpEndpointService.handle(request);
         } catch (Exception e) {
-            response = new ErrorService().handle(request);
+            response = new ErrorHttpService().handle(request);
         }
         httpResponse.setStatusCode(response.getStatusCode());
         httpResponse.end(response.getBody());
diff --git a/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHandlerFactory.java b/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHttpHandlerFactory.java
similarity index 60%
rename from bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHandlerFactory.java
rename to bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHttpHandlerFactory.java
index d4d22d6..f8f0bf0 100644
--- a/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHandlerFactory.java
+++ b/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHttpHandlerFactory.java
@@ -23,35 +23,28 @@ package org.apache.bookkeeper.http.vertx;
 
 import io.vertx.ext.web.RoutingContext;
 
-import org.apache.bookkeeper.http.AbstractHandlerFactory;
-import org.apache.bookkeeper.http.ServiceProvider;
+import org.apache.bookkeeper.http.AbstractHttpHandlerFactory;
+import org.apache.bookkeeper.http.HttpServer;
+import org.apache.bookkeeper.http.HttpServiceProvider;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
 
 /**
  * Factory which provide http handlers for Vertx based Http Server.
  */
-public class VertxHandlerFactory extends AbstractHandlerFactory<VertxAbstractHandler> {
+public class VertxHttpHandlerFactory extends AbstractHttpHandlerFactory<VertxAbstractHandler> {
 
 
-    public VertxHandlerFactory(ServiceProvider serviceProvider) {
-        super(serviceProvider);
+    public VertxHttpHandlerFactory(HttpServiceProvider httpServiceProvider) {
+        super(httpServiceProvider);
     }
 
     @Override
-    public VertxAbstractHandler newHeartbeatHandler() {
+    public VertxAbstractHandler newHandler(HttpServer.ApiType type) {
         return new VertxAbstractHandler() {
             @Override
             public void handle(RoutingContext context) {
-                processRequest(getServiceProvider().provideHeartbeatService(), context);
-            }
-        };
-    }
-
-    @Override
-    public VertxAbstractHandler newConfigurationHandler() {
-        return new VertxAbstractHandler() {
-            @Override
-            public void handle(RoutingContext context) {
-                processRequest(getServiceProvider().provideConfigurationService(), context);
+                HttpEndpointService service = getHttpServiceProvider().provideHttpEndpointService(type);
+                processRequest(service, context);
             }
         };
     }
diff --git a/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHttpServer.java b/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHttpServer.java
index 16016fb..635d24c 100644
--- a/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHttpServer.java
+++ b/bookkeeper-http/vertx-http-server/src/main/java/org/apache/bookkeeper/http/vertx/VertxHttpServer.java
@@ -25,13 +25,14 @@ import io.vertx.core.AsyncResult;
 import io.vertx.core.Vertx;
 import io.vertx.ext.web.Router;
 
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.bookkeeper.http.HttpRouter;
 import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.ServiceProvider;
+import org.apache.bookkeeper.http.HttpServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,21 +45,21 @@ public class VertxHttpServer implements HttpServer {
 
     private Vertx vertx;
     private boolean isRunning;
-    private ServiceProvider serviceProvider;
+    private HttpServiceProvider httpServiceProvider;
 
     public VertxHttpServer() {
         this.vertx = Vertx.vertx();
     }
 
     @Override
-    public void initialize(ServiceProvider serviceProvider) {
-        this.serviceProvider = serviceProvider;
+    public void initialize(HttpServiceProvider httpServiceProvider) {
+        this.httpServiceProvider = httpServiceProvider;
     }
 
     @Override
     public boolean startServer(int port) {
         CompletableFuture<AsyncResult> future = new CompletableFuture<>();
-        VertxHandlerFactory handlerFactory = new VertxHandlerFactory(serviceProvider);
+        VertxHttpHandlerFactory handlerFactory = new VertxHttpHandlerFactory(httpServiceProvider);
         Router router = Router.router(vertx);
         HttpRouter<VertxAbstractHandler> requestRouter = new HttpRouter<VertxAbstractHandler>(handlerFactory) {
             @Override
@@ -92,6 +93,11 @@ public class VertxHttpServer implements HttpServer {
     @Override
     public void stopServer() {
         CountDownLatch shutdownLatch = new CountDownLatch(1);
+        try {
+            httpServiceProvider.close();
+        } catch (IOException ioe) {
+            LOG.error("Error while close httpServiceProvider", ioe);
+        }
         vertx.close(asyncResult -> {
             isRunning = false;
             shutdownLatch.countDown();
diff --git a/bookkeeper-http/vertx-http-server/src/test/java/org/apache/bookkeeper/http/vertx/TestVertxHttpServer.java b/bookkeeper-http/vertx-http-server/src/test/java/org/apache/bookkeeper/http/vertx/TestVertxHttpServer.java
index 4a0644a..63e4517 100644
--- a/bookkeeper-http/vertx-http-server/src/test/java/org/apache/bookkeeper/http/vertx/TestVertxHttpServer.java
+++ b/bookkeeper-http/vertx-http-server/src/test/java/org/apache/bookkeeper/http/vertx/TestVertxHttpServer.java
@@ -28,8 +28,8 @@ import java.net.URL;
 
 import org.apache.bookkeeper.http.HttpRouter;
 import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.NullServiceProvider;
-import org.apache.bookkeeper.http.ServiceProvider;
+import org.apache.bookkeeper.http.NullHttpServiceProvider;
+import org.apache.bookkeeper.http.HttpServiceProvider;
 import org.apache.bookkeeper.http.service.HeartbeatService;
 import org.junit.Test;
 
@@ -42,8 +42,8 @@ public class TestVertxHttpServer {
     @Test
     public void testStartBasicHttpServer() throws Exception {
         VertxHttpServer httpServer = new VertxHttpServer();
-        ServiceProvider serviceProvider = NullServiceProvider.getInstance();
-        httpServer.initialize(serviceProvider);
+        HttpServiceProvider httpServiceProvider = NullHttpServiceProvider.getInstance();
+        httpServer.initialize(httpServiceProvider);
         int port = getNextPort();
         while (!httpServer.startServer(port)) {
             httpServer.stopServer();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 36cd832..f4e943d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -2085,7 +2085,7 @@ public class BookieShell implements Tool {
      * @param extensions - the file extensions, which we are interested in
      * @return sorted list of files
      */
-    private static List<File> listFilesAndSort(File[] folderNames, String... extensions) {
+    public static List<File> listFilesAndSort(File[] folderNames, String... extensions) {
         List<File> completeFilesList = new ArrayList<File>();
         for (int i = 0; i < folderNames.length; i++) {
             Collection<File> filesCollection = FileUtils.listFiles(folderNames[i], extensions, true);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
index be475be..ec7793c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
@@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
  * but the bookie would be up, so the client would think that everything is ok
  * with the cluster. It's better to fail early and obviously.
  */
-class Cookie {
+public class Cookie {
     private static final Logger LOG = LoggerFactory.getLogger(Cookie.class);
 
     static final int CURRENT_COOKIE_LAYOUT_VERSION = 4;
@@ -215,7 +215,7 @@ class Cookie {
         return cBuilder;
     }
 
-    void writeToDirectory(File directory) throws IOException {
+    public void writeToDirectory(File directory) throws IOException {
         File versionFile = new File(directory,
                 BookKeeperConstants.VERSION_FILENAME);
 
@@ -242,7 +242,7 @@ class Cookie {
      * @throws InterruptedException
      * @throws UnknownHostException
      */
-    void writeToZooKeeper(ZooKeeper zk, ServerConfiguration conf, Version version)
+    public void writeToZooKeeper(ZooKeeper zk, ServerConfiguration conf, Version version)
             throws KeeperException, InterruptedException, UnknownHostException {
         List<ACL> zkAcls = ZkUtils.getACLs(conf);
         String bookieCookiePath = conf.getZkLedgersRootPath() + "/"
@@ -336,7 +336,7 @@ class Cookie {
      * @throws IOException
      * @throws UnknownHostException
      */
-    static Versioned<Cookie> readFromZooKeeper(ZooKeeper zk, ServerConfiguration conf)
+    public static Versioned<Cookie> readFromZooKeeper(ZooKeeper zk, ServerConfiguration conf)
             throws KeeperException, InterruptedException, IOException, UnknownHostException {
         return readFromZooKeeper(zk, conf, Bookie.getBookieAddress(conf));
     }
@@ -353,7 +353,7 @@ class Cookie {
      * @throws IOException
      * @throws UnknownHostException
      */
-    static Versioned<Cookie> readFromZooKeeper(ZooKeeper zk, AbstractConfiguration conf, BookieSocketAddress address)
+    public static Versioned<Cookie> readFromZooKeeper(ZooKeeper zk, AbstractConfiguration conf, BookieSocketAddress address)
             throws KeeperException, InterruptedException, IOException, UnknownHostException {
         String zkPath = getZkPath(conf, address);
 
@@ -378,7 +378,7 @@ class Cookie {
      * @return cookie object
      * @throws IOException
      */
-    static Cookie readFromDirectory(File directory) throws IOException {
+    public static Cookie readFromDirectory(File directory) throws IOException {
         File versionFile = new File(directory, BookKeeperConstants.VERSION_FILENAME);
         BufferedReader reader = new BufferedReader(
                 new InputStreamReader(new FileInputStream(versionFile), UTF_8));
@@ -495,7 +495,7 @@ class Cookie {
      *
      * @return cookie builder
      */
-    static Builder newBuilder() {
+    public static Builder newBuilder() {
         return new Builder();
     }
 
@@ -505,7 +505,7 @@ class Cookie {
      * @param oldCookie build new cookie from this cookie
      * @return cookie builder
      */
-    static Builder newBuilder(Cookie oldCookie) {
+    public static Builder newBuilder(Cookie oldCookie) {
         return new Builder(oldCookie.layoutVersion, oldCookie.bookieHost, oldCookie.journalDirs, oldCookie.ledgerDirs,
                 oldCookie.instanceId);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index e2b1e79..1f8aef4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -54,7 +54,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Provide journal related management.
  */
-class Journal extends BookieCriticalThread implements CheckpointSource {
+public class Journal extends BookieCriticalThread implements CheckpointSource {
 
     private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
 
@@ -140,7 +140,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
     /**
      * Last Log Mark.
      */
-    class LastLogMark {
+    public class LastLogMark {
         private final LogMark curMark;
 
         LastLogMark(long logId, long logPosition) {
@@ -155,7 +155,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
             return new LastLogMark(curMark.getLogFileId(), curMark.getLogFileOffset());
         }
 
-        LogMark getCurMark() {
+        public LogMark getCurMark() {
             return curMark;
         }
 
@@ -587,7 +587,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
         return journalDirectory;
     }
 
-    LastLogMark getLastLogMark() {
+    public LastLogMark getLastLogMark() {
         return lastLogMark;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
index adae5b2..01055fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
 /**
  * Journal stream position.
  */
-class LogMark {
+public class LogMark {
     long logFileId;
     long logFileOffset;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 7c81d6d..8e934ad 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -227,7 +227,7 @@ public class LedgerHandle implements AutoCloseable {
      *
      * @return LedgerMetadata for the LedgerHandle
      */
-    LedgerMetadata getLedgerMetadata() {
+    public LedgerMetadata getLedgerMetadata() {
         return metadata;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKHttpServiceProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKHttpServiceProvider.java
new file mode 100644
index 0000000..b31b5e7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKHttpServiceProvider.java
@@ -0,0 +1,242 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.ErrorHttpService;
+import org.apache.bookkeeper.http.service.HeartbeatService;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.Auditor;
+import org.apache.bookkeeper.replication.AutoRecoveryMain;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Bookkeeper based implementation of HttpServiceProvider,
+ * which provides bookkeeper services to handle http requests
+ * from different http endpoints.
+ *
+ * <p>The provider owns a ZooKeeper client, a BookKeeperAdmin and a single-thread
+ * executor which are handed to the services it creates; all three are released
+ * by {@link #close()}.
+ */
+@Slf4j
+public class BKHttpServiceProvider implements HttpServiceProvider {
+
+    private final BookieServer bookieServer;
+    private final AutoRecoveryMain autoRecovery;
+    private final ServerConfiguration serverConf;
+    private final ZooKeeper zk;
+    private final BookKeeperAdmin bka;
+    private final ExecutorService executor;
+
+    /**
+     * Creates the provider and opens its ZooKeeper client and BookKeeperAdmin.
+     *
+     * @param bookieServer the bookie server, may be null
+     * @param autoRecovery the auto recovery daemon, may be null
+     * @param serverConf the server configuration; it is dereferenced here to read the
+     *                   ZooKeeper servers and session timeout, so it must not be null
+     */
+    private BKHttpServiceProvider(BookieServer bookieServer,
+                                  AutoRecoveryMain autoRecovery,
+                                  ServerConfiguration serverConf)
+        throws IOException, KeeperException, InterruptedException {
+        this.bookieServer = bookieServer;
+        this.autoRecovery = autoRecovery;
+        this.serverConf = serverConf;
+        this.zk = ZooKeeperClient.newBuilder()
+          .connectString(serverConf.getZkServers())
+          .sessionTimeoutMs(serverConf.getZkTimeout())
+          .build();
+
+        ClientConfiguration clientConfiguration = new ClientConfiguration(serverConf)
+          .setZkServers(serverConf.getZkServers());
+        BookKeeperAdmin admin = null;
+        try {
+            admin = new BookKeeperAdmin(clientConfiguration);
+        } finally {
+            if (admin == null) {
+                // The admin client could not be created: release the ZooKeeper
+                // client opened above instead of leaking its session.
+                try {
+                    zk.close();
+                } catch (InterruptedException ie) {
+                    // Keep the original failure; just restore the interrupt flag.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        this.bka = admin;
+
+        this.executor = Executors.newSingleThreadExecutor(
+          new ThreadFactoryBuilder().setNameFormat("BKHttpServiceThread").setDaemon(true).build());
+    }
+
+    /**
+     * Shuts down the executor and closes the BookKeeperAdmin and the ZooKeeper client.
+     *
+     * @throws IOException if closing either client fails or the thread is interrupted
+     */
+    @Override
+    public void close() throws IOException {
+        executor.shutdown();
+        try {
+            try {
+                if (bka != null) {
+                    bka.close();
+                }
+            } finally {
+                // Close ZooKeeper even when closing the admin client failed,
+                // otherwise its session would leak.
+                if (zk != null) {
+                    zk.close();
+                }
+            }
+        } catch (InterruptedException | BKException e) {
+            if (e instanceof InterruptedException) {
+                // Preserve the interrupt status for callers further up the stack.
+                Thread.currentThread().interrupt();
+            }
+            log.error("Error while close BKHttpServiceProvider", e);
+            throw new IOException("Error while close BKHttpServiceProvider", e);
+        }
+    }
+
+    private ServerConfiguration getServerConf() {
+        return serverConf;
+    }
+
+    private Auditor getAuditor() {
+        return autoRecovery == null ? null : autoRecovery.getAuditor();
+    }
+
+    private Bookie getBookie() {
+        return bookieServer == null ? null : bookieServer.getBookie();
+    }
+
+    /**
+     * Builder for HttpServiceProvider.
+     */
+    public static class Builder {
+
+        BookieServer bookieServer = null;
+        AutoRecoveryMain autoRecovery = null;
+        ServerConfiguration serverConf = null;
+
+        public Builder setBookieServer(BookieServer bookieServer) {
+            this.bookieServer = bookieServer;
+            return this;
+        }
+
+        public Builder setAutoRecovery(AutoRecoveryMain autoRecovery) {
+            this.autoRecovery = autoRecovery;
+            return this;
+        }
+
+        public Builder setServerConfiguration(ServerConfiguration conf) {
+            this.serverConf = conf;
+            return this;
+        }
+
+        public BKHttpServiceProvider build()
+            throws IOException, KeeperException, InterruptedException {
+            return new BKHttpServiceProvider(
+                bookieServer,
+                autoRecovery,
+                serverConf
+            );
+        }
+    }
+
+    /**
+     * Creates the service that handles requests for the given endpoint type.
+     *
+     * @param type the endpoint type being requested
+     * @return an ErrorHttpService when no server configuration is available, otherwise the
+     *         service mapped to {@code type}; unmapped types get a ConfigurationService
+     */
+    @Override
+    public HttpEndpointService provideHttpEndpointService(HttpServer.ApiType type) {
+        ServerConfiguration configuration = getServerConf();
+        if (configuration == null) {
+            return new ErrorHttpService();
+        }
+
+        switch (type) {
+            case HEARTBEAT:
+                return new HeartbeatService();
+            case SERVER_CONFIG:
+                return new ConfigurationService(configuration);
+
+            // ledger
+            case DELETE_LEDGER:
+                return new DeleteLedgerService(configuration);
+            case LIST_LEDGER:
+                return new ListLedgerService(configuration, zk);
+            case GET_LEDGER_META:
+                return new GetLedgerMetaService(configuration, zk);
+            case READ_LEDGER_ENTRY:
+                return new ReadLedgerEntryService(configuration, bka);
+
+            // bookie
+            case LIST_BOOKIES:
+                return new ListBookiesService(configuration, bka);
+            case LIST_BOOKIE_INFO:
+                return new ListBookieInfoService(configuration);
+            case LAST_LOG_MARK:
+                return new GetLastLogMarkService(configuration);
+            case LIST_DISK_FILE:
+                return new ListDiskFilesService(configuration);
+            case EXPAND_STORAGE:
+                return new ExpandStorageService(configuration, zk);
+
+            // autorecovery
+            case RECOVERY_BOOKIE:
+                return new RecoveryBookieService(configuration, bka, executor);
+            case LIST_UNDER_REPLICATED_LEDGER:
+                return new ListUnderReplicatedLedgerService(configuration, zk);
+            case WHO_IS_AUDITOR:
+                return new WhoIsAuditorService(configuration, zk);
+            case TRIGGER_AUDIT:
+                return new TriggerAuditService(configuration, bka);
+            case LOST_BOOKIE_RECOVERY_DELAY:
+                return new LostBookieRecoveryDelayService(configuration, bka);
+            case DECOMMISSION:
+                return new DecommissionService(configuration, bka, executor);
+
+            default:
+                return new ConfigurationService(configuration);
+        }
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKServiceProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKServiceProvider.java
deleted file mode 100644
index a36d73e..0000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKServiceProvider.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.bookkeeper.http;
-
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.http.service.ErrorService;
-import org.apache.bookkeeper.http.service.HeartbeatService;
-import org.apache.bookkeeper.http.service.Service;
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.replication.Auditor;
-import org.apache.bookkeeper.replication.AutoRecoveryMain;
-
-/**
- * Bookkeeper based implementation of ServiceProvider,
- * which provide bookkeeper services to handle http requests
- * from different http endpoints.
- */
-public class BKServiceProvider implements ServiceProvider {
-
-    private final BookieServer bookieServer;
-    private final AutoRecoveryMain autoRecovery;
-    private final ServerConfiguration serverConf;
-
-    private BKServiceProvider(BookieServer bookieServer,
-                             AutoRecoveryMain autoRecovery,
-                             ServerConfiguration serverConf) {
-        this.bookieServer = bookieServer;
-        this.autoRecovery = autoRecovery;
-        this.serverConf = serverConf;
-    }
-
-    @Override
-    public Service provideHeartbeatService() {
-        return new HeartbeatService();
-    }
-
-    @Override
-    public Service provideConfigurationService() {
-        ServerConfiguration configuration = getServerConf();
-        if (configuration == null) {
-            return new ErrorService();
-        }
-        return new ConfigurationService(configuration);
-    }
-
-    private ServerConfiguration getServerConf() {
-        return serverConf;
-    }
-
-    private Auditor getAuditor() {
-        return autoRecovery == null ? null : autoRecovery.getAuditor();
-    }
-
-    private Bookie getBookie() {
-        return bookieServer == null ? null : bookieServer.getBookie();
-    }
-
-    /**
-     * Builder for ServiceProvider.
-     */
-    public static class Builder {
-
-        BookieServer bookieServer = null;
-        AutoRecoveryMain autoRecovery = null;
-        ServerConfiguration serverConf = null;
-
-        public Builder setBookieServer(BookieServer bookieServer) {
-            this.bookieServer = bookieServer;
-            return this;
-        }
-
-        public Builder setAutoRecovery(AutoRecoveryMain autoRecovery) {
-            this.autoRecovery = autoRecovery;
-            return this;
-        }
-
-        public Builder setServerConfiguration(ServerConfiguration conf) {
-            this.serverConf = conf;
-            return this;
-        }
-
-        public BKServiceProvider build() {
-            return new BKServiceProvider(
-                bookieServer,
-                autoRecovery,
-                serverConf
-            );
-        }
-    }
-
-}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ConfigurationService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ConfigurationService.java
index 142042d..a7f3407 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ConfigurationService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ConfigurationService.java
@@ -20,23 +20,21 @@
  */
 package org.apache.bookkeeper.http;
 
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.http.service.Service;
-import org.apache.bookkeeper.http.service.ServiceRequest;
-import org.apache.bookkeeper.http.service.ServiceResponse;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
 import org.apache.bookkeeper.util.JsonUtil;
 
 /**
- * Service that handle Bookkeeper Configuration related http request.
+ * HttpEndpointService that handles BookKeeper configuration related HTTP requests.
  */
-public class ConfigurationService implements Service {
+public class ConfigurationService implements HttpEndpointService {
 
     protected ServerConfiguration conf;
 
@@ -46,12 +44,36 @@ public class ConfigurationService implements Service {
     }
 
     @Override
-    public ServiceResponse handle(ServiceRequest request) throws Exception {
-        ServiceResponse response = new ServiceResponse();
-        Map<String, Object> configMap = toMap(conf);
-        String jsonResponse = JsonUtil.toJson(configMap);
-        response.setBody(jsonResponse);
-        return response;
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+        // GET
+        if (HttpServer.Method.GET == request.getMethod()) {
+            Map<String, Object> configMap = toMap(conf);
+            String jsonResponse = JsonUtil.toJson(configMap);
+            response.setBody(jsonResponse);
+            return response;
+        } else if (HttpServer.Method.PUT == request.getMethod()) {
+            String requestBody = request.getBody();
+            if(null == requestBody) {
+                response.setCode(HttpServer.StatusCode.NOT_FOUND);
+                response.setBody("Request body not found. should contains k-v pairs");
+                return response;
+            }
+            @SuppressWarnings("unchecked")
+            HashMap<String, Object> configMap = JsonUtil.fromJson(requestBody, HashMap.class);
+            for(Map.Entry<String, Object> entry: configMap.entrySet()) {
+                conf.setProperty(entry.getKey(), entry.getValue());
+            }
+
+            response.setCode(HttpServer.StatusCode.OK);
+            response.setBody("Success set server config.");
+            return response;
+        } else {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Request body not found. should contains k-v pairs");
+            return response;
+        }
+
     }
 
     private Map<String, Object> toMap(ServerConfiguration conf) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/DecommissionService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/DecommissionService.java
new file mode 100644
index 0000000..428c007
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/DecommissionService.java
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handles BookKeeper decommission related HTTP requests.
+ *
+ * <p>A PUT whose JSON body carries {@code "bookie_src": "<host>:<port>"} hands the
+ * decommission of that bookie to the supplied executor. The outcome of the
+ * decommission itself is only logged; it is never reported back in the response.
+ */
+public class DecommissionService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(DecommissionService.class);
+
+    protected ServerConfiguration conf;
+    protected BookKeeperAdmin bka;
+    protected ExecutorService executor;
+
+    public DecommissionService(ServerConfiguration conf, BookKeeperAdmin bka, ExecutorService executor) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.bka = bka;
+        this.executor = executor;
+    }
+
+    /**
+     * Schedules the decommission of the bookie named by "bookie_src" in the request body.
+     *
+     * @param request the HTTP request; only PUT with a JSON body is acted upon
+     * @return OK once the task has been handed to the executor; NOT_FOUND for any other
+     *         method, a missing body, a body without "bookie_src", or a failure while
+     *         parsing the address or submitting the task
+     * @throws Exception if the request body cannot be parsed as JSON
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+
+        if (HttpServer.Method.PUT != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be PUT method");
+            return response;
+        }
+
+        String requestBody = request.getBody();
+        if (requestBody == null) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Null request body for DecommissionService.");
+            return response;
+        }
+
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> configMap = JsonUtil.fromJson(requestBody, HashMap.class);
+        if (configMap == null || !configMap.containsKey("bookie_src")) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Request body not contains bookie_src.");
+            return response;
+        }
+
+        try {
+            String[] hostAndPort = configMap.get("bookie_src").split(":");
+            BookieSocketAddress bookieSrc = new BookieSocketAddress(
+              hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+
+            executor.execute(() -> {
+                try {
+                    LOG.info("Start decommissioning bookie.");
+                    bka.decommissionBookie(bookieSrc);
+                    LOG.info("Complete decommissioning bookie.");
+                } catch (Exception e) {
+                    // Keep the throwable as the last argument without a placeholder of its
+                    // own so that SLF4J logs it together with its stack trace.
+                    LOG.error("Error handling decommissionBookie: {}", bookieSrc, e);
+                }
+            });
+
+            response.setCode(HttpServer.StatusCode.OK);
+            response.setBody("Success send decommission Bookie command " + bookieSrc.toString());
+            return response;
+        } catch (Exception e) {
+            // Reached for a malformed "host:port" value as well as a rejected task submission.
+            LOG.error("Exception occurred while decommissioning bookie: ", e);
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Exception when send decommission command." + e.getMessage());
+            return response;
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/DeleteLedgerService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/DeleteLedgerService.java
new file mode 100644
index 0000000..334aeed
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/DeleteLedgerService.java
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handles BookKeeper delete ledger related HTTP requests.
+ * The DELETE method deletes the ledger named by the "ledger_id" request parameter.
+ */
+public class DeleteLedgerService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(DeleteLedgerService.class);
+
+    protected ServerConfiguration conf;
+
+    public DeleteLedgerService(ServerConfiguration conf) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+    }
+
+    /**
+     * Deletes the requested ledger through a BookKeeper client created for this call.
+     *
+     * @param request the HTTP request; only DELETE with a "ledger_id" parameter is acted upon
+     * @return OK with a JSON confirmation, or NOT_FOUND for a non-DELETE method or a
+     *         request without "ledger_id"
+     * @throws Exception if "ledger_id" is not a number or the client call fails
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+
+        if (HttpServer.Method.DELETE != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be DELETE method");
+            return response;
+        }
+
+        Map<String, String> params = request.getParams();
+        if (params == null || !params.containsKey("ledger_id")) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not ledger found. Should provide ledger_id=<id>");
+            return response;
+        }
+
+        // Parse before building the client so that a malformed id never creates one.
+        long ledgerId = Long.parseLong(params.get("ledger_id"));
+
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.addConfiguration(conf);
+        BookKeeper bk = new BookKeeper(clientConf);
+        try {
+            bk.deleteLedger(ledgerId);
+        } finally {
+            // The client is created per request, so release it even when the delete fails.
+            bk.close();
+        }
+
+        String jsonResponse = JsonUtil.toJson("Deleted ledger: " + ledgerId);
+        LOG.debug("output body:" + jsonResponse);
+        response.setBody(jsonResponse);
+        response.setCode(HttpServer.StatusCode.OK);
+        return response;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java
new file mode 100644
index 0000000..e51ca12
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handles BookKeeper expand storage related HTTP requests.
+ * The PUT method expands this bookie's storage. The directories info in the conf
+ * file has to be updated with the new, empty ledger/index directories before the
+ * request is sent.
+ */
+public class ExpandStorageService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(ExpandStorageService.class);
+
+    protected ServerConfiguration conf;
+    private ZooKeeper zk;
+
+    public ExpandStorageService(ServerConfiguration conf, ZooKeeper zk) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.zk = zk;
+    }
+
+    /**
+     * Runs Bookie.checkEnvironmentWithStorageExpansion over the configured journal,
+     * ledger and index directories.
+     *
+     * @return OK on success, INTERNAL_ERROR if the check fails, NOT_FOUND for a non-PUT method
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+
+        if (HttpServer.Method.PUT != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be PUT method");
+            return response;
+        }
+
+        File[] ledgerDirs = Bookie.getCurrentDirectories(conf.getLedgerDirs());
+        File[] journalDirs = Bookie.getCurrentDirectories(conf.getJournalDirs());
+        // Without configured index dirs, the ledger dirs double as index dirs.
+        File[] indexDirs = (null == conf.getIndexDirs())
+            ? ledgerDirs : Bookie.getCurrentDirectories(conf.getIndexDirs());
+
+        List<File> ledgerAndIndexDirs = Lists.newArrayList();
+        ledgerAndIndexDirs.addAll(Arrays.asList(ledgerDirs));
+        if (indexDirs != ledgerDirs) {
+            // Identity check: the index dirs are skipped when they are the very same array.
+            ledgerAndIndexDirs.addAll(Arrays.asList(indexDirs));
+        }
+
+        try {
+            Bookie.checkEnvironmentWithStorageExpansion(conf, zk,
+              Lists.newArrayList(journalDirs), ledgerAndIndexDirs);
+        } catch (BookieException | IOException e) {
+            LOG.error("Exception occurred while updating cookie for storage expansion", e);
+            response.setCode(HttpServer.StatusCode.INTERNAL_ERROR);
+            response.setBody("Exception while updating cookie for storage expansion");
+            return response;
+        }
+
+        String body = "Success expand storage";
+        LOG.debug("output body:" + body);
+        response.setBody(body);
+        response.setCode(HttpServer.StatusCode.OK);
+        return response;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/GetLastLogMarkService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/GetLastLogMarkService.java
new file mode 100644
index 0000000..7d699da
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/GetLastLogMarkService.java
@@ -0,0 +1,109 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.bookie.Journal;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LogMark;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handles BookKeeper get last log mark related HTTP requests.
+ * The GET method reports the last log mark of every configured journal directory.
+ *
+ * <p>Output would be like this:
+ *  {
+ *    "LastLogMark: Journal Id - <id>(<hex id>.txn)" : "Pos - <offset>",
+ *    ...
+ *  }
+ */
+public class GetLastLogMarkService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(GetLastLogMarkService.class);
+
+    protected ServerConfiguration conf;
+
+    public GetLastLogMarkService(ServerConfiguration conf) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+    }
+
+    /**
+     * Opens a Journal per configured journal directory and returns their last log marks.
+     *
+     * @return OK with the JSON map, or NOT_FOUND for a non-GET method or any failure
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+
+        if (HttpServer.Method.GET != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET method");
+            return response;
+        }
+
+        try {
+            // Build every Journal first, each with its own LedgerDirsManager and DiskChecker.
+            List<Journal> journals = Lists.newArrayListWithCapacity(conf.getJournalDirs().length);
+            for (File dir : conf.getJournalDirs()) {
+                LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                  new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+                journals.add(new Journal(dir, conf, dirsManager));
+            }
+
+            Map<String, String> output = Maps.newHashMap();
+            for (Journal journal : journals) {
+                LogMark mark = journal.getLastLogMark().getCurMark();
+                long fileId = mark.getLogFileId();
+                long offset = mark.getLogFileOffset();
+                LOG.debug("LastLogMark: Journal Id - " + fileId + "(" + Long.toHexString(fileId)
+                  + ".txn), Pos - " + offset);
+                output.put("LastLogMark: Journal Id - " + fileId + "(" + Long.toHexString(fileId) + ".txn)",
+                    "Pos - " + offset);
+            }
+
+            String jsonResponse = JsonUtil.toJson(output);
+            LOG.debug("output body:" + jsonResponse);
+            response.setBody(jsonResponse);
+            response.setCode(HttpServer.StatusCode.OK);
+            return response;
+        } catch (Exception e) {
+            LOG.error("Exception occurred while getting last log mark", e);
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("ERROR handling request: " + e.getMessage());
+            return response;
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/GetLedgerMetaService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/GetLedgerMetaService.java
new file mode 100644
index 0000000..db0bb62
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/GetLedgerMetaService.java
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handles BookKeeper get ledger metadata related HTTP requests.
+ * The GET method returns the serialized metadata of the ledger given by "ledger_id".
+ */
+public class GetLedgerMetaService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(GetLedgerMetaService.class);
+
+    protected ServerConfiguration conf;
+    protected ZooKeeper zk;
+
+    public GetLedgerMetaService(ServerConfiguration conf, ZooKeeper zk) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.zk = zk;
+    }
+
+    /**
+     * Reads one ledger's metadata and returns it as {@code {"<ledgerId>": "<metadata>"}}.
+     *
+     * @param request the HTTP request; only GET with a "ledger_id" parameter is acted upon
+     * @return OK with the JSON map, or NOT_FOUND for a non-GET method or a request
+     *         without "ledger_id"
+     * @throws Exception if "ledger_id" is not a number or the metadata cannot be read
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+
+        if (HttpServer.Method.GET != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET method");
+            return response;
+        }
+
+        Map<String, String> params = request.getParams();
+        if (params == null || !params.containsKey("ledger_id")) {
+            // A GET without an id is a parameter problem, not a wrong method.
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not ledger found. Should provide ledger_id=<id>");
+            return response;
+        }
+        Long ledgerId = Long.parseLong(params.get("ledger_id"));
+
+        // output <ledgerId: ledgerMetadata>
+        Map<String, String> output = Maps.newHashMap();
+        LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
+        try {
+            LedgerManager manager = mFactory.newLedgerManager();
+            try {
+                ListLedgerService.ReadLedgerMetadataCallback cb =
+                  new ListLedgerService.ReadLedgerMetadataCallback(ledgerId);
+                manager.readLedgerMetadata(ledgerId, cb);
+                LedgerMetadata md = cb.get();
+                output.put(ledgerId.toString(), new String(md.serialize(), UTF_8));
+            } finally {
+                manager.close();
+            }
+        } finally {
+            // Release the factory even when the metadata read fails.
+            mFactory.uninitialize();
+        }
+
+        String jsonResponse = JsonUtil.toJson(output);
+        LOG.debug("output body:" + jsonResponse);
+        response.setBody(jsonResponse);
+        response.setCode(HttpServer.StatusCode.OK);
+        return response;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookieInfoService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookieInfoService.java
new file mode 100644
index 0000000..083ee8b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookieInfoService.java
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.math.RoundingMode;
+import java.text.DecimalFormat;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookieInfoReader;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handles BookKeeper list bookie info related HTTP requests.
+ *
+ * <p>The GET method reports the disk usage of all bookies in this BookKeeper cluster.
+ * Output would be like this:
+ *  {
+ *    "<bookieAddress>" : ": {Free: xxx, Total: xxx},",
+ *    "<bookieAddress>" : ": {Free: xxx, Total: xxx},",
+ *    ...
+ *    "ClusterInfo: " : "{Free: xxx, Total: xxx}"
+ *  }
+ */
+public class ListBookieInfoService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(ListBookieInfoService.class);
+
+    protected ServerConfiguration conf;
+
+    public ListBookieInfoService(ServerConfiguration conf) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+    }
+
+    /**
+     * Formats a size as a parenthesized value scaled by powers of 1000, for example
+     * "(1.5KB)" for 1500, truncated (not rounded) to three fraction digits.
+     *
+     * @param val the size to format
+     * @return the formatted suffix, or an empty string when {@code val} is below 1000
+     */
+    String getReadable(long val) {
+        String[] units = {"", "KB", "MB", "GB", "TB"};
+        int idx = 0;
+        double scaled = val;
+        while (scaled >= 1000 && idx < units.length - 1) {
+            scaled = scaled / 1000;
+            idx++;
+        }
+        DecimalFormat df = new DecimalFormat("#.###");
+        df.setRoundingMode(RoundingMode.DOWN);
+        return idx > 0 ? "(" + df.format(scaled) + units[idx] + ")" : units[idx];
+    }
+
+    /**
+     * Queries every bookie's free and total disk space through a BookKeeper client
+     * created for this call.
+     *
+     * @param request the HTTP request; only GET is acted upon
+     * @return OK with the JSON map, or NOT_FOUND for a non-GET method or when no
+     *         bookie info is available
+     * @throws Exception if the client cannot be created or the query fails
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+
+        if (HttpServer.Method.GET != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET method");
+            return response;
+        }
+
+        ClientConfiguration clientConf = new ClientConfiguration(conf)
+          .setZkServers(conf.getZkServers());
+        clientConf.setDiskWeightBasedPlacementEnabled(true);
+        BookKeeper bk = new BookKeeper(clientConf);
+        try {
+            Map<BookieSocketAddress, BookieInfoReader.BookieInfo> map = bk.getBookieInfo();
+            if (map.size() == 0) {
+                response.setCode(HttpServer.StatusCode.NOT_FOUND);
+                response.setBody("Not found any Bookie info.");
+                return response;
+            }
+
+            // Insertion-ordered so that the cluster summary stays the last entry.
+            LinkedHashMap<String, String> output = Maps.newLinkedHashMapWithExpectedSize(map.size());
+            long totalFree = 0L;
+            long total = 0L;
+            for (Map.Entry<BookieSocketAddress, BookieInfoReader.BookieInfo> infoEntry : map.entrySet()) {
+                long free = infoEntry.getValue().getFreeDiskSpace();
+                long capacity = infoEntry.getValue().getTotalDiskSpace();
+                output.put(infoEntry.getKey().toString(),
+                    ": {Free: " + free + getReadable(free)
+                    + ", Total: " + capacity + getReadable(capacity) + "},");
+                totalFree += free;
+                total += capacity;
+            }
+            output.put("ClusterInfo: ",
+                "{Free: " + totalFree + getReadable(totalFree)
+                + ", Total: " + total + getReadable(total) + "}");
+
+            String jsonResponse = JsonUtil.toJson(output);
+            LOG.debug("output body:" + jsonResponse);
+            response.setBody(jsonResponse);
+            response.setCode(HttpServer.StatusCode.OK);
+            return response;
+        } finally {
+            // The client exists only for this request; release it even if the query fails.
+            bk.close();
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookiesService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookiesService.java
new file mode 100644
index 0000000..5dc849e
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookiesService.java
@@ -0,0 +1,96 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that serves the "list bookies" http request.
+ *
+ * <p>A GET request returns the bookies of this bookkeeper cluster, serialized by JsonUtil:
+ * the available (rw) ones by default, or the read-only ones when "type=ro" is passed.
+ * Any other http method is answered with NOT_FOUND.
+ */
+public class ListBookiesService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(ListBookiesService.class);
+
+    protected ServerConfiguration conf;
+    protected BookKeeperAdmin bka;
+
+    public ListBookiesService(ServerConfiguration conf, BookKeeperAdmin bka) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.bka = bka;
+    }
+
+    // True only when params maps the given key to a value equal to expected.
+    private static boolean hasParam(Map<String, String> params, String key, String expected) {
+        return (params != null) && params.containsKey(key) && params.get(key).equals(expected);
+    }
+
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+        // Only GET is served; reject every other method up front.
+        if (HttpServer.Method.GET != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET method");
+            return response;
+        }
+
+        Map<String, String> params = request.getParams();
+        // "type=ro" selects the read-only bookies; anything else lists the available ones.
+        boolean wantReadOnly = hasParam(params, "type", "ro");
+        // "print_hostnames=true" fills in the hostname; otherwise the value stays null.
+        boolean wantHostname = hasParam(params, "print_hostnames", "true");
+
+        Collection<BookieSocketAddress> bookies = new ArrayList<BookieSocketAddress>(
+            wantReadOnly ? bka.getReadOnlyBookies() : bka.getAvailableBookies());
+
+        // result maps <bookieSocketAddress: hostname>
+        Map<String, String> result = Maps.newHashMap();
+        for (BookieSocketAddress bookie : bookies) {
+            String address = bookie.toString();
+            String hostname = bookie.getHostName();
+            result.putIfAbsent(address, wantHostname ? hostname : null);
+            LOG.debug("bookie: " + address + " hostname:" + hostname);
+        }
+
+        response.setBody(JsonUtil.toJson(result));
+        response.setCode(HttpServer.StatusCode.OK);
+        return response;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListDiskFilesService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListDiskFilesService.java
new file mode 100644
index 0000000..72871b5
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListDiskFilesService.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import static org.apache.bookkeeper.bookie.BookieShell.listFilesAndSort;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that serves the "list disk files" http request.
+ *
+ * The GET method lists the bookie files of type journal|entrylog|index in this bookie;
+ * the "file_type" parameter narrows the listing to one type, by default all are listed.
+ * The output would be like this:
+ *  {
+ *    "journal files" : "filename1 \t ...",
+ *    "entrylog files" : "filename1 \t ...",
+ *    "index files" : "filename1 \t ..."
+ *  }
+ */
+public class ListDiskFilesService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(ListDiskFilesService.class);
+
+    protected ServerConfiguration conf;
+
+    public ListDiskFilesService(ServerConfiguration conf) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+    }
+
+    /**
+     * Concatenate the names of the given files, each one followed by a tab.
+     */
+    private static String tabTerminatedNames(List<File> files) {
+        StringBuilder names = new StringBuilder();
+        for (File file : files) {
+            names.append(file.getName()).append("\t");
+        }
+        return names.toString();
+    }
+
+    /**
+     * List the journal, entry log and index files of this bookie, as selected by "file_type".
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+        Map<String, String> params = request.getParams();
+
+        // Anything but GET is rejected before the directories are read.
+        if (HttpServer.Method.GET != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET method");
+            return response;
+        }
+
+        // Requested file type, if the caller passed one.
+        boolean typed = (params != null) && params.containsKey("file_type");
+        String fileType = typed ? params.get("file_type") : null;
+        boolean wantJournal = typed && fileType.equals("journal");
+        boolean wantEntrylog = typed && fileType.equals("entrylog");
+        boolean wantIndex = typed && fileType.equals("index");
+        // No recognized file_type means every kind of file gets listed.
+        boolean wantAll = !(wantJournal || wantEntrylog || wantIndex);
+
+        /**
+         * output:
+         *  {
+         *    "journal files" : "filename \t ...",
+         *    "entrylog files" : "filename \t ...",
+         *    "index files" : "filename \t ..."
+         *  }
+         */
+        Map<String, String> listing = Maps.newHashMap();
+
+        if (wantAll || wantJournal) {
+            List<File> journalFiles = listFilesAndSort(conf.getJournalDirs(), "txn");
+            listing.put("journal files", tabTerminatedNames(journalFiles));
+        }
+
+        if (wantAll || wantEntrylog) {
+            List<File> entryLogFiles = listFilesAndSort(conf.getLedgerDirs(), "log");
+            listing.put("entrylog files", tabTerminatedNames(entryLogFiles));
+        }
+
+        if (wantAll || wantIndex) {
+            // Use the ledger directories when getIndexDirs() returns null.
+            File[] indexDirs = conf.getIndexDirs();
+            if (indexDirs == null) {
+                indexDirs = conf.getLedgerDirs();
+            }
+            List<File> indexFiles = listFilesAndSort(indexDirs, "idx");
+            listing.put("index files", tabTerminatedNames(indexFiles));
+        }
+
+        String jsonResponse = JsonUtil.toJson(listing);
+        LOG.debug("output body:" + jsonResponse);
+        response.setBody(jsonResponse);
+        response.setCode(HttpServer.StatusCode.OK);
+        return response;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListLedgerService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListLedgerService.java
new file mode 100644
index 0000000..b500c18
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListLedgerService.java
@@ -0,0 +1,176 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AbstractFuture;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handle Bookkeeper list ledger related http request.
+ *
+ * The GET method will list all ledger_ids in this bookkeeper cluster.
+ * User can choose print metadata of each ledger or not by set parameter "print_metadata",
+ * and pick one page of LIST_LEDGER_BATCH_SIZE ledgers by set parameter "page" (starting at 1).
+ */
+public class ListLedgerService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(ListLedgerService.class);
+
+    protected ServerConfiguration conf;
+    protected ZooKeeper zk;
+
+    public ListLedgerService(ServerConfiguration conf, ZooKeeper zk) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.zk = zk;
+    }
+
+    // Number of LedgerMetadata contained in each page
+    static final int LIST_LEDGER_BATCH_SIZE = 100;
+
+    // Future completed with the metadata read, or with a BKException when rc is not 0.
+    public static class ReadLedgerMetadataCallback extends AbstractFuture<LedgerMetadata>
+      implements BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata> {
+        final long ledgerId;
+
+        ReadLedgerMetadataCallback(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        long getLedgerId() {
+            return ledgerId;
+        }
+
+        @Override
+        public void operationComplete(int rc, LedgerMetadata result) {
+            if (rc != 0) {
+                setException(BKException.create(rc));
+            } else {
+                set(result);
+            }
+        }
+    }
+
+    // Wait for the read behind cb and keep it as <ledgerId: serialized metadata>.
+    static void keepLedgerMetadata(ReadLedgerMetadataCallback cb, LinkedHashMap<String, String> output) throws Exception {
+        LedgerMetadata md = cb.get();
+        output.put(Long.toString(cb.getLedgerId()), new String(md.serialize(), UTF_8));
+    }
+
+    // Keep the result of every pending read, in submission order, then empty the list.
+    private static void drain(List<ReadLedgerMetadataCallback> pending, LinkedHashMap<String, String> output) throws Exception {
+        for (ReadLedgerMetadataCallback cb : pending) {
+            keepLedgerMetadata(cb, output);
+        }
+        pending.clear();
+    }
+
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+        // parameter could be like: print_metadata=true&page=PageIndex
+        if (HttpServer.Method.GET != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET method");
+            return response;
+        }
+
+        Map<String, String> params = request.getParams();
+        // default not print metadata
+        boolean printMeta = (params != null) && "true".equals(params.get("print_metadata"));
+        // Page index should start from 1; without a positive page every ledger is listed.
+        int pageIndex = (printMeta && params.containsKey("page")) ? Integer.parseInt(params.get("page")) : -1;
+
+        // 1-based positions (inclusive) of the wanted ledgers: page N covers (N-1)*SIZE+1 .. N*SIZE.
+        long first = 1;
+        long last = Long.MAX_VALUE;
+        if (pageIndex > 0) {
+            first = (pageIndex - 1L) * LIST_LEDGER_BATCH_SIZE + 1;
+            last = first + LIST_LEDGER_BATCH_SIZE - 1;
+        }
+
+        // output <ledgerId: ledgerMetadata>
+        LinkedHashMap<String, String> output = Maps.newLinkedHashMap();
+        // futures of the readLedgerMetadata calls not kept in output yet.
+        List<ReadLedgerMetadataCallback> futures = Lists.newArrayListWithExpectedSize(LIST_LEDGER_BATCH_SIZE);
+
+        LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
+        LedgerManager manager = null;
+        try {
+            manager = mFactory.newLedgerManager();
+            LedgerManager.LedgerRangeIterator iter = manager.getLedgerRanges();
+            long position = 0;
+            while (iter.hasNext()) {
+                LedgerManager.LedgerRange r = iter.next();
+                for (Long lid : r.getLedgers()) {
+                    position++;
+                    if (!printMeta) {
+                        output.put(lid.toString(), null);
+                    } else if (position >= first && position <= last) {
+                        ReadLedgerMetadataCallback cb = new ReadLedgerMetadataCallback(lid);
+                        manager.readLedgerMetadata(lid, cb);
+                        futures.add(cb);
+                    }
+                }
+                // Collect the results once at least a full batch of reads has been submitted.
+                if (futures.size() >= LIST_LEDGER_BATCH_SIZE) {
+                    drain(futures, output);
+                }
+            }
+            drain(futures, output);
+        } finally {
+            // Release the managers even when iterating or reading metadata failed.
+            try {
+                if (manager != null) {
+                    manager.close();
+                }
+            } finally {
+                mFactory.uninitialize();
+            }
+        }
+
+        String jsonResponse = JsonUtil.toJson(output);
+        LOG.debug("output body:" + jsonResponse);
+        response.setBody(jsonResponse);
+        response.setCode(HttpServer.StatusCode.OK);
+        return response;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListUnderReplicatedLedgerService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListUnderReplicatedLedgerService.java
new file mode 100644
index 0000000..01d5853
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListUnderReplicatedLedgerService.java
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handle Bookkeeper list under replicated ledger related http request.
+ *
+ * The GET method will list all ledger_ids of under replicated ledger.
+ * User can filter wanted ledger by set parameter "missingreplica" and "excludingmissingreplica"
+ */
+public class ListUnderReplicatedLedgerService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(ListUnderReplicatedLedgerService.class);
+
+    protected ServerConfiguration conf;
+    protected ZooKeeper zk;
+
+    public ListUnderReplicatedLedgerService(ServerConfiguration conf, ZooKeeper zk) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.zk = zk;
+    }
+
+    /*
+     * List the under replicated ledgers, optionally filtered by their missing replicas.
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+        if (HttpServer.Method.GET != request.getMethod()) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET method");
+            return response;
+        }
+
+        // parameter as this: ?missingreplica=<bookie_address>&excludingmissingreplica=<bookie_address>
+        Map<String, String> params = request.getParams();
+        final String includingBookieId = (params == null) ? null : params.get("missingreplica");
+        final String excludingBookieId = (params == null) ? null : params.get("excludingmissingreplica");
+
+        // The predicate stays null when neither filter is given.
+        Predicate<List<String>> predicate = null;
+        if (!StringUtils.isBlank(includingBookieId) && !StringUtils.isBlank(excludingBookieId)) {
+            predicate = replicasList -> (replicasList.contains(includingBookieId)
+              && !replicasList.contains(excludingBookieId));
+        } else if (!StringUtils.isBlank(includingBookieId)) {
+            predicate = replicasList -> replicasList.contains(includingBookieId);
+        } else if (!StringUtils.isBlank(excludingBookieId)) {
+            predicate = replicasList -> !replicasList.contains(excludingBookieId);
+        }
+
+        LedgerManagerFactory mFactory = null;
+        try {
+            List<Long> outputLedgers = Lists.newArrayList();
+            mFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
+            // NOTE(review): underreplicationManager is never closed - confirm whether it holds resources.
+            LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
+            Iterator<Long> iter = underreplicationManager.listLedgersToRereplicate(predicate);
+
+            while (iter.hasNext()) {
+                outputLedgers.add(iter.next());
+            }
+            if (outputLedgers.isEmpty()) {
+                response.setCode(HttpServer.StatusCode.NOT_FOUND);
+                response.setBody("No under replicated ledgers found");
+            } else {
+                response.setCode(HttpServer.StatusCode.OK);
+                String jsonResponse = JsonUtil.toJson(outputLedgers);
+                LOG.debug("output body: " + jsonResponse);
+                response.setBody(jsonResponse);
+            }
+        } catch (Exception e) {
+            LOG.error("Exception occurred while listing under replicated ledgers", e);
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Exception when get." + e.getMessage());
+        } finally {
+            // The factory is created per request, so release it on every exit path.
+            if (mFactory != null) {
+                try {
+                    mFactory.uninitialize();
+                } catch (Exception e) {
+                    LOG.warn("Failed to uninitialize the ledger manager factory", e);
+                }
+            }
+        }
+        return response;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/LostBookieRecoveryDelayService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/LostBookieRecoveryDelayService.java
new file mode 100644
index 0000000..0098e07
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/LostBookieRecoveryDelayService.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handle Bookkeeper lost bookie recovery delay parameter related http request.
+ *
+ * The GET method will get the value of parameter lostBookieRecoveryDelay,
+ * while the PUT method will set the value of parameter lostBookieRecoveryDelay.
+ */
+public class LostBookieRecoveryDelayService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(LostBookieRecoveryDelayService.class);
+
+    protected ServerConfiguration conf;
+    protected BookKeeperAdmin bka;
+
+    public LostBookieRecoveryDelayService(ServerConfiguration conf, BookKeeperAdmin bka) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.bka = bka;
+    }
+
+    /*
+     * set/get lostBookieRecoveryDelay.
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+        if (HttpServer.Method.PUT == request.getMethod()) {
+            // request body as {"delay_seconds": <delay_seconds>}
+            String requestBody = request.getBody();
+            if (requestBody == null) {
+                response.setCode(HttpServer.StatusCode.NOT_FOUND);
+                response.setBody("Null request body for lostBookieRecoveryDelay.");
+                return response;
+            }
+
+            // The body is caller supplied: "delay_seconds" may be missing, null or not an integer.
+            @SuppressWarnings("unchecked")
+            HashMap<String, Object> configMap = JsonUtil.fromJson(requestBody, HashMap.class);
+            Object delay = (configMap == null) ? null : configMap.get("delay_seconds");
+            if (delay instanceof Integer) {
+                int delaySeconds = (Integer) delay;
+                bka.setLostBookieRecoveryDelay(delaySeconds);
+                response.setCode(HttpServer.StatusCode.OK);
+                response.setBody("Success set lostBookieRecoveryDelay to " + delaySeconds);
+                return response;
+            } else {
+                response.setCode(HttpServer.StatusCode.NOT_FOUND);
+                response.setBody("Request body not contains an integer delay_seconds.");
+                return response;
+            }
+        } else if (HttpServer.Method.GET == request.getMethod()) {
+            try {
+                int delaySeconds = bka.getLostBookieRecoveryDelay();
+                response.setCode(HttpServer.StatusCode.OK);
+                response.setBody("lostBookieRecoveryDelay value: " + delaySeconds);
+                LOG.debug("response body:" + response.getBody());
+                return response;
+            } catch (Exception e) {
+                // may get noNode exception
+                LOG.error("Exception occurred while getting lost bookie recovery delay", e);
+                response.setCode(HttpServer.StatusCode.NOT_FOUND);
+                response.setBody("Exception when get lostBookieRecoveryDelay." + e.getMessage());
+                return response;
+            }
+        } else {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET or PUT method");
+            return response;
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ReadLedgerEntryService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ReadLedgerEntryService.java
new file mode 100644
index 0000000..a380f19
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ReadLedgerEntryService.java
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import static com.google.common.base.Charsets.US_ASCII;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handle Bookkeeper read ledger entry related http request.
+ *
+ * The GET method will print all entry content of wanted entry.
+ * User should set wanted "ledger_id", and can choose only print out wanted entry
+ * by set parameter "start_entry_id", "end_entry_id" and "page".
+ */
+public class ReadLedgerEntryService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(ReadLedgerEntryService.class);
+
+    protected ServerConfiguration conf;
+    protected BookKeeperAdmin bka;
+
+    public ReadLedgerEntryService(ServerConfiguration conf, BookKeeperAdmin bka) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.bka = bka;
+    }
+
+    // Number of entries in each page (the package-visible name keeps its original spelling).
+    static final Long ENTRIES_PER_PAE = 1000L;
+
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+        Map<String, String> params = request.getParams();
+
+        if (HttpServer.Method.GET == request.getMethod() && (params != null) && params.containsKey("ledger_id")) {
+            long ledgerId = Long.parseLong(params.get("ledger_id"));
+            long startEntryId = 0L;
+            long endEntryId = -1L;
+            if (params.containsKey("start_entry_id")) {
+                startEntryId = Long.parseLong(params.get("start_entry_id"));
+            }
+            if (params.containsKey("end_entry_id")) {
+                endEntryId = Long.parseLong(params.get("end_entry_id"));
+            }
+
+            // output <entryid: entry_content>, in insertion order: page marker first, then entries as read.
+            Map<String, String> output = Maps.newLinkedHashMap();
+
+            // Page index should start from 1;
+            int pageIndex = params.containsKey("page") ? Integer.parseInt(params.get("page")) : -1;
+            if (pageIndex > 0) {
+                // start and end entry index for wanted page.
+                long startIndexInPage = (pageIndex - 1) * ENTRIES_PER_PAE;
+                long endIndexInPage = startIndexInPage + ENTRIES_PER_PAE - 1;
+                if ((startEntryId == 0L) || (startEntryId < startIndexInPage)) {
+                    startEntryId = startIndexInPage;
+                }
+                if ((endEntryId == -1L) || (endEntryId > endIndexInPage)) {
+                    endEntryId = endIndexInPage;
+                }
+                output.put("Entries for page: ", Integer.toString(pageIndex));
+            }
+
+            if (endEntryId != -1L && startEntryId > endEntryId) {
+                response.setCode(HttpServer.StatusCode.INTERNAL_ERROR);
+                response.setBody("parameter for start_entry_id: " + startEntryId
+                    + " and end_entry_id: " + endEntryId + " conflict with page=" + pageIndex);
+                return response;
+            }
+
+            Iterator<LedgerEntry> entries = bka.readEntries(ledgerId, startEntryId, endEntryId).iterator();
+            while (entries.hasNext()) {
+                LedgerEntry entry = entries.next();
+                output.put(Long.toString(entry.getEntryId()), new String(entry.getEntry(), US_ASCII));
+            }
+
+            String jsonResponse = JsonUtil.toJson(output);
+            LOG.debug("output body:" + jsonResponse);
+            response.setBody(jsonResponse);
+            response.setCode(HttpServer.StatusCode.OK);
+            return response;
+        } else {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET method, with ledger_id provided");
+            return response;
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java
new file mode 100644
index 0000000..74ffa33
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.bookkeeper.bookie.Cookie;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.util.JsonUtil;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handle Bookkeeper recovery related http request.
+ *
+ * The PUT method will recovery bookie with provided parameter.
+ * The parameter of input body should be like this format:
+ * {
+ *   "bookie_src": [ "bookie_src1", "bookie_src2"... ],
+ *   "bookie_dest": [ "bookie_dest1", "bookie_dest2"... ],
+ *   "delete_cookie": <bool_value>
+ * }
+ */
+public class RecoveryBookieService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(RecoveryBookieService.class);
+
+    protected ServerConfiguration conf;
+    protected BookKeeperAdmin bka;
+    protected ExecutorService executor;
+
+    public RecoveryBookieService(ServerConfiguration conf, BookKeeperAdmin bka, ExecutorService executor) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.bka = bka;
+        this.executor = executor;
+    }
+
+    /*
+     * Example body as this:
+     * {
+     *   "bookie_src": [ "bookie_src1", "bookie_src2"... ],
+     *   "bookie_dest": [ "bookie_dest1", "bookie_dest2"... ],
+     *   "delete_cookie": <bool_value>
+     * }
+     */
+    static class RecoveryRequestJsonBody {
+        public List<String> bookie_src;
+        public List<String> bookie_dest;
+        public boolean delete_cookie;
+    }
+
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+        String requestBody = request.getBody();
+        RecoveryRequestJsonBody requestJsonBody;
+
+        if (requestBody == null) {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("No request body provide.");
+            return response;
+        }
+
+        try {
+            requestJsonBody = JsonUtil.fromJson(requestBody, RecoveryRequestJsonBody.class);
+            LOG.debug("bookie_src: [" + requestJsonBody.bookie_src.get(0)
+                + "],  bookie_dest: ["
+                + ((requestJsonBody.bookie_dest == null) ? "null" : requestJsonBody.bookie_dest.get(0))
+                + "],  delete_cookie: [" + requestJsonBody.delete_cookie + "]");
+        } catch (JsonUtil.ParseJsonException e) {
+            LOG.error("Meet Exception: ", e);
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("ERROR parameters: " + e.getMessage());
+            return response;
+        }
+
+        if (HttpServer.Method.PUT == request.getMethod() &&
+            !requestJsonBody.bookie_src.isEmpty()) {
+            ClientConfiguration adminConf = new ClientConfiguration(conf);
+
+            String bookieSrcString[] = requestJsonBody.bookie_src.get(0).split(":");
+            BookieSocketAddress bookieSrc = new BookieSocketAddress(
+              bookieSrcString[0], Integer.parseInt(bookieSrcString[1]));
+            final BookieSocketAddress bookieDest;
+            if ((requestJsonBody.bookie_dest != null) && !requestJsonBody.bookie_dest.isEmpty()) {
+                String bookieDestString[] = requestJsonBody.bookie_dest.get(0).split(":");
+                bookieDest = new BookieSocketAddress(bookieDestString[0],
+                  Integer.parseInt(bookieDestString[1]));
+            } else {
+                bookieDest = null;
+            }
+            boolean deleteCookie = requestJsonBody.delete_cookie;
+            executor.execute(() -> {
+                try {
+                    LOG.info("Start recovering bookie.");
+                    bka.recoverBookieData(bookieSrc, bookieDest);
+                    if (deleteCookie) {
+                        Versioned<Cookie> cookie = Cookie.readFromZooKeeper(bka.getZooKeeper(), adminConf, bookieSrc);
+                        cookie.getValue().deleteFromZooKeeper(bka.getZooKeeper(), adminConf, bookieSrc, cookie.getVersion());
+                    }
+                    LOG.info("Complete recovering bookie");
+                } catch (Exception e) {
+                    LOG.error("Exception occurred while recovering bookie", e);
+                }
+            });
+
+            response.setCode(HttpServer.StatusCode.OK);
+            response.setBody("Success send recovery request command.");
+            return response;
+        } else {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be PUT method");
+            return response;
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/TriggerAuditService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/TriggerAuditService.java
new file mode 100644
index 0000000..3f6ed8a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/TriggerAuditService.java
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handle Bookkeeper trigger audit related http request.
+ * The PUT method will force trigger the audit by resetting the lostBookieRecoveryDelay.
+ */
+public class TriggerAuditService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(TriggerAuditService.class);
+
+    protected ServerConfiguration conf;
+    protected BookKeeperAdmin bka;
+
+    public TriggerAuditService(ServerConfiguration conf, BookKeeperAdmin bka) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.bka = bka;
+    }
+
+    /*
+     * Force trigger the Audit by resetting the lostBookieRecoveryDelay.
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+
+        if (HttpServer.Method.PUT == request.getMethod()) {
+            try {
+                bka.triggerAudit();
+            } catch (Exception e) {
+                LOG.error("Meet Exception: ", e);
+                response.setCode(HttpServer.StatusCode.NOT_FOUND);
+                response.setBody("Exception when do operation." + e.getMessage());
+                return response;
+            }
+
+            response.setCode(HttpServer.StatusCode.OK);
+            response.setBody("Success trigger audit.");
+            LOG.debug("response body:" + response.getBody());
+            return response;
+        } else {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be PUT method");
+            return response;
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/WhoIsAuditorService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/WhoIsAuditorService.java
new file mode 100644
index 0000000..2e4837c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/WhoIsAuditorService.java
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.http;
+
+import com.google.common.base.Preconditions;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.replication.AuditorElector;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpEndpointService that handle Bookkeeper who is auditor related http request.
+ *
+ * The GET method will get the auditor bookie address
+ */
+public class WhoIsAuditorService implements HttpEndpointService {
+
+    static final Logger LOG = LoggerFactory.getLogger(WhoIsAuditorService.class);
+
+    protected ServerConfiguration conf;
+    protected ZooKeeper zk;
+
+    public WhoIsAuditorService(ServerConfiguration conf, ZooKeeper zk) {
+        Preconditions.checkNotNull(conf);
+        this.conf = conf;
+        this.zk = zk;
+    }
+
+    /*
+     * Print the node which holds the auditor lock.
+     */
+    @Override
+    public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
+        HttpServiceResponse response = new HttpServiceResponse();
+
+        if (HttpServer.Method.GET == request.getMethod()) {
+            BookieSocketAddress bookieId = null;
+            try {
+                bookieId = AuditorElector.getCurrentAuditor(conf, zk);
+
+                if (bookieId == null) {
+                    response.setCode(HttpServer.StatusCode.NOT_FOUND);
+                    response.setBody("No auditor elected");
+                    return response;
+                }
+            } catch (Exception e) {
+                LOG.error("Meet Exception: ", e);
+                response.setCode(HttpServer.StatusCode.NOT_FOUND);
+                response.setBody("Exception when get." + e.getMessage());
+                return response;
+            }
+
+            response.setCode(HttpServer.StatusCode.OK);
+            response.setBody("Auditor: "
+                + bookieId.getSocketAddress().getAddress().getCanonicalHostName() + "/"
+                + bookieId.getSocketAddress().getAddress().getHostAddress() + ":"
+                + bookieId.getSocketAddress().getPort());
+            LOG.debug("response body:" + response.getBody());
+            return response;
+        } else {
+            response.setCode(HttpServer.StatusCode.NOT_FOUND);
+            response.setBody("Not found method. Should be GET method");
+            return response;
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index a38556a..21cde18 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -32,7 +32,7 @@ import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.ExitCode;
 import org.apache.bookkeeper.bookie.ReadOnlyBookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.http.BKServiceProvider;
+import org.apache.bookkeeper.http.BKHttpServiceProvider;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.HttpServerLoader;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -107,7 +107,7 @@ public class BookieServer {
                 new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE));
     }
 
-    public void start() throws IOException, UnavailableException, InterruptedException {
+    public void start() throws IOException, UnavailableException, InterruptedException, KeeperException {
         this.bookie.start();
         // fail fast, when bookie startup is not successful
         if (!this.bookie.isRunning()) {
@@ -115,7 +115,7 @@ public class BookieServer {
             return;
         }
         if (conf.isHttpServerEnabled()) {
-            BKServiceProvider serviceProvider = new BKServiceProvider.Builder()
+            BKHttpServiceProvider serviceProvider = new BKHttpServiceProvider.Builder()
                 .setBookieServer(this)
                 .setServerConfiguration(conf)
                 .build();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index 96f4091..9c3c019 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -32,7 +32,7 @@ import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
 import org.apache.bookkeeper.bookie.ExitCode;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.http.BKServiceProvider;
+import org.apache.bookkeeper.http.BKHttpServiceProvider;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.HttpServerLoader;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -302,7 +302,7 @@ public class AutoRecoveryMain {
             HttpServerLoader.loadHttpServer(conf);
             final HttpServer httpServer = HttpServerLoader.get();
             if (conf.isHttpServerEnabled() && httpServer != null) {
-                BKServiceProvider serviceProvider = new BKServiceProvider.Builder()
+                BKHttpServiceProvider serviceProvider = new BKHttpServiceProvider.Builder()
                     .setAutoRecovery(autoRecoveryMain)
                     .setServerConfiguration(conf)
                     .build();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index e4ac80c..99a93ae 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -32,7 +32,7 @@ import org.apache.bookkeeper.common.component.ComponentStarter;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.http.BKServiceProvider;
+import org.apache.bookkeeper.http.BKHttpServiceProvider;
 import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
 import org.apache.bookkeeper.server.service.AutoRecoveryService;
@@ -300,7 +300,7 @@ public class Main {
 
         // 4. build http service
         if (conf.getServerConf().isHttpServerEnabled()) {
-            BKServiceProvider provider = new BKServiceProvider.Builder()
+            BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder()
                 .setBookieServer(bookieService.getServer())
                 .setServerConfiguration(conf.getServerConf())
                 .build();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
index 13a77e8..7b43122 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * A {@link ServerLifecycleComponent} that starts the core bookie server.
@@ -49,7 +50,7 @@ public class BookieService extends ServerLifecycleComponent {
     protected void doStart() {
         try {
             this.server.start();
-        } catch (IOException | UnavailableException | InterruptedException e) {
+        } catch (IOException | UnavailableException | InterruptedException | KeeperException e) {
             throw new RuntimeException("Failed to start bookie server", e);
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/HttpService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/HttpService.java
index 78f97fe..3f81377 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/HttpService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/HttpService.java
@@ -21,7 +21,7 @@ package org.apache.bookkeeper.server.service;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.IOException;
-import org.apache.bookkeeper.http.BKServiceProvider;
+import org.apache.bookkeeper.http.BKHttpServiceProvider;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.HttpServerLoader;
 import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
@@ -37,7 +37,7 @@ public class HttpService extends ServerLifecycleComponent {
 
     private HttpServer server;
 
-    public HttpService(BKServiceProvider provider,
+    public HttpService(BKHttpServiceProvider provider,
                        BookieConfiguration conf,
                        StatsLogger statsLogger) {
         super(NAME, conf, statsLogger);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index cf4fb56..c2c8ad9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -24,8 +24,6 @@ import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
-import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -139,7 +137,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
      * @return List of LedgerHandles for each of the ledgers created
      */
     private List<LedgerHandle> createLedgers(int numLedgers)
-            throws BKException, IOException, InterruptedException
+      throws BKException, IOException, InterruptedException
     {
         return createLedgers(numLedgers, 3, 2);
     }
@@ -154,18 +152,18 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
      * @return List of LedgerHandles for each of the ledgers created
      */
     private List<LedgerHandle> createLedgers(int numLedgers, int ensemble, int quorum)
-            throws BKException, IOException,
-        InterruptedException {
+      throws BKException, IOException,
+      InterruptedException {
         List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
         for (int i = 0; i < numLedgers; i++) {
             lhs.add(bkc.createLedger(ensemble, quorum,
-                                     digestType, baseClientConf.getBookieRecoveryPasswd()));
+              digestType, baseClientConf.getBookieRecoveryPasswd()));
         }
         return lhs;
     }
 
     private List<LedgerHandle> openLedgers(List<LedgerHandle> oldLhs)
-            throws Exception {
+      throws Exception {
         List<LedgerHandle> newLhs = new ArrayList<LedgerHandle>();
         for (LedgerHandle oldLh : oldLhs) {
             newLhs.add(bkc.openLedger(oldLh.getId(), digestType, baseClientConf.getBookieRecoveryPasswd()));
@@ -187,7 +185,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
      */
     private void writeEntriestoLedgers(int numEntries, long startEntryId,
                                        List<LedgerHandle> lhs)
-        throws BKException, InterruptedException {
+      throws BKException, InterruptedException {
         for (LedgerHandle lh : lhs) {
             for (int i = 0; i < numEntries; i++) {
                 lh.addEntry(("LedgerId: " + lh.getId() + ", EntryId: " + (startEntryId + i)).getBytes());
@@ -214,7 +212,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
      * @throws InterruptedException
      */
     private void verifyRecoveredLedgers(List<LedgerHandle> oldLhs, long startEntryId, long endEntryId) throws BKException,
-        InterruptedException {
+      InterruptedException {
         // Get a set of LedgerHandles for all of the ledgers to verify
         List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
         for (int i = 0; i < oldLhs.size(); i++) {
@@ -227,7 +225,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
             while (entries.hasMoreElements()) {
                 LedgerEntry entry = entries.nextElement();
                 assertTrue(new String(entry.getEntry()).equals("LedgerId: " + entry.getLedgerId() + ", EntryId: "
-                           + entry.getEntryId()));
+                  + entry.getEntryId()));
             }
         }
 
@@ -323,11 +321,11 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Call the async recover bookie method.
         BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
-                initialPort);
+          initialPort);
         BookieSocketAddress bookieDest = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
-                newBookiePort);
+          newBookiePort);
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
-                 + bookieDest + ")");
+          + bookieDest + ")");
         // Initiate the sync object
         sync.value = false;
         bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
@@ -381,10 +379,10 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Call the async recover bookie method.
         BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
-                initialPort);
+          initialPort);
         BookieSocketAddress bookieDest = null;
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc
-                 + ") and replicate it to a random available one");
+          + ") and replicate it to a random available one");
         // Initiate the sync object
         sync.value = false;
         bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
@@ -435,11 +433,11 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Call the sync recover bookie method.
         BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
-                initialPort);
+          initialPort);
         BookieSocketAddress bookieDest = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
-                newBookiePort);
+          newBookiePort);
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
-                 + bookieDest + ")");
+          + bookieDest + ")");
         bkAdmin.recoverBookieData(bookieSrc, bookieDest);
 
         // Verify the recovered ledger entries are okay.
@@ -483,10 +481,10 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Call the sync recover bookie method.
         BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
-                initialPort);
+          initialPort);
         BookieSocketAddress bookieDest = null;
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc
-                 + ") and replicate it to a random available one");
+          + ") and replicate it to a random available one");
         bkAdmin.recoverBookieData(bookieSrc, bookieDest);
 
         // Verify the recovered ledger entries are okay.
@@ -530,7 +528,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         HashMap<Long, Long> ranges = new HashMap<Long, Long>();
         ArrayList<Long> keyList = Collections.list(
-                Collections.enumeration(ensembles.keySet()));
+          Collections.enumeration(ensembles.keySet()));
         Collections.sort(keyList);
         for (int i = 0; i < keyList.size() - 1; i++) {
             ranges.put(keyList.get(i), keyList.get(i+1));
@@ -554,10 +552,10 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
             long numSuccess = cb.await();
             if (numSuccess < expectedSuccess) {
                 LOG.warn("Fragment not fully replicated ledgerId = " + lh.getId()
-                         + " startEntryId = " + startEntryId
-                         + " endEntryId = " + endEntryId
-                         + " expectedSuccess = " + expectedSuccess
-                         + " gotSuccess = " + numSuccess);
+                  + " startEntryId = " + startEntryId
+                  + " endEntryId = " + endEntryId
+                  + " expectedSuccess = " + expectedSuccess
+                  + " gotSuccess = " + numSuccess);
                 return false;
             }
         }
@@ -612,7 +610,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
                 for (BookieSocketAddress addr : e.getValue()) {
                     if (set.contains(addr)) {
                         LOG.error("Dupe " + addr + " found in ensemble for fragment " + fragment
-                                + " of ledger " + lh.getId());
+                          + " of ledger " + lh.getId());
                         numDupes++;
                     }
                     set.add(addr);
@@ -639,7 +637,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Shutdown last bookie server in last ensemble
         ArrayList<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
-                                                       .entrySet().iterator().next().getValue();
+          .entrySet().iterator().next().getValue();
         BookieSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
         killBookie(bookieToKill);
 
@@ -648,7 +646,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         BookieSocketAddress bookieDest = null;
         LOG.info("Now recover the data on the killed bookie (" + bookieToKill
-               + ") and replicate it to a random available one");
+          + ") and replicate it to a random available one");
 
         bkAdmin.recoverBookieData(bookieToKill, bookieDest);
         for (LedgerHandle lh : lhs) {
@@ -669,7 +667,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Shutdown the first bookie server
         ArrayList<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
-                                                       .entrySet().iterator().next().getValue();
+          .entrySet().iterator().next().getValue();
         BookieSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
         killBookie(bookieToKill);
 
@@ -678,7 +676,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         BookieSocketAddress bookieDest = null;
         LOG.info("Now recover the data on the killed bookie (" + bookieToKill
-               + ") and replicate it to a random available one");
+          + ") and replicate it to a random available one");
 
         bkAdmin.recoverBookieData(bookieToKill, bookieDest);
 
@@ -706,7 +704,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Shutdown the first bookie server
         ArrayList<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
-                                                       .entrySet().iterator().next().getValue();
+          .entrySet().iterator().next().getValue();
         // removed bookie
         BookieSocketAddress bookieToKill = lastEnsemble.get(0);
         killBookie(bookieToKill);
@@ -749,7 +747,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         for (LedgerHandle newLh : newLhs) {
             // first ensemble should contains bookieToKill2 and not contain bookieToKill
             Map.Entry<Long, ArrayList<BookieSocketAddress>> entry =
-                newLh.getLedgerMetadata().getEnsembles().entrySet().iterator().next();
+              newLh.getLedgerMetadata().getEnsembles().entrySet().iterator().next();
             assertFalse(entry.getValue().contains(bookieToKill));
             assertTrue(entry.getValue().contains(bookieToKill2));
         }
@@ -774,9 +772,9 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Call the async recover bookie method.
         BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
-                initialPort);
+          initialPort);
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc
-                 + ") and replicate it to a random available one");
+          + ") and replicate it to a random available one");
         // Initiate the sync object
         sync.value = false;
         try {
@@ -815,7 +813,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Call the async recover bookie method.
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc
-                 + ") and replicate it to a random available one");
+          + ") and replicate it to a random available one");
         // Initiate the sync object
         sync.value = false;
         bkAdmin.recoverBookieData(bookieSrc, null);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/http/TestHttpService.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/http/TestHttpService.java
index 2001f93..7a49b30 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/http/TestHttpService.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/http/TestHttpService.java
@@ -20,50 +20,735 @@
  */
 package org.apache.bookkeeper.http;
 
-import java.util.Map;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
-import org.apache.bookkeeper.http.service.Service;
-import org.apache.bookkeeper.http.service.ServiceResponse;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandleAdapter;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.http.service.HttpEndpointService;
+import org.apache.bookkeeper.http.service.HttpServiceRequest;
+import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.metastore.InMemoryMetaStore;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestCallbacks;
 import org.apache.bookkeeper.util.JsonUtil;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Before;
 import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestHttpService extends BookKeeperClusterTestCase {
 
-    private BKServiceProvider serviceProvider;
+    static final Logger LOG = LoggerFactory.getLogger(TestHttpService.class);
+
+    private BKHttpServiceProvider bkHttpServiceProvider;
+    private static final int numberOfBookies = 6;
 
     public TestHttpService() {
-        super(0);
-        this.serviceProvider = new BKServiceProvider.Builder()
-            .setServerConfiguration(baseConf)
-            .build();
+        super(numberOfBookies);
+        try {
+            File tmpDir = createTempDir("bookie_http", "test");
+            baseConf.setJournalDirName(tmpDir.getPath())
+              .setLedgerDirNames(
+                new String[]{tmpDir.getPath()});
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        this.bkHttpServiceProvider = new BKHttpServiceProvider.Builder()
+          .setServerConfiguration(baseConf)
+          .build();
     }
 
     @Test
     public void testHeartbeatService() throws Exception {
         // test heartbeat service
-        Service heartbeatService = serviceProvider.provideHeartbeatService();
-        ServiceResponse response = heartbeatService.handle(null);
+        HttpEndpointService heartbeatService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.HEARTBEAT);
+        HttpServiceResponse response = heartbeatService.handle(null);
         assertEquals(HttpServer.StatusCode.OK.getValue(), response.getStatusCode());
         assertEquals("OK\n", response.getBody());
     }
 
     @Test
-    public void testConfigService() throws Exception {
+    public void testConfigServiceGet() throws Exception {
+        try {
+            // test config service
+            String testProperty = "TEST_PROPERTY";
+            String testValue = "TEST_VALUE";
+            baseConf.setProperty(testProperty, testValue);
+            HttpEndpointService configService = bkHttpServiceProvider
+              .provideHttpEndpointService(HttpServer.ApiType.SERVER_CONFIG);
+            HttpServiceRequest getRequest = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+            HttpServiceResponse response = configService.handle(getRequest);
+            Map configMap = JsonUtil.fromJson(
+              response.getBody(),
+              Map.class
+            );
+            assertEquals(HttpServer.StatusCode.OK.getValue(), response.getStatusCode());
+            assertEquals(testValue, configMap.get(testProperty));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testConfigServicePut() throws Exception {
         // test config service
-        String testProperty = "TEST_PROPERTY";
-        String testValue = "TEST_VALUE";
-        baseConf.setProperty(testProperty, testValue);
-        Service configService = serviceProvider.provideConfigurationService();
-        ServiceResponse response = configService.handle(null);
+        HttpEndpointService configService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.SERVER_CONFIG);
+        // properties to be set
+        String putBody = "{\"TEST_PROPERTY1\": \"TEST_VALUE1\", \"TEST_PROPERTY2\": 2,  \"TEST_PROPERTY3\": true }";
+
+        // null body, should return NOT_FOUND
+        HttpServiceRequest putRequest1 = new HttpServiceRequest(null, HttpServer.Method.PUT, null);
+        HttpServiceResponse putResponse1 = configService.handle(putRequest1);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), putResponse1.getStatusCode());
+
+        // Method DELETE, should return NOT_FOUND
+        HttpServiceRequest putRequest2 = new HttpServiceRequest(putBody, HttpServer.Method.DELETE, null);
+        HttpServiceResponse putResponse2 = configService.handle(putRequest2);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), putResponse2.getStatusCode());
+
+        // Normal PUT, should succeed, then verify using the GET method
+        HttpServiceRequest putRequest3 = new HttpServiceRequest(putBody, HttpServer.Method.PUT, null);
+        HttpServiceResponse putResponse3 = configService.handle(putRequest3);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), putResponse3.getStatusCode());
+
+        // Get all the config
+        HttpServiceRequest getRequest = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response = configService.handle(getRequest);
         Map configMap = JsonUtil.fromJson(
-            response.getBody(),
-            Map.class
+          response.getBody(),
+          Map.class
         );
+
+        // verify response code
         assertEquals(HttpServer.StatusCode.OK.getValue(), response.getStatusCode());
-        assertEquals(testValue, configMap.get(testProperty));
+        // verify response body
+        assertEquals("TEST_VALUE1", configMap.get("TEST_PROPERTY1"));
+        assertEquals("2", configMap.get("TEST_PROPERTY2"));
+        assertEquals("true", configMap.get("TEST_PROPERTY3"));
+    }
+
+    @Test
+    public void testListBookiesService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        HttpEndpointService listBookiesService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.LIST_BOOKIES);
+
+        //1,  null parameters, should print rw bookies, without hostname
+        HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response1 = listBookiesService.handle(request1);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response1.getStatusCode());
+        // get response, expect numberOfBookies bookies, and without hostname
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody = JsonUtil.fromJson(response1.getBody(), HashMap.class);
+        assertEquals(numberOfBookies, respBody.size());
+        for (int i = 0; i < numberOfBookies; i++) {
+            assertEquals(true, respBody.containsKey(getBookie(i).toString()));
+            assertEquals(null, respBody.get(getBookie(i).toString()));
+        }
+
+        //2, parameter: type=rw&print_hostnames=true, should print rw bookies with hostname
+        HashMap<String, String> params = Maps.newHashMap();
+        params.put("type", "rw");
+        params.put("print_hostnames", "true");
+        HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.GET, params);
+        HttpServiceResponse response2 = listBookiesService.handle(request2);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response2.getStatusCode());
+        // get response , expected get numberOfBookies bookies, and with hostname
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody2 = JsonUtil.fromJson(response2.getBody(), HashMap.class);
+        assertEquals(numberOfBookies, respBody2.size());
+        for (int i = 0; i < numberOfBookies; i++) {
+            assertEquals(true, respBody2.containsKey(getBookie(i).toString()));
+            assertNotNull(respBody2.get(getBookie(i).toString()));
+        }
+
+        //3, parameter: type=ro&print_hostnames=true, should print ro bookies with hostname
+        // turn bookie 1 into ro, get it
+        setBookieToReadOnly(getBookie(1));
+        Thread.sleep(200);
+        HashMap<String, String> params3 = Maps.newHashMap();
+        params3.put("type", "ro");
+        params3.put("print_hostnames", "true");
+        HttpServiceRequest request3 = new HttpServiceRequest(null, HttpServer.Method.GET, params3);
+        HttpServiceResponse response3 = listBookiesService.handle(request3);
+        //LOG.info("Turn 1 bookies into RO, should get it in this request");
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response3.getStatusCode());
+        // get response , expected get 1 ro bookies, and with hostname
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody3 = JsonUtil.fromJson(response3.getBody(), HashMap.class);
+        assertEquals(1, respBody3.size());
+        assertEquals(true, respBody3.containsKey(getBookie(1).toString()));
+
+        // get other 5 rw bookies.
+        HashMap<String, String> params4 = Maps.newHashMap();
+        params4.put("type", "rw");
+        params4.put("print_hostnames", "true");
+        HttpServiceRequest request4 = new HttpServiceRequest(null, HttpServer.Method.GET, params4);
+        HttpServiceResponse response4 = listBookiesService.handle(request4);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response4.getStatusCode());
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody4 = JsonUtil.fromJson(response4.getBody(), HashMap.class);
+        assertEquals(5, respBody4.size());
+        assertEquals(true, respBody4.containsKey(getBookie(2).toString()));
+    }
+
+    /**
+     * create ledgers, then test ListLedgerService
+     */
+    @Test
+    public void testListLedgerService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        int numLedgers = 430;
+        LedgerHandle[] lh = new LedgerHandle[numLedgers];
+        // create ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i] = bkc.createLedger(digestType, "password".getBytes());
+        }
+
+        HttpEndpointService listLedgerService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.LIST_LEDGER);
+
+        //1,  null parameters, should print ledger ids, without metadata
+        HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response1 = listLedgerService.handle(request1);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response1.getStatusCode());
+        // get response , expected get all ledgers, and without metadata
+        @SuppressWarnings("unchecked")
+        LinkedHashMap<String, String> respBody = JsonUtil.fromJson(response1.getBody(), LinkedHashMap.class);
+        assertEquals(numLedgers, respBody.size());
+        for (int i = 0; i < numLedgers; i++) {
+            assertEquals(true, respBody.containsKey(Long.valueOf(lh[i].getId()).toString()));
+            assertEquals(null, respBody.get(Long.valueOf(lh[i].getId()).toString()));
+        }
+
+        //2, parameter: print_metadata=true, should print ledger ids, with metadata
+        HashMap<String, String> params = Maps.newHashMap();
+        params.put("print_metadata", "true");
+        HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.GET, params);
+        HttpServiceResponse response2 = listLedgerService.handle(request2);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response2.getStatusCode());
+        // get response, expected get all ledgers, and with metadata
+        @SuppressWarnings("unchecked")
+        LinkedHashMap<String, String> respBody2 = JsonUtil.fromJson(response2.getBody(), LinkedHashMap.class);
+        assertEquals(numLedgers, respBody2.size());
+        for (int i = 0; i < numLedgers; i++) {
+            assertEquals(true, respBody2.containsKey(Long.valueOf(lh[i].getId()).toString()));
+            assertNotNull(respBody2.get(Long.valueOf(lh[i].getId()).toString()));
+        }
+
+        //3, parameter: print_metadata=true&page=5,
+        // since each page contains 100 ledgers, page=5 should print ledger ids, with metadata for(400--430)
+        HashMap<String, String> params3 = Maps.newHashMap();
+        params3.put("print_metadata", "true");
+        params3.put("page", "5");
+
+        HttpServiceRequest request3 = new HttpServiceRequest(null, HttpServer.Method.GET, params3);
+        HttpServiceResponse response3 = listLedgerService.handle(request3);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response3.getStatusCode());
+        // get response, expect 31 ledgers on this page, and with metadata
+        @SuppressWarnings("unchecked")
+        LinkedHashMap<String, String> respBody3 = JsonUtil.fromJson(response3.getBody(), LinkedHashMap.class);
+        assertEquals(31, respBody3.size());
+        for (int i = 400; i < 430; i++) {
+            assertEquals(true, respBody3.containsKey(Long.valueOf(lh[i].getId()).toString()));
+            assertNotNull(respBody3.get(Long.valueOf(lh[i].getId()).toString()));
+        }
+    }
+
+    /**
+     * create ledgers, then test Delete Ledger service
+     */
+    @Test
+    public void testDeleteLedgerService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        int numLedgers = 4;
+        int numMsgs = 100;
+        LedgerHandle[] lh = new LedgerHandle[numLedgers];
+        // create ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i] = bkc.createLedger(digestType, "".getBytes());
+        }
+        String content = "Apache BookKeeper is cool!";
+        // add entries
+        for (int i = 0; i < numMsgs; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                lh[j].addEntry(content.getBytes());
+            }
+        }
+        // close ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i].close();
+        }
+
+        HttpEndpointService deleteLedgerService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.DELETE_LEDGER);
+
+        //1,  null parameters of GET, should return NOT_FOUND
+        HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response1 = deleteLedgerService.handle(request1);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
+
+        //2,  null parameters of DELETE, should return NOT_FOUND
+        HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.DELETE, null);
+        HttpServiceResponse response2 = deleteLedgerService.handle(request2);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response2.getStatusCode());
+
+        //3,  delete first ledger, should return OK, and should only get 3 ledgers after delete.
+        HashMap<String, String> params = Maps.newHashMap();
+        Long ledgerId = Long.valueOf(lh[0].getId());
+        params.put("ledger_id", ledgerId.toString());
+        HttpServiceRequest request3 = new HttpServiceRequest(null, HttpServer.Method.DELETE, params);
+        HttpServiceResponse response3 = deleteLedgerService.handle(request3);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response3.getStatusCode());
+        // use the list ledger service to verify that 3 ledgers are left
+        HttpEndpointService listLedgerService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.LIST_LEDGER);
+        HttpServiceRequest request4 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response4 = listLedgerService.handle(request4);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response4.getStatusCode());
+        // get response , expected get 3 ledgers
+        @SuppressWarnings("unchecked")
+        LinkedHashMap<String, String> respBody = JsonUtil.fromJson(response4.getBody(), LinkedHashMap.class);
+        assertEquals(3, respBody.size());
+    }
+
+    @Test
+    public void testGetLedgerMetaService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        int numLedgers = 4;
+        int numMsgs = 100;
+        LedgerHandle[] lh = new LedgerHandle[numLedgers];
+        // create ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i] = bkc.createLedger(digestType, "password".getBytes());
+        }
+        String content = "Apache BookKeeper is cool!";
+        // add entries
+        for (int i = 0; i < numMsgs; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                lh[j].addEntry(content.getBytes());
+            }
+        }
+        // close ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i].close();
+        }
+        HttpEndpointService getLedgerMetaService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.GET_LEDGER_META);
+
+        //1,  null parameters of GET, should return NOT_FOUND
+        HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response1 = getLedgerMetaService.handle(request1);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
+
+        //2,  parameters for GET first ledger, should return OK, and contains metadata
+        HashMap<String, String> params = Maps.newHashMap();
+        Long ledgerId = Long.valueOf(lh[0].getId());
+        params.put("ledger_id", ledgerId.toString());
+        HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.GET, params);
+        HttpServiceResponse response2 = getLedgerMetaService.handle(request2);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response2.getStatusCode());
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody = JsonUtil.fromJson(response2.getBody(), HashMap.class);
+        assertEquals(1, respBody.size());
+        // verify LedgerMetadata content is equal
+        assertTrue(respBody.get(ledgerId.toString()).toString()
+          .equals(new String(lh[0].getLedgerMetadata().serialize())));
+    }
+
+    @Test
+    public void testReadLedgerEntryService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        int numLedgers = 1;
+        int numMsgs = 100;
+        LedgerHandle[] lh = new LedgerHandle[numLedgers];
+        // create ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i] = bkc.createLedger(digestType, "".getBytes());
+        }
+        String content = "Apache BookKeeper is cool!";
+        // add entries
+        for (int i = 0; i < numMsgs; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                lh[j].addEntry(content.getBytes());
+            }
+        }
+        // close ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i].close();
+        }
+        HttpEndpointService readLedgerEntryService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.READ_LEDGER_ENTRY);
+
+        //1,  null parameters of GET, should return NOT_FOUND
+        HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response1 = readLedgerEntryService.handle(request1);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
+
+        //2,  parameters for GET first ledger, should return OK
+        // no start/end entry id, so return all the 100 entries.
+        HashMap<String, String> params = Maps.newHashMap();
+        Long ledgerId = Long.valueOf(lh[0].getId());
+        params.put("ledger_id", ledgerId.toString());
+        HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.GET, params);
+        HttpServiceResponse response2 = readLedgerEntryService.handle(request2);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response2.getStatusCode());
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody = JsonUtil.fromJson(response2.getBody(), HashMap.class);
+        // default return all the entries. so should have 100 entries return
+        assertEquals(100, respBody.size());
+
+        //3,  parameters for GET first ledger with an entry range, should return OK
+        // start_entry_id=1, end_entry_id=77, so return 77 entries.
+        HashMap<String, String> params3 = Maps.newHashMap();
+        params3.put("ledger_id", ledgerId.toString());
+        params3.put("start_entry_id", "1");
+        params3.put("end_entry_id", "77");
+        HttpServiceRequest request3 = new HttpServiceRequest(null, HttpServer.Method.GET, params3);
+        HttpServiceResponse response3 = readLedgerEntryService.handle(request3);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response3.getStatusCode());
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody3 = JsonUtil.fromJson(response3.getBody(), HashMap.class);
+        assertEquals(77, respBody3.size());
+        // Verify the entry content that we got.
+        assertTrue(respBody3.get("17").equals(content));
+    }
+
+    @Test
+    public void testListBookieInfoService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        HttpEndpointService listBookieInfoService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.LIST_BOOKIE_INFO);
+
+        //1,  PUT method, should return NOT_FOUND
+        HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.PUT, null);
+        HttpServiceResponse response1 = listBookieInfoService.handle(request1);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
+
+        //2, GET method, expected get 6 bookies info and the cluster total info
+        HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response2 = listBookieInfoService.handle(request2);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response2.getStatusCode());
+        @SuppressWarnings("unchecked")
+        LinkedHashMap<String, String> respBody = JsonUtil.fromJson(response2.getBody(), LinkedHashMap.class);
+        assertEquals(numberOfBookies + 1, respBody.size());
+        for (int i = 0; i < numberOfBookies; i++) {
+            assertEquals(true, respBody.containsKey(getBookie(i).toString()));
+        }
+    }
+
+    @Test
+    public void testGetLastLogMarkService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        int numLedgers = 4;
+        int numMsgs = 100;
+        LedgerHandle[] lh = new LedgerHandle[numLedgers];
+        // create ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i] = bkc.createLedger(digestType, "".getBytes());
+        }
+        String content = "Apache BookKeeper is cool!";
+        // add entries
+        for (int i = 0; i < numMsgs; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                lh[j].addEntry(content.getBytes());
+            }
+        }
+        // close ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i].close();
+        }
+
+        HttpEndpointService getLastLogMarkService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.LAST_LOG_MARK);
+
+        //1,  null parameters of PUT, should fail
+        HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.PUT, null);
+        HttpServiceResponse response1 = getLastLogMarkService.handle(request1);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
+
+        //2,  null parameters of GET, should return 1 file
+        HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response2 = getLastLogMarkService.handle(request2);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response2.getStatusCode());
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody = JsonUtil.fromJson(response2.getBody(), HashMap.class);
+        assertEquals(1, respBody.size());
+    }
+
+    @Test
+    public void testListDiskFilesService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        int numLedgers = 4;
+        int numMsgs = 100;
+        LedgerHandle[] lh = new LedgerHandle[numLedgers];
+        // create ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i] = bkc.createLedger(digestType, "".getBytes());
+        }
+        String content = "Apache BookKeeper is cool!";
+        // add entries
+        for (int i = 0; i < numMsgs; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                lh[j].addEntry(content.getBytes());
+            }
+        }
+        // close ledgers
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i].close();
+        }
+
+        HttpEndpointService listDiskFileService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.LIST_DISK_FILE);
+
+        //1,  null parameters of GET, should return 3 kind of files: journal, entrylog and index files
+        HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response1 = listDiskFileService.handle(request1);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response1.getStatusCode());
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody = JsonUtil.fromJson(response1.getBody(), HashMap.class);
+        assertEquals(3, respBody.size());
+
+        //2,  parameters of GET journal file, should return journal files
+        HashMap<String, String> params = Maps.newHashMap();
+        params.put("file_type", "journal");
+        HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.GET, params);
+        HttpServiceResponse response2 = listDiskFileService.handle(request2);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response2.getStatusCode());
+        @SuppressWarnings("unchecked")
+        HashMap<String, String> respBody2 = JsonUtil.fromJson(response2.getBody(), HashMap.class);
+        assertEquals(1, respBody2.size());
+    }
+
+    @Test
+    public void testRecoveryBookieService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+
+        HttpEndpointService recoveryBookieService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.RECOVERY_BOOKIE);
+
+        //1,  null body of GET, should return error
+        HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+        HttpServiceResponse response1 = recoveryBookieService.handle(request1);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
+
+        //2,  null body of PUT, should return error
+        HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.PUT, null);
+        HttpServiceResponse response2 = recoveryBookieService.handle(request2);
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response2.getStatusCode());
+
+        //3, body with bookie_src, bookie_dest and delete_cookie of PUT, should succeed.
+        String bookieSrc = getBookie(0).toString();
+        String bookieDest = getBookie(1).toString();
+        String putBody = "{\"bookie_src\": [ \"" + bookieSrc + "\" ],"
+          + "\"bookie_dest\": [ \"" + bookieDest + "\" ],"
+          + "\"delete_cookie\": true }";
+        HttpServiceRequest request3 = new HttpServiceRequest(putBody, HttpServer.Method.PUT, null);
+        HttpServiceResponse response3 = recoveryBookieService.handle(request3);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response3.getStatusCode());
+
+        //4, body with bookie_src, and delete_cookie of PUT, should succeed.
+        String putBody4 = "{\"bookie_src\": [ \"" + bookieSrc + "\" ],"
+          + "\"delete_cookie\": false }";
+        HttpServiceRequest request4 = new HttpServiceRequest(putBody4, HttpServer.Method.PUT, null);
+        HttpServiceResponse response4 = recoveryBookieService.handle(request4);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response4.getStatusCode());
+
+        //5, body with bookie_src of PUT, should succeed.
+        String putBody5 = "{\"bookie_src\": [ \"" + bookieSrc + "\" ] }";
+        HttpServiceRequest request5 = new HttpServiceRequest(putBody5, HttpServer.Method.PUT, null);
+        HttpServiceResponse response5 = recoveryBookieService.handle(request5);
+        assertEquals(HttpServer.StatusCode.OK.getValue(), response5.getStatusCode());
+    }
+
+    ZooKeeper auditorZookeeper;
+    AuditorElector auditorElector;
+    private void startAuditorElector() throws Exception {
+        auditorZookeeper = ZooKeeperClient.newBuilder()
+          .connectString(zkUtil.getZooKeeperConnectString())
+          .sessionTimeoutMs(10000)
+          .build();
+        String addr = bs.get(0).getLocalAddress().toString();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setAuditorPeriodicBookieCheckInterval(1);
+        auditorElector = new AuditorElector(addr, conf,
+          auditorZookeeper);
+        auditorElector.start();
+    }
+
+    /**
+     * Shuts down the auditor elector and closes the ZooKeeper client it was given.
+     *
+     * The client is closed in a finally block so that a failing elector shutdown
+     * does not leave the ZooKeeper session open.
+     */
+    private void stopAuditorElector() throws Exception {
+        try {
+            auditorElector.shutdown();
+        } finally {
+            auditorZookeeper.close();
+        }
+    }
+
+    /**
+     * Exercises the TRIGGER_AUDIT endpoint: GET is expected to be rejected with NOT_FOUND,
+     * PUT is expected to return OK after a bookie has been killed.
+     */
+    @Test
+    public void testTriggerAuditService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        startAuditorElector();
+        // Always stop the elector, even when an assertion fails, so that it and its
+        // ZooKeeper client are not left running for the remaining tests.
+        try {
+            HttpEndpointService triggerAuditService = bkHttpServiceProvider
+              .provideHttpEndpointService(HttpServer.ApiType.TRIGGER_AUDIT);
+
+            //1,  GET, should return error
+            HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+            HttpServiceResponse response1 = triggerAuditService.handle(request1);
+            assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
+
+            //2,  PUT, should succeed.
+            killBookie(1);
+            // Short pause after killing the bookie before the audit is triggered.
+            Thread.sleep(500);
+            HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.PUT, null);
+            HttpServiceResponse response2 = triggerAuditService.handle(request2);
+            assertEquals(HttpServer.StatusCode.OK.getValue(), response2.getStatusCode());
+        } finally {
+            stopAuditorElector();
+        }
+    }
+
+    /**
+     * Exercises the WHO_IS_AUDITOR endpoint: GET is expected to return OK while an
+     * auditor elector is running; the response body is logged.
+     */
+    @Test
+    public void testWhoIsAuditorService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        startAuditorElector();
+        // Always stop the elector, even when an assertion fails, so that it and its
+        // ZooKeeper client are not left running for the remaining tests.
+        try {
+            HttpEndpointService whoIsAuditorService = bkHttpServiceProvider
+              .provideHttpEndpointService(HttpServer.ApiType.WHO_IS_AUDITOR);
+
+            //1,  GET, should return success
+            HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+            HttpServiceResponse response1 = whoIsAuditorService.handle(request1);
+            assertEquals(HttpServer.StatusCode.OK.getValue(), response1.getStatusCode());
+            LOG.info(response1.getBody());
+        } finally {
+            stopAuditorElector();
+        }
+    }
+
+    /**
+     * Exercises the LIST_UNDER_REPLICATED_LEDGER endpoint: PUT is expected to be rejected
+     * with NOT_FOUND, GET is expected to return OK after a ledger's metadata has been
+     * rewritten with a replaced ensemble member.
+     */
+    @Test
+    public void testListUnderReplicatedLedgerService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        startAuditorElector();
+        // Always stop the elector, even when an assertion fails, so that it and its
+        // ZooKeeper client are not left running for the remaining tests.
+        try {
+            HttpEndpointService listUnderReplicatedLedgerService = bkHttpServiceProvider
+              .provideHttpEndpointService(HttpServer.ApiType.LIST_UNDER_REPLICATED_LEDGER);
+
+            //1,  PUT, should return error, because only GET is supported.
+            HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.PUT, null);
+            HttpServiceResponse response1 = listUnderReplicatedLedgerService.handle(request1);
+            assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
+
+
+            //2,  GET, should return success.
+            // First get a ledger queued for re-replication, then list it through the API.
+            LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bsConfs.get(0), zkc);
+            LedgerManager ledgerManager = mFactory.newLedgerManager();
+            final LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+
+            LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
+            LedgerMetadata md = LedgerHandleAdapter.getLedgerMetadata(lh);
+            List<BookieSocketAddress> ensemble = md.getEnsembles().get(0L);
+            // Swap the first ensemble member for a made-up address
+            // (presumably not a live bookie, so the ledger looks under-replicated).
+            ensemble.set(0, new BookieSocketAddress("1.1.1.1", 1000));
+
+            TestCallbacks.GenericCallbackFuture<Void> cb = new TestCallbacks.GenericCallbackFuture<Void>();
+            ledgerManager.writeLedgerMetadata(lh.getId(), md, cb);
+            cb.get();
+
+            // Poll up to 10 times, one second apart, until a ledger id other than -1 is returned.
+            // The polled id itself is not asserted on; the loop only gives the auditor time to act.
+            long underReplicatedLedger = -1;
+            for (int i = 0; i < 10; i++) {
+                underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
+                if (underReplicatedLedger != -1) {
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+
+            HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+            HttpServiceResponse response2 = listUnderReplicatedLedgerService.handle(request2);
+            assertEquals(HttpServer.StatusCode.OK.getValue(), response2.getStatusCode());
+        } finally {
+            stopAuditorElector();
+        }
+    }
+
+    /**
+     * Exercises the LOST_BOOKIE_RECOVERY_DELAY endpoint without an auditor elector running:
+     * a PUT without a body and a GET are both expected to yield NOT_FOUND, while a PUT
+     * carrying a "delay_seconds" value is expected to yield OK.
+     */
+    @Test
+    public void testLostBookieRecoveryDelayService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+
+        final HttpEndpointService delayService = bkHttpServiceProvider
+          .provideHttpEndpointService(HttpServer.ApiType.LOST_BOOKIE_RECOVERY_DELAY);
+
+        // Case 1: PUT with no body is an error; the body must contain {"delay_seconds": <delay_seconds>}.
+        final HttpServiceResponse emptyPutResponse =
+          delayService.handle(new HttpServiceRequest(null, HttpServer.Method.PUT, null));
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), emptyPutResponse.getStatusCode());
+
+        // Case 2: GET is expected to hit an exception while reading the delay seconds.
+        final HttpServiceResponse getResponse =
+          delayService.handle(new HttpServiceRequest(null, HttpServer.Method.GET, null));
+        assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), getResponse.getStatusCode());
+
+        // Case 3: PUT with a proper body should succeed.
+        final String delayBody = "{\"delay_seconds\": 17 }";
+        final HttpServiceResponse putResponse =
+          delayService.handle(new HttpServiceRequest(delayBody, HttpServer.Method.PUT, null));
+        assertEquals(HttpServer.StatusCode.OK.getValue(), putResponse.getStatusCode());
+    }
+
+    /**
+     * Exercises the DECOMMISSION endpoint: a PUT without a body and a GET are both expected
+     * to yield NOT_FOUND, while a PUT naming a bookie that has been killed is expected to yield OK.
+     */
+    @Test
+    public void testDecommissionService() throws Exception {
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        startAuditorElector();
+        // Always stop the elector, even when an assertion fails, so that it and its
+        // ZooKeeper client are not left running for the remaining tests.
+        try {
+            HttpEndpointService decommissionService = bkHttpServiceProvider
+              .provideHttpEndpointService(HttpServer.ApiType.DECOMMISSION);
+
+            //1,  PUT with null, should return error, because the body must contain {"bookie_src": <bookie_address>}.
+            HttpServiceRequest request1 = new HttpServiceRequest(null, HttpServer.Method.PUT, null);
+            HttpServiceResponse response1 = decommissionService.handle(request1);
+            assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
+
+            //2,  GET, should fail because GET is not supported
+            HttpServiceRequest request2 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
+            HttpServiceResponse response2 = decommissionService.handle(request2);
+            assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response2.getStatusCode());
+
+            //3, PUT, with body, should succeed.
+            // The request body is built before the bookie is killed, while getBookie(1) still refers to it.
+            String putBody3 = "{\"bookie_src\": \"" + getBookie(1).toString() + "\"}";
+            HttpServiceRequest request3 = new HttpServiceRequest(putBody3, HttpServer.Method.PUT, null);
+            // after the bookie is killed, the request should succeed
+            killBookie(1);
+            HttpServiceResponse response3 = decommissionService.handle(request3);
+            assertEquals(HttpServer.StatusCode.OK.getValue(), response3.getStatusCode());
+        } finally {
+            stopAuditorElector();
+        }
     }
 
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].