You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by gu...@apache.org on 2020/06/29 01:11:19 UTC

[pulsar-manager] branch master updated: Use Pulsar Admin instead of HttpUtil (#286)

This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git


The following commit(s) were added to refs/heads/master by this push:
     new df69426  Use Pulsar Admin instead of HttpUtil (#286)
df69426 is described below

commit df69426972537e102bd055f457ced9b18751f4b2
Author: shustsud <51...@users.noreply.github.com>
AuthorDate: Mon Jun 29 10:11:12 2020 +0900

    Use Pulsar Admin instead of HttpUtil (#286)
    
    Master Issue: <https://github.com/apache/pulsar-manager/issues/249>
    
    ### Motivation
    
    See <https://github.com/apache/pulsar-manager/issues/249>.
    
    ### Modifications
    
    * Replaced the `HttpUtil`-based REST calls with the Pulsar Admin client (fixes <https://github.com/apache/pulsar-manager/issues/249>).
    * Added authentication to the [EnvironmentForward class](https://github.com/apache/pulsar-manager/blob/master/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java).
---
 build.gradle                                       |   7 +-
 gradle.properties                                  |   2 +-
 .../pulsar/manager/PulsarApplicationListener.java  |  46 ++--
 .../manager/controller/BrokerStatsController.java  |  14 -
 .../manager/controller/EnvironmentsController.java |  38 +--
 .../manager/controller/TopicsController.java       |  72 +----
 .../exception/PulsarAdminOperationException.java   |  21 ++
 .../pulsar/manager/service/BrokerStatsService.java |   2 -
 .../pulsar/manager/service/PulsarAdminService.java |  35 +++
 .../pulsar/manager/service/TopicsService.java      |   8 +-
 .../service/impl/BrokerStatsServiceImpl.java       |  47 ++--
 .../manager/service/impl/BrokersServiceImpl.java   |  61 +++--
 .../manager/service/impl/ClustersServiceImpl.java  |  61 +++--
 .../service/impl/EnvironmentCacheServiceImpl.java  |  67 +++--
 .../service/impl/NamespacesServiceImpl.java        |  34 ++-
 .../service/impl/PulsarAdminServiceImpl.java       | 155 +++++++++++
 .../manager/service/impl/TenantsServiceImpl.java   |  90 +++---
 .../manager/service/impl/TopicsServiceImpl.java    | 305 +++++++++++++++------
 .../pulsar/manager/zuul/EnvironmentForward.java    |  28 +-
 src/main/resources/application.properties          |   5 +
 .../dao/BrokerTokensRepositoryImplTest.java        |   8 +-
 .../dao/EnvironmentsRepositoryImplTest.java        |  12 +-
 .../manager/dao/NamespacesRepositoryImplTest.java  |  16 +-
 .../manager/dao/RoleBindingRepositoryImplTest.java |  16 +-
 .../manager/dao/RolesRepositoryImplTest.java       |  24 +-
 .../manager/dao/TenantsRepositoryImplTest.java     |  26 +-
 .../manager/dao/UsersRepositoryImplTest.java       |  20 +-
 .../manager/service/BookiesServiceImplTest.java    |   6 +-
 .../service/BrokerStatsServiceImplTest.java        | 241 ++++++++--------
 .../service/BrokerTokensServiceImplTest.java       |   2 +-
 .../manager/service/BrokersServiceImplTest.java    |  63 ++---
 .../manager/service/ClustersServiceImplTest.java   |  90 +++---
 .../service/EnvironmentCacheServiceImplTest.java   | 143 +++++-----
 .../service/GithubLoginServiceImplTest.java        |  16 +-
 .../manager/service/NamespacesServiceImplTest.java | 115 ++++----
 .../service/PulsarAdminServiceImplTest.java        |  64 +++++
 .../manager/service/PulsarEventImplTest.java       |   8 +-
 .../service/RoleBindingServiceImplTest.java        |  26 +-
 .../manager/service/RolesServiceImplTest.java      |  22 +-
 .../manager/service/TenantsServiceImplTest.java    |  81 +++---
 .../manager/service/TopicsServiceImplTest.java     | 172 ++++++------
 .../manager/service/UsersServiceImplTest.java      |  15 +-
 42 files changed, 1324 insertions(+), 960 deletions(-)

diff --git a/build.gradle b/build.gradle
index a6a1389..b856ed3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -36,6 +36,9 @@ plugins {
 repositories {
     mavenCentral()
     mavenLocal()
+    maven {
+        url "https://yahoo.bintray.com/maven"
+    }
 }
 
 configurations {
@@ -120,6 +123,8 @@ dependencies {
     compile group: 'com.google.code.gson', name: 'gson', version: gsonVersion
     compile group: 'org.apache.pulsar', name: 'pulsar-common', version: pulsarVersion
     compile group: 'org.apache.pulsar', name: 'pulsar-client-admin-original', version: pulsarVersion
+    compile group: 'org.apache.pulsar', name: 'pulsar-client-auth-athenz', version: pulsarVersion
+    compile group: 'org.apache.pulsar', name: 'pulsar-client-auth-sasl', version: pulsarVersion
     compile group: 'io.springfox', name: 'springfox-swagger2', version: swagger2Version
     compile group: 'io.springfox', name: 'springfox-swagger-ui', version: swaggeruiVersion
     compile group: 'org.apache.pulsar', name: 'pulsar-broker', version: brokerVersion
@@ -141,6 +146,6 @@ dependencies {
     compileOnly group: 'org.springframework.boot', name: 'spring-boot-devtools', version: springBootVersion
     testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test', version: springBootVersion
     testCompile group: 'org.mockito', name: 'mockito-core', version: mockitoVersion
-    testCompile group: 'org.powermock', name: 'powermock-api-mockito', version: apiMockitoVersion
+    testCompile group: 'org.powermock', name: 'powermock-api-mockito2', version: apiMockitoVersion
     testCompile group: 'org.powermock', name: 'powermock-module-junit4', version: mockitoJunit4Version
 }
diff --git a/gradle.properties b/gradle.properties
index 5679029..5165d5a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -8,7 +8,7 @@ jsonWebTokenApiVersion=0.10.5
 jsonWebTokenImplVersion=0.10.5
 lombokVersion=1.18.10
 pageHelperVersion=1.2.4
-mockitoVersion=1.10.19
+mockitoVersion=2.8.47
 guavaVersion=21.0
 pulsarVersion=2.5.2
 swagger2Version=2.9.2
diff --git a/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java b/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java
index 0391718..4cc1c40 100644
--- a/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java
+++ b/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java
@@ -14,19 +14,18 @@
 package org.apache.pulsar.manager;
 
 import com.github.pagehelper.Page;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.manager.entity.EnvironmentEntity;
 import org.apache.pulsar.manager.entity.EnvironmentsRepository;
-import org.apache.pulsar.manager.utils.HttpUtil;
+import org.apache.pulsar.manager.service.PulsarAdminService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.stereotype.Component;
 
-import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -39,17 +38,18 @@ public class PulsarApplicationListener implements ApplicationListener<ContextRef
 
     private final EnvironmentsRepository environmentsRepository;
 
+    private final PulsarAdminService pulsarAdminService;
+
     @Value("${default.environment.name}")
     private String defaultEnvironmentName;
 
     @Value("${default.environment.service_url}")
     private String defaultEnvironmentServiceUrl;
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
-
-    public PulsarApplicationListener(EnvironmentsRepository environmentsRepository) {
+    @Autowired
+    public PulsarApplicationListener(EnvironmentsRepository environmentsRepository, PulsarAdminService pulsarAdminService) {
         this.environmentsRepository = environmentsRepository;
+        this.pulsarAdminService = pulsarAdminService;
     }
 
     @Override
@@ -65,26 +65,24 @@ public class PulsarApplicationListener implements ApplicationListener<ContextRef
                     && defaultEnvironmentName.length() > 0
                     && defaultEnvironmentServiceUrl.length() > 0
                     && !environmentEntityOptional.isPresent()) {
-                Map<String, String> header = Maps.newHashMap();
-                if (StringUtils.isNotBlank(pulsarJwtToken)) {
-                    header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-                }
-                String httpTestResult = HttpUtil.doGet(defaultEnvironmentServiceUrl + "/metrics", header);
-                if (httpTestResult != null) {
-                    EnvironmentEntity environmentEntity = new EnvironmentEntity();
-                    environmentEntity.setBroker(defaultEnvironmentServiceUrl);
-                    environmentEntity.setName(defaultEnvironmentName);
-                    environmentsRepository.save(environmentEntity);
-                    log.info("Successfully added a default environment: name = {}, service_url = {}.",
-                            defaultEnvironmentName, defaultEnvironmentServiceUrl);
-                } else {
+                try {
+                    pulsarAdminService.clusters(defaultEnvironmentServiceUrl).getClusters();
+                } catch (PulsarAdminException e) {
+                    log.error("Failed to get clusters list.", e);
                     log.error("Unable to connect default environment {} via {}, " +
-                            "please check if `environment.default.name` " +
-                            "and `environment.default.broker` are set correctly, " +
-                            "environmentDefaultName, environmentDefaultBroker",
+                                    "please check if `environment.default.name` " +
+                                    "and `environment.default.broker` are set correctly, " +
+                                    "environmentDefaultName, environmentDefaultBroker",
                             defaultEnvironmentName, defaultEnvironmentServiceUrl);
                     System.exit(-1);
                 }
+
+                EnvironmentEntity environmentEntity = new EnvironmentEntity();
+                environmentEntity.setBroker(defaultEnvironmentServiceUrl);
+                environmentEntity.setName(defaultEnvironmentName);
+                environmentsRepository.save(environmentEntity);
+                log.info("Successfully added a default environment: name = {}, service_url = {}.",
+                        defaultEnvironmentName, defaultEnvironmentServiceUrl);
             } else {
                 log.warn("The default environment already exists.");
             }
diff --git a/src/main/java/org/apache/pulsar/manager/controller/BrokerStatsController.java b/src/main/java/org/apache/pulsar/manager/controller/BrokerStatsController.java
index 5f06089..68366f6 100644
--- a/src/main/java/org/apache/pulsar/manager/controller/BrokerStatsController.java
+++ b/src/main/java/org/apache/pulsar/manager/controller/BrokerStatsController.java
@@ -64,18 +64,4 @@ public class BrokerStatsController {
         String result = brokerStatsService.forwardBrokerStatsMetrics(broker, requestHost);
         return ResponseEntity.ok(result);
     }
-
-    @ApiOperation(value = "Get the broker stats topics")
-    @ApiResponses({
-            @ApiResponse(code = 200, message = "ok"),
-            @ApiResponse(code = 500, message = "Internal server error")
-    })
-    @RequestMapping(value = "/broker-stats/topics", method =  RequestMethod.GET)
-    public ResponseEntity<String> getBrokerStatsTopics(
-            @RequestParam() String broker) {
-        String requestHost = environmentCacheService.getServiceUrl(request);
-        String result = brokerStatsService.forwardBrokerStatsTopics(broker, requestHost);
-        return ResponseEntity.ok(result);
-    }
-
 }
diff --git a/src/main/java/org/apache/pulsar/manager/controller/EnvironmentsController.java b/src/main/java/org/apache/pulsar/manager/controller/EnvironmentsController.java
index 017dcd3..0b050a0 100644
--- a/src/main/java/org/apache/pulsar/manager/controller/EnvironmentsController.java
+++ b/src/main/java/org/apache/pulsar/manager/controller/EnvironmentsController.java
@@ -14,6 +14,8 @@
 package org.apache.pulsar.manager.controller;
 
 import com.google.common.collect.Maps;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.manager.entity.EnvironmentEntity;
 import org.apache.pulsar.manager.entity.EnvironmentsRepository;
 import org.apache.pulsar.manager.entity.RoleBindingEntity;
@@ -25,18 +27,18 @@ import org.apache.pulsar.manager.entity.TenantsRepository;
 import org.apache.pulsar.manager.entity.UserInfoEntity;
 import org.apache.pulsar.manager.entity.UsersRepository;
 import org.apache.pulsar.manager.service.EnvironmentCacheService;
+import org.apache.pulsar.manager.service.PulsarAdminService;
 import org.apache.pulsar.manager.service.RolesService;
-import org.apache.pulsar.manager.utils.HttpUtil;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.manager.utils.ResourceType;
 import org.hibernate.validator.constraints.Range;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.http.ResponseEntity;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -61,8 +63,7 @@ import java.util.Optional;
 @RestController
 public class EnvironmentsController {
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
+    private static final Logger log = LoggerFactory.getLogger(EnvironmentsController.class);
 
     private final EnvironmentsRepository environmentsRepository;
 
@@ -78,8 +79,11 @@ public class EnvironmentsController {
 
     private final RolesService rolesService;
 
+    private final PulsarAdminService pulsarAdminService;
+
     private final HttpServletRequest request;
 
+    @Autowired
     public EnvironmentsController(
             HttpServletRequest request,
             EnvironmentsRepository environmentsRepository,
@@ -88,7 +92,8 @@ public class EnvironmentsController {
             TenantsRepository tenantsRepository,
             RolesRepository rolesRepository,
             RoleBindingRepository roleBindingRepository,
-            RolesService rolesService) {
+            RolesService rolesService,
+            PulsarAdminService pulsarAdminService) {
         this.environmentsRepository = environmentsRepository;
         this.environmentCacheService = environmentCacheService;
         this.request = request;
@@ -97,6 +102,7 @@ public class EnvironmentsController {
         this.rolesRepository = rolesRepository;
         this.roleBindingRepository = roleBindingRepository;
         this.rolesService = rolesService;
+        this.pulsarAdminService = pulsarAdminService;
     }
 
     @ApiOperation(value = "Get the list of existing environments, support paging, the default is 10 per page")
@@ -183,12 +189,10 @@ public class EnvironmentsController {
             result.put("error", "Environment is exist");
             return ResponseEntity.ok(result);
         }
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        String httpTestResult = HttpUtil.doGet(environmentEntity.getBroker() + "/metrics", header);
-        if (httpTestResult == null) {
+        try {
+            pulsarAdminService.clusters(environmentEntity.getBroker()).getClusters();
+        } catch (PulsarAdminException e) {
+            log.error("Failed to get clusters list.", e);
             result.put("error", "This environment is error. Please check it");
             return ResponseEntity.ok(result);
         }
@@ -217,12 +221,10 @@ public class EnvironmentsController {
             result.put("error", "Environment no exist");
             return ResponseEntity.ok(result);
         }
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        String httpTestResult = HttpUtil.doGet(environmentEntity.getBroker() + "/metrics", header);
-        if (httpTestResult == null) {
+        try {
+            pulsarAdminService.clusters(environmentEntity.getBroker()).getClusters();
+        } catch (PulsarAdminException e) {
+            log.error("Failed to get clusters list.", e);
             result.put("error", "This environment is error. Please check it");
             return ResponseEntity.ok(result);
         }
diff --git a/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java b/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
index fd1db13..30da4f5 100644
--- a/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
+++ b/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
@@ -14,14 +14,6 @@
 package org.apache.pulsar.manager.controller;
 
 import com.google.common.collect.Maps;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminBuilder;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.manager.service.EnvironmentCacheService;
 import org.apache.pulsar.manager.service.TopicsService;
 import io.swagger.annotations.Api;
@@ -43,7 +35,6 @@ import org.springframework.web.bind.annotation.RestController;
 import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.Size;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -60,21 +51,9 @@ public class TopicsController {
     private final EnvironmentCacheService environmentCacheService;
     private final HttpServletRequest request;
 
-    @Value("${backend.jwt.token}")
-    private String token;
-
     @Value("${pulsar.peek.message}")
     private boolean peekMessage;
 
-    @Value("${tls.enabled}")
-    private boolean tlsEnabled;
-
-    @Value("${tls.hostname.verifier}")
-    private boolean tlsHostnameVerifier;
-
-    @Value("${tls.pulsar.admin.ca-certs}")
-    private String tlsPulsarAdminCaCerts;
-
     @Autowired
     public TopicsController(
             TopicsService topicsService,
@@ -161,53 +140,10 @@ public class TopicsController {
                     "turn on option pulsar.peek.message in file application.properties");
             return ResponseEntity.ok(result);
         }
-        PulsarAdmin pulsarAdmin = null;
         // to do check permission for non super, waiting for https://github.com/apache/pulsar-manager/pull/238
-        try {
-            PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
-            if (tlsEnabled) {
-                pulsarAdminBuilder
-                        .tlsTrustCertsFilePath(tlsPulsarAdminCaCerts)
-                        .enableTlsHostnameVerification(tlsHostnameVerifier);
-            }
-            if (token != null && token.length() > 0) {
-                pulsarAdminBuilder.authentication(AuthenticationFactory.token(token));
-            }
-            pulsarAdmin = pulsarAdminBuilder.serviceHttpUrl(requestHost).build();
-            String topicFullPath = persistent + "://" + tenant + "/" + namespace + "/" + topic;
-            List<Message<byte[]>> messages = pulsarAdmin.topics().peekMessages(topicFullPath, subName, messagePosition);
-            List<Map<String, Object>> mapList = new ArrayList<>();
-            for (Message<byte[]> msg: messages) {
-                Map<String, Object> message = Maps.newHashMap();
-                if (msg.getMessageId() instanceof BatchMessageIdImpl) {
-                    BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
-                    message.put("ledgerId", msgId.getLedgerId());
-                    message.put("entryId", msgId.getEntryId());
-                    message.put("batchIndex", msgId.getBatchIndex());
-                    message.put("batch", true);
-                } else {
-                    MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
-                    message.put("batch", false);
-                    message.put("ledgerId", msgId.getLedgerId());
-                    message.put("entryId", msgId.getEntryId());
-                }
-                if (msg.getProperties().size() > 0) {
-                    msg.getProperties().forEach((k, v) -> {
-                        message.put(k, v);
-                    });
-                }
-                message.put("data", msg.getData());
-                mapList.add(message);
-            }
-            result.put("data", mapList);
-        } catch (PulsarClientException clientException) {
-            result.put("error", clientException.getMessage());
-        } catch (PulsarAdminException adminException) {
-            result.put("error", adminException.getMessage());
-        }
-        if (pulsarAdmin != null) {
-            pulsarAdmin.close();
-        }
+        List<Map<String, Object>> mapList = topicsService.peekMessages(
+                persistent, tenant, namespace, topic, subName, messagePosition, requestHost);
+        result.put("data", mapList);
         return ResponseEntity.ok(result);
     }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/pulsar/manager/controller/exception/PulsarAdminOperationException.java b/src/main/java/org/apache/pulsar/manager/controller/exception/PulsarAdminOperationException.java
new file mode 100644
index 0000000..577b09f
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/manager/controller/exception/PulsarAdminOperationException.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.manager.controller.exception;
+
+@SuppressWarnings("serial")
+public class PulsarAdminOperationException extends RuntimeException {
+    public PulsarAdminOperationException(String message) {
+        super(message);
+    }
+}
diff --git a/src/main/java/org/apache/pulsar/manager/service/BrokerStatsService.java b/src/main/java/org/apache/pulsar/manager/service/BrokerStatsService.java
index 0d298ce..ed9f075 100644
--- a/src/main/java/org/apache/pulsar/manager/service/BrokerStatsService.java
+++ b/src/main/java/org/apache/pulsar/manager/service/BrokerStatsService.java
@@ -22,8 +22,6 @@ public interface BrokerStatsService {
 
     String forwardBrokerStatsMetrics(String broker, String requestHost);
 
-    String forwardBrokerStatsTopics(String broker, String requestHost);
-
     void collectStatsToDB(long unixTime, String environment, String cluster, String serviceUrl);
 
     void clearStats(long nowTime, long timeInterval);
diff --git a/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java b/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java
new file mode 100644
index 0000000..ba2838f
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.manager.service;
+
+import java.util.Map;
+
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.Brokers;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.client.admin.Topics;
+
+public interface PulsarAdminService {
+    PulsarAdmin getPulsarAdmin(String url);
+    BrokerStats brokerStats(String url);
+    Clusters clusters(String url);
+    Brokers brokers(String url);
+    Tenants tenants(String url);
+    Namespaces namespaces(String url);
+    Topics topics(String url);
+    Map<String, String> getAuthHeader(String url);
+}
diff --git a/src/main/java/org/apache/pulsar/manager/service/TopicsService.java b/src/main/java/org/apache/pulsar/manager/service/TopicsService.java
index 4ae0cdc..08094cd 100644
--- a/src/main/java/org/apache/pulsar/manager/service/TopicsService.java
+++ b/src/main/java/org/apache/pulsar/manager/service/TopicsService.java
@@ -28,4 +28,10 @@ public interface TopicsService {
 
     List<Map<String, Object>> getTopicsStatsList(String env, String tenant, String namespace,
                                                  String persistent, List<Map<String, String>> topics);
-}
\ No newline at end of file
+
+    List<Map<String, Object>> peekMessages(
+            String persistent, String tenant,
+            String namespace, String topic,
+            String subName, Integer messagePosition,
+            String requestHost);
+}
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
index c7a1037..99fa696 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
@@ -15,14 +15,17 @@ package org.apache.pulsar.manager.service.impl;
 
 import com.github.pagehelper.Page;
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
 
 import java.text.DecimalFormat;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
 import org.apache.pulsar.manager.service.BrokerStatsService;
 import org.apache.pulsar.manager.service.BrokersService;
 import org.apache.pulsar.manager.service.ClustersService;
-import org.apache.pulsar.manager.service.EnvironmentCacheService;
-import org.apache.pulsar.manager.utils.HttpUtil;
+import org.apache.pulsar.manager.service.PulsarAdminService;
 import org.apache.pulsar.manager.entity.ConsumerStatsEntity;
 import org.apache.pulsar.manager.entity.ConsumersStatsRepository;
 import org.apache.pulsar.manager.entity.EnvironmentEntity;
@@ -40,7 +43,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,9 +66,6 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
     @Value("${backend.directRequestHost}")
     private String directRequestHost;
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
-
     @Value("${clear.stats.interval}")
     private Long clearStatsInterval;
 
@@ -78,8 +77,7 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
     private final PublishersStatsRepository publishersStatsRepository;
     private final ReplicationsStatsRepository replicationsStatsRepository;
     private final ConsumersStatsRepository consumersStatsRepository;
-
-    private static final Map<String, String> header = new HashMap<String, String>();
+    private final PulsarAdminService pulsarAdminService;
 
     @Autowired
     public BrokerStatsServiceImpl(
@@ -91,7 +89,7 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
             PublishersStatsRepository publishersStatsRepository,
             ReplicationsStatsRepository replicationsStatsRepository,
             ConsumersStatsRepository consumersStatsRepository,
-            EnvironmentCacheService environmentCache) {
+            PulsarAdminService pulsarAdminService) {
         this.environmentsRepository = environmentsRepository;
         this.clustersService = clustersService;
         this.brokersService = brokersService;
@@ -100,23 +98,19 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
         this.publishersStatsRepository = publishersStatsRepository;
         this.replicationsStatsRepository = replicationsStatsRepository;
         this.consumersStatsRepository = consumersStatsRepository;
+        this.pulsarAdminService = pulsarAdminService;
     }
 
     public String forwardBrokerStatsMetrics(String broker, String requestHost) {
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-
         broker = checkServiceUrl(broker, requestHost);
-        return HttpUtil.doGet(broker + "/admin/v2/broker-stats/metrics", header);
-    }
-
-    public String forwardBrokerStatsTopics(String broker, String requestHost) {
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+        try {
+            return pulsarAdminService.brokerStats(broker).getMetrics().toString();
+        } catch(PulsarAdminException e) {
+            PulsarAdminOperationException pulsarAdminOperationException
+                    = new PulsarAdminOperationException("Failed to get broker metrics.");
+            log.error(pulsarAdminOperationException.getMessage(), e);
+            throw pulsarAdminOperationException;
         }
-        broker = checkServiceUrl(broker, requestHost);
-        return HttpUtil.doGet(broker + "/admin/v2/broker-stats/topics", header);
     }
 
     @Scheduled(initialDelayString = "${init.delay.interval}", fixedDelayString = "${insert.stats.interval}")
@@ -165,16 +159,20 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
     }
 
     public void collectStatsToDB(long unixTime, String env, String cluster, String serviceUrl) {
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
         Map<String, Object> brokerObject = brokersService.getBrokersList(0, 0, cluster, serviceUrl);
         List<HashMap<String, Object>> brokerLists = (List<HashMap<String, Object>>) brokerObject.get("data");
         brokerLists.forEach((brokerMap) -> {
             String tempBroker = (String) brokerMap.get("broker");
             // TODO: handle other protocols
             String broker = "http://" + tempBroker;
-            String result = HttpUtil.doGet(broker + "/admin/v2/broker-stats/topics", header);
+            JsonObject result;
+            try {
+                result = pulsarAdminService.brokerStats(broker).getTopics();
+            } catch(PulsarAdminException e) {
+                // Skip only this broker: returning from the lambda lets forEach continue with the rest.
+                log.error("Failed to get topics stats from broker {}.", broker, e);
+                return;
+            }
             Gson gson = new Gson();
             HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>> brokerStatsTopicEntity = gson.fromJson(result,
                 new TypeToken<HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>>>() {
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/BrokersServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/BrokersServiceImpl.java
index 536d428..e139695 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/BrokersServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/BrokersServiceImpl.java
@@ -14,11 +14,14 @@
 package org.apache.pulsar.manager.service.impl;
 
 import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
 import org.apache.pulsar.manager.service.BrokersService;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.manager.service.PulsarAdminService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
@@ -29,28 +32,43 @@ import java.util.Map;
 @Service
 public class BrokersServiceImpl implements BrokersService {
 
+    private static final Logger log = LoggerFactory.getLogger(BrokersServiceImpl.class);
+
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
+    private final PulsarAdminService pulsarAdminService;
+
+    @Autowired
+    public BrokersServiceImpl(PulsarAdminService pulsarAdminService) {
+        this.pulsarAdminService = pulsarAdminService;
+    }
 
 
     public Map<String, Object> getBrokersList(Integer pageNum, Integer pageSize, String cluster, String requestHost) {
         Map<String, Object> brokersMap = Maps.newHashMap();
         List<Map<String, Object>> brokersArray = new ArrayList<>();
         if (directRequestBroker) {
-            Gson gson = new Gson();
-            Map<String, String> header = Maps.newHashMap();
-            if (StringUtils.isNotBlank(pulsarJwtToken)) {
-                header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+            Map<String, FailureDomain> failureDomains;
+            try {
+                failureDomains = pulsarAdminService.clusters(requestHost).getFailureDomains(cluster);
+            } catch (PulsarAdminException e) {
+                PulsarAdminOperationException pulsarAdminOperationException
+                        = new PulsarAdminOperationException("Failed to get failureDomains list.");
+                log.error(pulsarAdminOperationException.getMessage(), e);
+                throw pulsarAdminOperationException;
             }
-            String failureDomainsResult = HttpUtil.doGet(
-                    requestHost + "/admin/v2/clusters/" + cluster + "/failureDomains", header);
-            Map<String, Map<String, List<String>>> failureDomains = gson.fromJson(
-                    failureDomainsResult, new TypeToken<Map<String, Map<String, List<String>>>>() {}.getType());
-            String result = HttpUtil.doGet(requestHost + "/admin/v2/brokers/" + cluster, header);
-            List<String> brokersList = gson.fromJson(result, new TypeToken<List<String>>() {}.getType());
+
+            List<String>  brokersList;
+            try {
+                brokersList = pulsarAdminService.brokers(requestHost).getActiveBrokers(cluster);
+            } catch (PulsarAdminException e) {
+                PulsarAdminOperationException pulsarAdminOperationException
+                        = new PulsarAdminOperationException("Failed to get brokers list.");
+                log.error(pulsarAdminOperationException.getMessage(), e);
+                throw pulsarAdminOperationException;
+            }
+
             for (String broker: brokersList) {
                 Map<String, Object> brokerEntity = Maps.newHashMap();
                 List<String> failureDomain = this.getFailureDomain(broker, failureDomains);
@@ -67,15 +85,12 @@ public class BrokersServiceImpl implements BrokersService {
         return brokersMap;
     }
 
-    private List<String> getFailureDomain(String broker, Map<String, Map<String, List<String>>> failureDomains) {
+    private List<String> getFailureDomain(String broker, Map<String, FailureDomain> failureDomains) {
         List<String> failureDomainsList = new ArrayList<>();
         for (String failureDomain: failureDomains.keySet()) {
-            Map<String, List<String>> domains = failureDomains.get(failureDomain);
-            for (String domain: domains.keySet()) {
-                List<String> domainList = domains.get(domain);
-                if (domainList.contains(broker)) {
-                    failureDomainsList.add(failureDomain);
-                }
+            FailureDomain domain = failureDomains.get(failureDomain);
+            if (domain.getBrokers().contains(broker)) {
+                failureDomainsList.add(failureDomain);
             }
         }
         return failureDomainsList;
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/ClustersServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/ClustersServiceImpl.java
index 0162658..636cc77 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/ClustersServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/ClustersServiceImpl.java
@@ -14,15 +14,17 @@
 package org.apache.pulsar.manager.service.impl;
 
 import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
 import org.apache.pulsar.manager.service.BrokersService;
 import org.apache.pulsar.manager.service.ClustersService;
-import org.apache.pulsar.manager.utils.HttpUtil;
+import org.apache.pulsar.manager.service.PulsarAdminService;
 import java.util.function.Function;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
@@ -34,20 +36,21 @@ import java.util.Map;
 @Service
 public class ClustersServiceImpl implements ClustersService {
 
+    private static final Logger log = LoggerFactory.getLogger(ClustersServiceImpl.class);
+
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
-
     private final BrokersService brokersService;
 
+    private final PulsarAdminService pulsarAdminService;
+
     @Autowired
-    public ClustersServiceImpl(BrokersService brokersService) {
+    public ClustersServiceImpl(BrokersService brokersService, PulsarAdminService pulsarAdminService) {
         this.brokersService = brokersService;
+        this.pulsarAdminService = pulsarAdminService;
     }
 
-
     public Map<String, Object> getClustersList(Integer pageNum,
                                                Integer pageSize,
                                                String envServiceUrl,
@@ -55,13 +58,15 @@ public class ClustersServiceImpl implements ClustersService {
         Map<String, Object> clustersMap = Maps.newHashMap();
         List<Map<String, Object>> clustersArray = new ArrayList<>();
         if (directRequestBroker) {
-            Gson gson = new Gson();
-            Map<String, String> header = Maps.newHashMap();
-            if (StringUtils.isNotBlank(pulsarJwtToken)) {
-                header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+            List<String> clustersList;
+            try {
+                clustersList = pulsarAdminService.clusters(envServiceUrl).getClusters();
+            } catch (PulsarAdminException e) {
+                PulsarAdminOperationException pulsarAdminOperationException
+                        = new PulsarAdminOperationException("Failed to get clusters list.");
+                log.error(pulsarAdminOperationException.getMessage(), e);
+                throw pulsarAdminOperationException;
             }
-            String result = HttpUtil.doGet(envServiceUrl + "/admin/v2/clusters", header);
-            List<String> clustersList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
             for (String cluster: clustersList) {
                 String clusterServiceUrl =
                     serviceUrlProvider == null ? envServiceUrl : serviceUrlProvider.apply(cluster);
@@ -70,8 +75,15 @@ public class ClustersServiceImpl implements ClustersService {
                     brokersService.getBrokersList(1, 1, cluster, clusterServiceUrl);
                 clusterEntity.put("brokers", brokers.get("total"));
                 clusterEntity.put("cluster", cluster);
-                String clusterInfo = HttpUtil.doGet(clusterServiceUrl + "/admin/v2/clusters/" + cluster, header);
-                ClusterData clusterData = gson.fromJson(clusterInfo, ClusterData.class);
+                ClusterData clusterData;
+                try {
+                    clusterData = pulsarAdminService.clusters(clusterServiceUrl).getCluster(cluster);
+                } catch (PulsarAdminException e) {
+                    PulsarAdminOperationException pulsarAdminOperationException
+                            = new PulsarAdminOperationException("Failed to get cluster data.");
+                    log.error(pulsarAdminOperationException.getMessage(), e);
+                    throw pulsarAdminOperationException;
+                }
                 clusterEntity.put("serviceUrl", clusterData.getServiceUrl());
                 clusterEntity.put("serviceUrlTls", clusterData.getServiceUrlTls());
                 clusterEntity.put("brokerServiceUrl", clusterData.getBrokerServiceUrl());
@@ -88,13 +100,16 @@ public class ClustersServiceImpl implements ClustersService {
     }
 
     public List<String> getClusterByAnyBroker(String requestHost) {
-        Gson gson = new Gson();
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+        List<String> clustersList;
+        try {
+            clustersList = pulsarAdminService.clusters(requestHost).getClusters();
+        } catch (PulsarAdminException e) {
+            PulsarAdminOperationException pulsarAdminOperationException
+                    = new PulsarAdminOperationException("Failed to get clusters list.");
+            log.error(pulsarAdminOperationException.getMessage(), e);
+            throw pulsarAdminOperationException;
         }
-        String result = HttpUtil.doGet(requestHost + "/admin/v2/clusters", header);
-        List<String> clustersList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
+
         return clustersList;
     }
 }
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
index a0f4b5d..b1290fb 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
@@ -14,14 +14,15 @@
 package org.apache.pulsar.manager.service.impl;
 
 import com.github.pagehelper.Page;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
 import org.apache.pulsar.manager.entity.EnvironmentEntity;
 import org.apache.pulsar.manager.entity.EnvironmentsRepository;
 import org.apache.pulsar.manager.service.EnvironmentCacheService;
-import org.apache.pulsar.manager.utils.HttpUtil;
+import org.apache.pulsar.manager.service.PulsarAdminService;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -33,7 +34,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
@@ -44,16 +44,17 @@ import org.springframework.stereotype.Service;
 @Service
 public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
-
     private final EnvironmentsRepository environmentsRepository;
+
     private final Map<String, Map<String, ClusterData>> environments;
 
+    private final PulsarAdminService pulsarAdminService;
+
     @Autowired
-    public EnvironmentCacheServiceImpl(EnvironmentsRepository environmentsRepository) {
+    public EnvironmentCacheServiceImpl(EnvironmentsRepository environmentsRepository, PulsarAdminService pulsarAdminService) {
         this.environmentsRepository = environmentsRepository;
         this.environments = new ConcurrentHashMap<>();
+        this.pulsarAdminService = pulsarAdminService;
     }
 
     @Override
@@ -100,14 +101,6 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
         return clusterData.getServiceUrl();
     }
 
-    private Map<String, String> jsonHeader() {
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        return header;
-    }
-
     @Scheduled(
         initialDelay = 0L,
         fixedDelayString = "${cluster.cache.reload.interval.ms}")
@@ -120,7 +113,11 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
         List<EnvironmentEntity> environmentList = environmentPage.getResult();
         while (!environmentList.isEmpty()) {
             environmentList.forEach(env -> {
-                reloadEnvironment(env);
+                try {
+                    reloadEnvironment(env);
+                } catch (PulsarAdminOperationException e) {
+                    log.error(e.getMessage(), e);
+                }
                 newEnvironments.add(env.getName());
             });
             ++pageNum;
@@ -143,13 +140,15 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
     }
 
     public void reloadEnvironment(EnvironmentEntity environment) {
-        Gson gson = new Gson();
-        String result = HttpUtil.doGet(
-            environment.getBroker() + "/admin/v2/clusters",
-            jsonHeader()
-        );
-        List<String> clustersList =
-            gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
+        List<String> clustersList;
+        try {
+            clustersList = pulsarAdminService.clusters(environment.getBroker()).getClusters();
+        } catch(PulsarAdminException e) {
+            PulsarAdminOperationException pulsarAdminOperationException
+                    = new PulsarAdminOperationException("Failed to get clusters list.");
+            log.error(pulsarAdminOperationException.getMessage(), e);
+            throw pulsarAdminOperationException;
+        }
         log.info("Reload cluster list for environment {} : {}", environment.getName(), clustersList);
         Set<String> newClusters = Sets.newHashSet(clustersList);
         Map<String, ClusterData> clusterDataMap = environments.computeIfAbsent(
@@ -176,19 +175,15 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
     private ClusterData reloadCluster(EnvironmentEntity environment, String cluster) {
         log.info("Reloading cluster data for cluster {} @ environment {} ...",
             cluster, environment.getName());
-        Gson gson = new Gson();
-        String clusterInfoUrl = environment.getBroker() + "/admin/v2/clusters/" + cluster;
-        String result = HttpUtil.doGet(
-            clusterInfoUrl,
-            jsonHeader()
-        );
-        if (null == result) {
-            // fail to fetch the cluster data or the cluster is not found
+        ClusterData clusterData;
+        try {
+            clusterData = pulsarAdminService.clusters(environment.getBroker()).getCluster(cluster);
+        } catch(PulsarAdminException e) {
+            log.error("Failed to get cluster data.", e);
             return null;
         }
-        log.info("Loaded cluster data for cluster {} @ environment {} from {} : {}",
-            cluster, environment.getName(), clusterInfoUrl, result);
-        ClusterData clusterData = gson.fromJson(result, ClusterData.class);
+        log.info("Loaded cluster data for cluster {} @ environment {} : {}",
+            cluster, environment.getName(), clusterData.toString());
         Map<String, ClusterData> clusters = environments.computeIfAbsent(
             environment.getName(),
             (e) -> new ConcurrentHashMap<>());
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/NamespacesServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/NamespacesServiceImpl.java
index 630b34d..f5d1eea 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/NamespacesServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/NamespacesServiceImpl.java
@@ -15,15 +15,17 @@ package org.apache.pulsar.manager.service.impl;
 
 import com.github.pagehelper.Page;
 import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
 import org.apache.pulsar.manager.entity.TopicStatsEntity;
 import org.apache.pulsar.manager.entity.TopicsStatsRepository;
 import org.apache.pulsar.manager.service.BrokerStatsService;
 import org.apache.pulsar.manager.service.NamespacesService;
+import org.apache.pulsar.manager.service.PulsarAdminService;
 import org.apache.pulsar.manager.service.TopicsService;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
@@ -37,38 +39,42 @@ public class NamespacesServiceImpl implements NamespacesService {
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
+    private static final Logger log = LoggerFactory.getLogger(NamespacesServiceImpl.class);
 
     private final TopicsStatsRepository topicsStatsRepository;
     private final TopicsService topicsService;
     private final HttpServletRequest request;
     private final BrokerStatsService brokerStatsService;
+    private final PulsarAdminService pulsarAdminService;
 
     @Autowired
     public NamespacesServiceImpl(
             TopicsStatsRepository topicsStatsRepository,
             TopicsService topicsService,
             HttpServletRequest request,
-            BrokerStatsService brokerStatsService) {
+            BrokerStatsService brokerStatsService,
+            PulsarAdminService pulsarAdminService) {
         this.topicsStatsRepository = topicsStatsRepository;
         this.topicsService = topicsService;
         this.request = request;
         this.brokerStatsService = brokerStatsService;
+        this.pulsarAdminService = pulsarAdminService;
     }
 
     public Map<String, Object> getNamespaceList(Integer pageNum, Integer pageSize, String tenant, String requestHost) {
         Map<String, Object> namespacesMap = Maps.newHashMap();
         List<Map<String, Object>> namespacesArray = new ArrayList<>();
         if (directRequestBroker) {
-            Gson gson = new Gson();
-            Map<String, String> header = Maps.newHashMap();
-            if (StringUtils.isNotBlank(pulsarJwtToken)) {
-                header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+            List<String> namespacesList;
+            try {
+                namespacesList = pulsarAdminService.namespaces(requestHost).getNamespaces(tenant);
+            } catch (PulsarAdminException e) {
+                PulsarAdminOperationException pulsarAdminOperationException
+                        = new PulsarAdminOperationException("Failed to get namespaces list.");
+                log.error(pulsarAdminOperationException.getMessage(), e);
+                throw pulsarAdminOperationException;
             }
-            String result = HttpUtil.doGet(requestHost + "/admin/v2/namespaces/" + tenant, header);
-            if (result != null) {
-                List<String> namespacesList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
+            if (!namespacesList.isEmpty()) {
                 Optional<TopicStatsEntity> topicStatsEntityOptional = topicsStatsRepository.findMaxTime();
                 Map<String, TopicStatsEntity> topicStatsEntityMap = Maps.newHashMap();
                 if (topicStatsEntityOptional.isPresent()) {
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java
new file mode 100644
index 0000000..1b79c1f
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.manager.service.impl;
+
+import javax.annotation.PreDestroy;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.Brokers;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
+import org.apache.pulsar.manager.service.PulsarAdminService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+/**
+ * {@link PulsarAdminService} implementation that keeps one {@link PulsarAdmin} client per
+ * service URL: clients are created on first use, cached, and closed on bean destruction.
+ */
+@Service
+public class PulsarAdminServiceImpl implements PulsarAdminService {
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarAdminServiceImpl.class);
+
+    @Value("${backend.broker.pulsarAdmin.authPlugin:}")
+    private String authPlugin;
+
+    @Value("${backend.broker.pulsarAdmin.authParams:}")
+    private String authParams;
+
+    @Value("${backend.broker.pulsarAdmin.tlsAllowInsecureConnection:false}")
+    private Boolean tlsAllowInsecureConnection;
+
+    @Value("${backend.broker.pulsarAdmin.tlsTrustCertsFilePath:}")
+    private String tlsTrustCertsFilePath;
+
+    @Value("${backend.broker.pulsarAdmin.tlsEnableHostnameVerification:false}")
+    private Boolean tlsEnableHostnameVerification;
+
+    @Value("${backend.jwt.token:}")
+    private String pulsarJwtToken;
+
+    // Clients keyed by service URL; only accessed from synchronized methods.
+    private final Map<String, PulsarAdmin> pulsarAdmins = new HashMap<>();
+
+    /**
+     * Closes every cached client when the bean is destroyed.
+     */
+    @PreDestroy
+    public synchronized void destroy() {
+        pulsarAdmins.values().forEach(PulsarAdmin::close);
+    }
+
+    /**
+     * Returns the client for the given service URL, creating and caching it on first use.
+     *
+     * @param url service HTTP URL the client connects to
+     * @return the cached or newly created client
+     * @throws PulsarAdminOperationException if a new client cannot be built
+     */
+    public synchronized PulsarAdmin getPulsarAdmin(String url) {
+        return pulsarAdmins.computeIfAbsent(url, this::createPulsarAdmin);
+    }
+
+    /** Returns the broker-stats API of the client for {@code url}. */
+    public BrokerStats brokerStats(String url) {
+        return getPulsarAdmin(url).brokerStats();
+    }
+
+    /** Returns the clusters API of the client for {@code url}. */
+    public Clusters clusters(String url) {
+        return getPulsarAdmin(url).clusters();
+    }
+
+    /** Returns the brokers API of the client for {@code url}. */
+    public Brokers brokers(String url) {
+        return getPulsarAdmin(url).brokers();
+    }
+
+    /** Returns the tenants API of the client for {@code url}. */
+    public Tenants tenants(String url) {
+        return getPulsarAdmin(url).tenants();
+    }
+
+    /** Returns the namespaces API of the client for {@code url}. */
+    public Namespaces namespaces(String url) {
+        return getPulsarAdmin(url).namespaces();
+    }
+
+    /** Returns the topics API of the client for {@code url}. */
+    public Topics topics(String url) {
+        return getPulsarAdmin(url).topics();
+    }
+
+    /**
+     * Builds the HTTP authentication headers for a request to {@code url}, using the
+     * authentication configured on the client for that URL.
+     *
+     * @param url service HTTP URL the request is sent to
+     * @return the headers to attach; empty when the authentication provides no HTTP data or
+     *         when resolving the headers fails (the failure is logged, not rethrown)
+     */
+    public Map<String, String> getAuthHeader(String url) {
+        Authentication authentication = getPulsarAdmin(url).getClientConfigData().getAuthentication();
+        Map<String, String> result = new HashMap<>();
+
+        try {
+            CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
+            AuthenticationDataProvider authData = authentication.getAuthData(new URL(url).getHost());
+            if (authData.hasDataForHttp()) {
+                authentication.authenticationStage(url, authData, null, authFuture);
+            } else {
+                return result;
+            }
+
+            try {
+                Map<String, String> responseHeader = authFuture.get();
+                Set<Map.Entry<String, String>> headers =
+                        authentication.newRequestHeader(url, authData, responseHeader);
+                if (headers != null) {
+                    headers.forEach(entry -> result.put(entry.getKey(), entry.getValue()));
+                }
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers up the stack can still observe it.
+                Thread.currentThread().interrupt();
+                log.error("Failed to get headers", e);
+            } catch (Exception e) {
+                log.error("Failed to get headers", e);
+            }
+        } catch (Exception e) {
+            log.error("Failed to run getAuthHeader", e);
+        }
+
+        return result;
+    }
+
+    /**
+     * Builds a new client for {@code url}. A non-blank JWT token takes precedence over the
+     * configured authentication plugin.
+     *
+     * @param url service HTTP URL the client connects to
+     * @return the newly built client
+     * @throws PulsarAdminOperationException if the client cannot be built
+     */
+    private PulsarAdmin createPulsarAdmin(String url) {
+        try {
+            // authParams may carry credentials, so log only whether it is set, never its value.
+            log.info("Create Pulsar Admin instance. url={}, authPlugin={}, authParamsConfigured={}, tlsAllowInsecureConnection={}, tlsTrustCertsFilePath={}, tlsEnableHostnameVerification={}",
+                    url, authPlugin, StringUtils.isNotBlank(authParams), tlsAllowInsecureConnection, tlsTrustCertsFilePath, tlsEnableHostnameVerification);
+            PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
+            pulsarAdminBuilder.serviceHttpUrl(url);
+            if (StringUtils.isNotBlank(pulsarJwtToken)) {
+                pulsarAdminBuilder.authentication(AuthenticationFactory.token(pulsarJwtToken));
+            } else {
+                pulsarAdminBuilder.authentication(authPlugin, authParams);
+            }
+            pulsarAdminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
+            pulsarAdminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
+            pulsarAdminBuilder.enableTlsHostnameVerification(tlsEnableHostnameVerification);
+            return pulsarAdminBuilder.build();
+        } catch (PulsarClientException e) {
+            PulsarAdminOperationException pulsarAdminOperationException = new PulsarAdminOperationException("Failed to create Pulsar Admin instance.");
+            log.error(pulsarAdminOperationException.getMessage(), e);
+            throw pulsarAdminOperationException;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/TenantsServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/TenantsServiceImpl.java
index 9eeeda6..ee2bdd6 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/TenantsServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/TenantsServiceImpl.java
@@ -16,14 +16,14 @@ package org.apache.pulsar.manager.service.impl;
 import com.github.pagehelper.Page;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
 import org.apache.pulsar.manager.entity.TopicStatsEntity;
 import org.apache.pulsar.manager.entity.TopicsStatsRepository;
 import org.apache.pulsar.manager.service.BrokerStatsService;
+import org.apache.pulsar.manager.service.PulsarAdminService;
 import org.apache.pulsar.manager.service.TenantsService;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +32,6 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import javax.servlet.http.HttpServletRequest;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -43,34 +42,40 @@ public class TenantsServiceImpl implements TenantsService {
 
     private static final Logger log = LoggerFactory.getLogger(TenantsServiceImpl.class);
 
-
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
+    private final BrokerStatsService brokerStatsService;
 
-    @Autowired
-    private BrokerStatsService brokerStatsService;
+    private final TopicsStatsRepository topicsStatsRepository;
 
-    @Autowired
-    private TopicsStatsRepository topicsStatsRepository;
+    private final PulsarAdminService pulsarAdminService;
+
+    private final HttpServletRequest request;
 
     @Autowired
-    private HttpServletRequest request;
+    public TenantsServiceImpl(BrokerStatsService brokerStatsService, TopicsStatsRepository topicsStatsRepository, PulsarAdminService pulsarAdminService, HttpServletRequest request) {
+        this.brokerStatsService = brokerStatsService;
+        this.topicsStatsRepository = topicsStatsRepository;
+        this.pulsarAdminService = pulsarAdminService;
+        this.request = request;
+    }
 
     public Map<String, Object> getTenantsList(Integer pageNum, Integer pageSize, String requestHost) {
         Map<String, Object> tenantsMap = Maps.newHashMap();
         List<Map<String, Object>> tenantsArray = new ArrayList<>();
         if (directRequestBroker) {
-            Gson gson = new Gson();
-            Map<String, String> header = Maps.newHashMap();
-            if (StringUtils.isNotBlank(pulsarJwtToken)) {
-                header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+            List<String> tenantsList;
+            try {
+                tenantsList = pulsarAdminService.tenants(requestHost).getTenants();
+            } catch (PulsarAdminException e) {
+                PulsarAdminOperationException pulsarAdminOperationException
+                        = new PulsarAdminOperationException("Failed to get tenants list.");
+                log.error(pulsarAdminOperationException.getMessage(), e);
+                throw pulsarAdminOperationException;
             }
-            String result = HttpUtil.doGet( requestHost + "/admin/v2/tenants", header);
-            if (result != null) {
-                List<String> tenantsList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
+
+            if (!tenantsList.isEmpty()) {
                 Optional<TopicStatsEntity> topicStatsEntityOptional = topicsStatsRepository.findMaxTime();
                 Map<String, TopicStatsEntity> topicStatsEntityMap = Maps.newHashMap();
                 if (topicStatsEntityOptional.isPresent()) {
@@ -86,19 +91,29 @@ public class TenantsServiceImpl implements TenantsService {
                     }
                 }
                 for (String tenant : tenantsList) {
+                    TenantInfo tenantInfo;
+                    try {
+                        tenantInfo = pulsarAdminService.tenants(requestHost).getTenantInfo(tenant);
+                    } catch (PulsarAdminException e) {
+                        PulsarAdminOperationException pulsarAdminOperationException
+                                = new PulsarAdminOperationException("Failed to get tenant info.");
+                        log.error(pulsarAdminOperationException.getMessage(), e);
+                        throw pulsarAdminOperationException;
+                    }
                     Map<String, Object> tenantEntity = Maps.newHashMap();
-                    String info = HttpUtil.doGet( requestHost + "/admin/v2/tenants/" + tenant, header);
-                    TenantInfo tenantInfo = gson.fromJson(info, TenantInfo.class);
                     tenantEntity.put("tenant", tenant);
                     tenantEntity.put("adminRoles", String.join(",", tenantInfo.getAdminRoles()));
                     tenantEntity.put("allowedClusters", String.join(",",  tenantInfo.getAllowedClusters()));
-                    String namespace = HttpUtil.doGet(requestHost + "/admin/v2/namespaces/" + tenant, header);
-                    if (namespace != null) {
-                        List<String> namespacesList = gson.fromJson(namespace, new TypeToken<List<String>>(){}.getType());
-                        tenantEntity.put("namespaces", namespacesList.size());
-                    } else {
-                        tenantEntity.put("namespaces", 0);
+                    List<String> namespacesList;
+                    try {
+                        namespacesList = pulsarAdminService.namespaces(requestHost).getNamespaces(tenant);
+                    } catch (PulsarAdminException e) {
+                        PulsarAdminOperationException pulsarAdminOperationException
+                                = new PulsarAdminOperationException("Failed to get namespaces list.");
+                        log.error(pulsarAdminOperationException.getMessage(), e);
+                        throw pulsarAdminOperationException;
                     }
+                    tenantEntity.put("namespaces", namespacesList.size());
                     if (topicStatsEntityMap.get(tenant) != null ) {
                         TopicStatsEntity topicStatsEntity = topicStatsEntityMap.get(tenant);
                         tenantEntity.put("inMsg", topicStatsEntity.getMsgRateIn());
@@ -122,22 +137,15 @@ public class TenantsServiceImpl implements TenantsService {
     }
 
     public Map<String, String> createTenant(String tenant, String role, String cluster, String requestHost) {
-        Map<String, String> header = Maps.newHashMap();
-        header.put("Content-Type", "application/json");
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        Map<String, Object> body = Maps.newHashMap();
-        body.put("adminRoles", Sets.newHashSet(role));
-        // Get cluster from standalone, to do
-        body.put("allowedClusters", Sets.newHashSet(cluster));
-        Gson gson = new Gson();
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet(role), Sets.newHashSet(cluster));
         Map<String, String> result = Maps.newHashMap();
         try {
-            HttpUtil.doPut( requestHost + "/admin/v2/tenants/" + tenant, header, gson.toJson(body));
+            pulsarAdminService.tenants(requestHost).createTenant(tenant, tenantInfo);
             result.put("message", "Create tenant success");
-        } catch (UnsupportedEncodingException e) {
-            log.error("Init tenant failed for user: {}", tenant);
+        } catch (PulsarAdminException e) {
+            PulsarAdminOperationException pulsarAdminOperationException
+                    = new PulsarAdminOperationException("Failed to create tenant.");
+            log.error(pulsarAdminOperationException.getMessage(), e);
             result.put("error", "Create tenant failed");
         }
         return result;
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/TopicsServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/TopicsServiceImpl.java
index 2503747..6013b21 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/TopicsServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/TopicsServiceImpl.java
@@ -15,14 +15,24 @@ package org.apache.pulsar.manager.service.impl;
 
 import com.github.pagehelper.Page;
 import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
 import org.apache.pulsar.manager.entity.TopicStatsEntity;
 import org.apache.pulsar.manager.entity.TopicStatsEntity.TopicStatsSummary;
 import org.apache.pulsar.manager.entity.TopicsStatsRepository;
+import org.apache.pulsar.manager.service.PulsarAdminService;
 import org.apache.pulsar.manager.service.TopicsService;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -34,19 +44,21 @@ import java.util.*;
 @Service
 public class TopicsServiceImpl implements TopicsService {
 
+    private static final Logger log = LoggerFactory.getLogger(TopicsServiceImpl.class);
+
     public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";
 
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
-
     private final TopicsStatsRepository topicsStatsRepository;
 
+    private final PulsarAdminService pulsarAdminService;
+
     @Autowired
-    public TopicsServiceImpl(TopicsStatsRepository topicsStatsRepository) {
+    public TopicsServiceImpl(TopicsStatsRepository topicsStatsRepository, PulsarAdminService pulsarAdminService) {
         this.topicsStatsRepository = topicsStatsRepository;
+        this.pulsarAdminService = pulsarAdminService;
     }
 
     private boolean isPartitionedTopic(List<String> topics, String topic) {
@@ -67,23 +79,52 @@ public class TopicsServiceImpl implements TopicsService {
             String env,
             String serviceUrl) {
         Map<String, Object> topicsMap = Maps.newHashMap();
-        List<Map<String, String>> persistentTopics = this.getTopicListByHttp(
-                tenant, namespace, "persistent", serviceUrl);
-        List<Map<String, Object>> persistentTopicsArray = this.getTopicsStatsList(
-                env, tenant, namespace, "persistent", persistentTopics);
-        List<Map<String, String>> nonPersistentTopics = this.getTopicListByHttp(
-                tenant, namespace, "non-persistent", serviceUrl);
-        List<Map<String, Object>> nonPersistentTopicsArray = this.getTopicsStatsList(
-                env, tenant, namespace, "non-persistent", nonPersistentTopics);
-        persistentTopicsArray.addAll(nonPersistentTopicsArray);
-        topicsMap.put("topics", persistentTopicsArray);
+        List<Map<String, Object>> allTopics = this.getTopicStats(env, tenant, namespace, serviceUrl);
+        topicsMap.put("topics", allTopics);
         topicsMap.put("isPage", false);
-        topicsMap.put("total", persistentTopicsArray.size());
+        topicsMap.put("total", allTopics.size());
         topicsMap.put("pageNum", 1);
-        topicsMap.put("pageSize", persistentTopicsArray.size());
+        topicsMap.put("pageSize", allTopics.size());
         return topicsMap;
     }
 
+    /**
+     * Collects stats entries for every topic of a namespace: entries for the persistent
+     * domain first, then entries for the non-persistent domain.
+     *
+     * @param env passed through unchanged to {@code getTopicsStatsList}
+     * @param tenant tenant part of the {@code tenant/namespace} path that is listed
+     * @param namespace namespace part of the {@code tenant/namespace} path that is listed
+     * @param requestHost forwarded to the topic-list lookups and to {@code convertTopicList}
+     * @return the stats entries of both domains in a single list
+     */
+    private List<Map<String, Object>> getTopicStats(
+            String env, String tenant, String namespace, String requestHost) {
+        Map<String, List<String>> topicsByDomain
+                = this.getTopicListByPulsarAdmin(tenant, namespace, requestHost);
+        Map<String, List<String>> partitionedByDomain
+                = this.getPartitionedTopicListByPulsarAdmin(tenant, namespace, requestHost);
+
+        List<Map<String, Object>> stats = new ArrayList<>();
+        // Same pipeline for both domains; the array fixes the persistent-first order.
+        TopicDomain[] domains = {TopicDomain.persistent, TopicDomain.non_persistent};
+        for (TopicDomain domain : domains) {
+            String domainName = domain.toString();
+            List<Map<String, String>> converted = this.convertTopicList(
+                    topicsByDomain.get(domainName),
+                    partitionedByDomain.get(domainName),
+                    domainName,
+                    requestHost);
+            stats.addAll(this.getTopicsStatsList(
+                    env,
+                    tenant,
+                    namespace,
+                    domainName,
+                    converted));
+        }
+        return stats;
+    }
+
     public List<Map<String, Object>> getTopicsStatsList(String env,
                                                         String tenant,
                                                         String namespace,
@@ -179,84 +220,126 @@ public class TopicsServiceImpl implements TopicsService {
     public Map<String, Object> getTopicsList(
             Integer pageNum, Integer pageSize, String tenant, String namespace, String requestHost) {
         Map<String, Object> topicsMap = Maps.newHashMap();
-        List<Map<String, String>> persistentTopic = this.getTopicListByHttp(
-                tenant, namespace, "persistent", requestHost);
-        List<Map<String, String>> nonPersistentTopic = this.getTopicListByHttp(
-                tenant, namespace, "non-persistent", requestHost);
-        persistentTopic.addAll(nonPersistentTopic);
-        topicsMap.put("topics", persistentTopic);
+        List<Map<String, String>> allTopics = this.getTopicsList(tenant, namespace, requestHost);
+        topicsMap.put("topics", allTopics);
         topicsMap.put("isPage", false);
-        topicsMap.put("total", persistentTopic.size());
+        topicsMap.put("total", allTopics.size());
         topicsMap.put("pageNum", 1);
-        topicsMap.put("pageSize", persistentTopic.size());
+        topicsMap.put("pageSize", allTopics.size());
         return topicsMap;
     }
 
-    private List<Map<String, String>> getTopicListByHttp(
-            String tenant, String namespace, String persistent, String requestHost) {
-        List<Map<String, String>> topicsArray = new ArrayList<>();
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+    @Override
+    public List<Map<String, Object>> peekMessages(String persistent,
+                                                  String tenant,
+                                                  String namespace,
+                                                  String topic,
+                                                  String subName,
+                                                  Integer messagePosition,
+                                                  String requestHost) {
+        List<Message<byte[]>> messages;
+        String topicFullPath = persistent + "://" + tenant + "/" + namespace + "/" + topic;
+        try {
+            messages = pulsarAdminService.topics(requestHost).peekMessages(topicFullPath, subName, messagePosition);
+        } catch(PulsarAdminException e) {
+            PulsarAdminOperationException pulsarAdminOperationException
+                    = new PulsarAdminOperationException("Failed to peek messages.");
+            log.error(pulsarAdminOperationException.getMessage(), e);
+            throw pulsarAdminOperationException;
         }
-        String prefix = "/admin/v2/" + persistent + "/" + tenant + "/" + namespace;
-        Gson gson = new Gson();
-        String partitionedUrl = requestHost + prefix + "/partitioned";
-        String partitionedTopic = HttpUtil.doGet(partitionedUrl, header);
-        List<String> partitionedTopicsList = Arrays.asList();
-        Map<String, List<String>> partitionedMap = Maps.newHashMap();
-        if (partitionedTopic != null) {
-            partitionedTopicsList = gson.fromJson(
-                    partitionedTopic, new TypeToken<List<String>>(){}.getType());
-            for (String p : partitionedTopicsList) {
-                if (p.startsWith(persistent)) {
-                    partitionedMap.put(this.getTopicName(p), new ArrayList<>());
-                }
+        List<Map<String, Object>> mapList = new ArrayList<>();
+        for (Message<byte[]> msg: messages) {
+            Map<String, Object> message = Maps.newHashMap();
+            if (msg.getMessageId() instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
+                message.put("ledgerId", msgId.getLedgerId());
+                message.put("entryId", msgId.getEntryId());
+                message.put("batchIndex", msgId.getBatchIndex());
+                message.put("batch", true);
+            } else {
+                MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+                message.put("ledgerId", msgId.getLedgerId());
+                message.put("entryId", msgId.getEntryId());
+                message.put("batch", false);
             }
+            if (msg.getProperties().size() > 0) {
+                msg.getProperties().forEach((k, v) -> message.put(k, v));
+            }
+            message.put("data", msg.getData());
+            mapList.add(message);
         }
+        return mapList;
+    }
 
-        String topicUrl = requestHost + prefix;
-        String topics = HttpUtil.doGet(topicUrl, header);
-        if (topics != null) {
-            List<String> topicsList = gson.fromJson(
-                    topics, new TypeToken<List<String>>(){}.getType());
-            for (String topic: topicsList) {
-                if (topic.startsWith(persistent)) {
-                    String topicName = this.getTopicName(topic);
-                    Map<String, String> topicEntity = Maps.newHashMap();
-                    if (isPartitionedTopic(partitionedTopicsList, topic)) {
-                        String[] name = topicName.split(PARTITIONED_TOPIC_SUFFIX);
-                        partitionedMap.get(name[0]).add(topicName);
-                    } else {
-                        topicEntity.put("topic", topicName);
-                        topicEntity.put("partitions", "0");
-                        topicEntity.put("persistent", persistent);
-                        topicsArray.add(topicEntity);
-                    }
-                }
+    /**
+     * Builds the topic entries of {@code tenant/namespace}: entries for the persistent
+     * domain first, then entries for the non-persistent domain.
+     */
+    private List<Map<String, String>> getTopicsList(
+            String tenant, String namespace, String requestHost) {
+        Map<String, List<String>> topicsByDomain
+                = this.getTopicListByPulsarAdmin(tenant, namespace, requestHost);
+        Map<String, List<String>> partitionedByDomain
+                = this.getPartitionedTopicListByPulsarAdmin(tenant, namespace, requestHost);
+
+        List<Map<String, String>> entries = new ArrayList<>();
+        // Same conversion for both domains; the array fixes the persistent-first order.
+        TopicDomain[] domains = {TopicDomain.persistent, TopicDomain.non_persistent};
+        for (TopicDomain domain : domains) {
+            String domainName = domain.toString();
+            entries.addAll(this.convertTopicList(
+                    topicsByDomain.get(domainName),
+                    partitionedByDomain.get(domainName),
+                    domainName,
+                    requestHost));
+        }
+        return entries;
+    }
+
+    private List<Map<String, String>> convertTopicList(
+            List<String> topics, List<String> partitionedTopics, String persistent, String requestHost) {
+        List<Map<String, String>> topicsArray = new ArrayList<>();
+
+        Map<String, List<String>> partitionedMap = Maps.newHashMap();
+        for (String p : partitionedTopics) {
+            if (p.startsWith(persistent)) {
+                partitionedMap.put(this.getTopicName(p), new ArrayList<>());
             }
         }
-        if (partitionedTopicsList != null) {
-            for (String s : partitionedTopicsList) {
-                String topicName = this.getTopicName(s);
+
+        for (String topic: topics) {
+            if (topic.startsWith(persistent)) {
+                String topicName = this.getTopicName(topic);
                 Map<String, String> topicEntity = Maps.newHashMap();
-                List<String> partitionedTopicList = partitionedMap.get(s);
-                if (partitionedTopicList != null && partitionedTopicList.size() > 0) {
-                    topicEntity.put("topic", topicName);
-                    topicEntity.put("partitions", String.valueOf(partitionedTopicList.size()));
-                    topicEntity.put("persistent", persistent);
+                if (isPartitionedTopic(partitionedTopics, topic)) {
+                    String[] name = topicName.split(PARTITIONED_TOPIC_SUFFIX);
+                    partitionedMap.get(name[0]).add(topicName);
                 } else {
                     topicEntity.put("topic", topicName);
-                    String metadataTopicUrl = requestHost + prefix + "/" + topicName + "/partitions";
-                    String metadataTopic = HttpUtil.doGet(metadataTopicUrl, header);
-                    Map<String, Integer> metadata = gson.fromJson(
-                            metadataTopic, new TypeToken<Map<String, Integer>>(){}.getType());
-                    topicEntity.put("partitions", String.valueOf(metadata.get("partitions")));
+                    topicEntity.put("partitions", "0");
                     topicEntity.put("persistent", persistent);
+                    topicsArray.add(topicEntity);
                 }
-                topicsArray.add(topicEntity);
             }
         }
+
+        for (String s : partitionedTopics) {
+            String topicName = this.getTopicName(s);
+            Map<String, String> topicEntity = Maps.newHashMap();
+            List<String> partitionedTopicList = partitionedMap.get(s);
+            if (partitionedTopicList != null && partitionedTopicList.size() > 0) {
+                topicEntity.put("topic", topicName);
+                topicEntity.put("partitions", String.valueOf(partitionedTopicList.size()));
+                topicEntity.put("persistent", persistent);
+            } else {
+                topicEntity.put("topic", topicName);
+                PartitionedTopicMetadata partitionedTopicMetadata
+                        = this.getPartitionedTopicMetadataByPulsarAdmin(s, requestHost);
+                topicEntity.put("partitions", String.valueOf(partitionedTopicMetadata.partitions));
+                topicEntity.put("persistent", persistent);
+            }
+            topicsArray.add(topicEntity);
+        }
         return topicsArray;
     }
 
@@ -265,4 +348,66 @@ public class TopicsServiceImpl implements TopicsService {
         String topicName = tntPath.split("/")[2];
         return topicName;
     }
+
+    /** Fetches the topic list of {@code tenant/namespace} and groups it with {@code parseTopics}. */
+    private Map<String, List<String>> getTopicListByPulsarAdmin(
+            String tenant, String namespace, String requestHost) {
+        final String namespacePath = tenant + "/" + namespace;
+        try {
+            List<String> topicNames = pulsarAdminService.topics(requestHost).getList(namespacePath);
+            return parseTopics(topicNames);
+        } catch (PulsarAdminException e) {
+            // Log the admin failure together with the caught exception, then rethrow as an operation error.
+            PulsarAdminOperationException failure = new PulsarAdminOperationException("Failed to get topic list.");
+            log.error(failure.getMessage(), e);
+            throw failure;
+        }
+    }
+
+    /** Fetches the partitioned topic list of {@code tenant/namespace} and groups it with {@code parseTopics}. */
+    private Map<String, List<String>> getPartitionedTopicListByPulsarAdmin(
+            String tenant, String namespace, String requestHost) {
+        final String namespacePath = tenant + "/" + namespace;
+        try {
+            return parseTopics(
+                    pulsarAdminService.topics(requestHost).getPartitionedTopicList(namespacePath));
+        } catch (PulsarAdminException e) {
+            PulsarAdminOperationException failure
+                    = new PulsarAdminOperationException("Failed to get partitioned topic list.");
+            log.error(failure.getMessage(), e);
+            throw failure;
+        }
+    }
+
+    /** Fetches the partitioned-topic metadata of {@code topic} through the admin client for {@code requestHost}. */
+    private PartitionedTopicMetadata getPartitionedTopicMetadataByPulsarAdmin(String topic, String requestHost) {
+        try {
+            return pulsarAdminService.topics(requestHost).getPartitionedTopicMetadata(topic);
+        } catch (PulsarAdminException e) {
+            // Log the admin failure together with the caught exception, then rethrow as an operation error.
+            PulsarAdminOperationException failure
+                    = new PulsarAdminOperationException("Failed to get partitioned topic metadata.");
+            log.error(failure.getMessage(), e);
+            throw failure;
+        }
+    }
+
+    /**
+     * Splits topic names into two lists keyed by the string form of {@link TopicDomain}:
+     * topics whose parsed domain is persistent, and every other topic.
+     */
+    private Map<String, List<String>> parseTopics(List<String> topics) {
+        List<String> durable = new ArrayList<>();
+        List<String> others = new ArrayList<>();
+        for (String fullName : topics) {
+            boolean isPersistent = TopicDomain.persistent.equals(TopicName.get(fullName).getDomain());
+            // Anything that is not persistent lands in the non-persistent bucket.
+            (isPersistent ? durable : others).add(fullName);
+        }
+
+        Map<String, List<String>> byDomain = new HashMap<>();
+        byDomain.put(TopicDomain.persistent.toString(), durable);
+        byDomain.put(TopicDomain.non_persistent.toString(), others);
+        return byDomain;
+    }
 }
diff --git a/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java b/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
index d1fb323..d3336c0 100644
--- a/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
+++ b/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
@@ -13,24 +13,25 @@
  */
 package org.apache.pulsar.manager.zuul;
 
-import org.apache.pulsar.manager.service.EnvironmentCacheService;
 import com.netflix.zuul.ZuulFilter;
 import com.netflix.zuul.context.RequestContext;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+import javax.servlet.http.HttpServletRequest;
+
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.manager.service.EnvironmentCacheService;
+import org.apache.pulsar.manager.service.PulsarAdminService;
 import org.apache.pulsar.manager.service.PulsarEvent;
 import org.apache.pulsar.manager.service.RolesService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cloud.netflix.zuul.filters.support.FilterConstants;
 import org.springframework.stereotype.Component;
 
-import javax.servlet.http.HttpServletRequest;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
 import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.PRE_TYPE;
 import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.REQUEST_URI_KEY;
 
@@ -42,21 +43,22 @@ public class EnvironmentForward extends ZuulFilter {
 
     private static final Logger log = LoggerFactory.getLogger(EnvironmentForward.class);
 
-    @Value("${backend.jwt.token}")
-    private String pulsarJwtToken;
-
     private final EnvironmentCacheService environmentCacheService;
 
     private final PulsarEvent pulsarEvent;
 
     private final RolesService rolesService;
 
+    private final PulsarAdminService pulsarAdminService;
+
     @Autowired
     public EnvironmentForward(
-            EnvironmentCacheService environmentCacheService, PulsarEvent pulsarEvent, RolesService rolesService) {
+            EnvironmentCacheService environmentCacheService, PulsarEvent pulsarEvent,
+            RolesService rolesService, PulsarAdminService pulsarAdminService) {
         this.environmentCacheService = environmentCacheService;
         this.pulsarEvent = pulsarEvent;
         this.rolesService = rolesService;
+        this.pulsarAdminService = pulsarAdminService;
     }
 
     @Override
@@ -129,7 +131,8 @@ public class EnvironmentForward extends ZuulFilter {
     private Object forwardRequest(RequestContext ctx, HttpServletRequest request, String serviceUrl) {
         ctx.put(REQUEST_URI_KEY, request.getRequestURI());
         try {
-            ctx.addZuulRequestHeader("Authorization", String.format("Bearer %s", pulsarJwtToken));
+            Map<String, String> authHeader = pulsarAdminService.getAuthHeader(serviceUrl);
+            authHeader.entrySet().forEach(entry -> ctx.addZuulRequestHeader(entry.getKey(), entry.getValue()));
             ctx.setRouteHost(new URL(serviceUrl));
             pulsarEvent.parsePulsarEvent(request.getRequestURI(), request);
             log.info("Forward request to {} @ path {}",
@@ -140,5 +143,4 @@ public class EnvironmentForward extends ZuulFilter {
         }
         return null;
     }
-
 }
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 9305e60..46e0462 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -76,6 +76,11 @@ pagehelper.helperDialect=mysql
 backend.directRequestBroker=true
 backend.directRequestHost=http://localhost:8080
 backend.jwt.token=
+backend.broker.pulsarAdmin.authPlugin=
+backend.broker.pulsarAdmin.authParams=
+backend.broker.pulsarAdmin.tlsAllowInsecureConnection=false
+backend.broker.pulsarAdmin.tlsTrustCertsFilePath=
+backend.broker.pulsarAdmin.tlsEnableHostnameVerification=false
 
 jwt.secret=dab1c8ba-b01b-11e9-b384-186590e06885
 jwt.sessionTime=2592000
diff --git a/src/test/java/org/apache/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java
index caedd68..2c73e1c 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java
@@ -51,8 +51,8 @@ public class BrokerTokensRepositoryImplTest {
         Page<BrokerTokenEntity> brokerTokenEntityPage = brokerTokensRepository.getBrokerTokensList(1, 1);
         brokerTokenEntityPage.count(true);
         brokerTokenEntityPage.getResult().forEach((result) -> {
-            Assert.assertEquals(result.getRole(), brokerTokenEntity.getRole());
-            Assert.assertEquals(result.getDescription(), brokerTokenEntity.getDescription());
+            Assert.assertEquals(brokerTokenEntity.getRole(), result.getRole());
+            Assert.assertEquals(brokerTokenEntity.getDescription(), result.getDescription());
         });
 
         brokerTokenEntity.setDescription("This role for update test");
@@ -60,8 +60,8 @@ public class BrokerTokensRepositoryImplTest {
         brokerTokensRepository.update(brokerTokenEntity);
         Optional<BrokerTokenEntity> optionalBrokerTokenEntity = brokerTokensRepository.findTokenByRole(brokerTokenEntity.getRole());
         BrokerTokenEntity updatedBrokerTokenEntity = optionalBrokerTokenEntity.get();
-        Assert.assertEquals(updatedBrokerTokenEntity.getRole(), brokerTokenEntity.getRole());
-        Assert.assertEquals(updatedBrokerTokenEntity.getDescription(), brokerTokenEntity.getDescription());
+        Assert.assertEquals(brokerTokenEntity.getRole(), updatedBrokerTokenEntity.getRole());
+        Assert.assertEquals(brokerTokenEntity.getDescription(), updatedBrokerTokenEntity.getDescription());
 
         brokerTokensRepository.remove(brokerTokenEntity.getRole());
         Assert.assertFalse(brokerTokensRepository.findTokenByRole(brokerTokenEntity.getRole()).isPresent());
diff --git a/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java
index e2553cd..6eab859 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java
@@ -50,8 +50,8 @@ public class EnvironmentsRepositoryImplTest {
         Page<EnvironmentEntity> environmentEntityPage = environmentsRepository.getEnvironmentsList(1, 1);
         environmentEntityPage.count(true);
         environmentEntityPage.getResult().forEach((result) -> {
-            Assert.assertEquals(result.getName(), "test-environment");
-            Assert.assertEquals(result.getBroker(), "http://localhost:8080");
+            Assert.assertEquals("test-environment", result.getName());
+            Assert.assertEquals("http://localhost:8080", result.getBroker());
             environmentsRepository.remove(result.getName());
         });
     }
@@ -65,16 +65,16 @@ public class EnvironmentsRepositoryImplTest {
         Optional<EnvironmentEntity> environmentEntityOptionalGet = environmentsRepository
                 .findByBroker("https://localhost:8080");
         EnvironmentEntity environmentEntityGet = environmentEntityOptionalGet.get();
-        Assert.assertEquals(environmentEntityGet.getName(), "test-environment");
-        Assert.assertEquals(environmentEntityGet.getBroker(), "https://localhost:8080");
+        Assert.assertEquals("test-environment", environmentEntityGet.getName());
+        Assert.assertEquals("https://localhost:8080", environmentEntityGet.getBroker());
 
         environmentEntity.setBroker("https://localhost:8081");
         environmentsRepository.update(environmentEntity);
         Optional<EnvironmentEntity> environmentEntityOptionalUpdate = environmentsRepository
                 .findByName("test-environment");
         EnvironmentEntity environmentEntityUpdate = environmentEntityOptionalUpdate.get();
-        Assert.assertEquals(environmentEntityUpdate.getName(), "test-environment");
-        Assert.assertEquals(environmentEntityUpdate.getBroker(), "https://localhost:8081");
+        Assert.assertEquals("test-environment", environmentEntityUpdate.getName());
+        Assert.assertEquals("https://localhost:8081", environmentEntityUpdate.getBroker());
 
         environmentsRepository.remove(environmentEntityUpdate.getName());
     }
diff --git a/src/test/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImplTest.java
index c8e2efe..c8eb360 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImplTest.java
@@ -57,16 +57,16 @@ public class NamespacesRepositoryImplTest {
 
     public void checkResult(Page<NamespaceEntity> namespacesEntityPage) {
         long total = namespacesEntityPage.getTotal();
-        Assert.assertEquals(total, 1);
+        Assert.assertEquals(1, total);
         namespacesEntityPage.getResult().forEach((result) -> {
-            Assert.assertEquals(result.getTenant(), "test-namespace-public");
-            Assert.assertEquals(result.getNamespace(), "test-namespace-default");
+            Assert.assertEquals("test-namespace-public", result.getTenant());
+            Assert.assertEquals("test-namespace-default", result.getNamespace());
         });
     }
 
     public void checkDeleteResult(Page<NamespaceEntity> namespacesEntityPage) {
         long total = namespacesEntityPage.getTotal();
-        Assert.assertEquals(total, 0);
+        Assert.assertEquals(0, total);
     }
 
     @Before
@@ -168,8 +168,8 @@ public class NamespacesRepositoryImplTest {
         idList.add(namespaceId);
         Optional<NamespaceEntity> namespacesEntityOptional = namespacesRepository.findByTenantNamespace(
                 namespacesEntity.getTenant(), namespacesEntity.getNamespace());
-        Assert.assertEquals(namespacesEntityOptional.get().getTenant(), "test-namespace-public");
-        Assert.assertEquals(namespacesEntityOptional.get().getNamespace(), "test-namespace-default");
+        Assert.assertEquals("test-namespace-public", namespacesEntityOptional.get().getTenant());
+        Assert.assertEquals("test-namespace-default", namespacesEntityOptional.get().getNamespace());
         namespacesRepository.remove(namespacesEntity.getTenant(), namespacesEntity.getNamespace());
     }
 
@@ -179,8 +179,8 @@ public class NamespacesRepositoryImplTest {
         initNamespaceEntity(namespacesEntity);
         long namespaceId = namespacesRepository.save(namespacesEntity);
         Optional<NamespaceEntity> namespacesEntityOptional = namespacesRepository.findByNamespaceId(namespaceId);
-        Assert.assertEquals(namespacesEntityOptional.get().getTenant(), "test-namespace-public");
-        Assert.assertEquals(namespacesEntityOptional.get().getNamespace(), "test-namespace-default");
+        Assert.assertEquals("test-namespace-public", namespacesEntityOptional.get().getTenant());
+        Assert.assertEquals("test-namespace-default", namespacesEntityOptional.get().getNamespace());
         namespacesRepository.remove(namespacesEntity.getTenant(), namespacesEntity.getNamespace());
     }
 
diff --git a/src/test/java/org/apache/pulsar/manager/dao/RoleBindingRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/RoleBindingRepositoryImplTest.java
index 6a95a08..ed71bdf 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/RoleBindingRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/RoleBindingRepositoryImplTest.java
@@ -52,10 +52,10 @@ public class RoleBindingRepositoryImplTest {
         Page<RoleBindingEntity> roleBindingEntities = roleBindingRepository.findByUserId(
                 1, 2, 2);
         roleBindingEntities.getResult().forEach((r) -> {
-            Assert.assertEquals(r.getRoleBindingId(), 1);
-            Assert.assertEquals(r.getName(), "test-role-binding");
-            Assert.assertEquals(r.getRoleId(), 1);
-            Assert.assertEquals(r.getDescription(), "this is description");
+            Assert.assertEquals(1, r.getRoleBindingId());
+            Assert.assertEquals("test-role-binding", r.getName());
+            Assert.assertEquals(1, r.getRoleId());
+            Assert.assertEquals("this is description", r.getDescription());
         });
 
         roleBindingEntity.setName("update-role-binding");
@@ -64,10 +64,10 @@ public class RoleBindingRepositoryImplTest {
         Page<RoleBindingEntity> updateRoleBindingEntities = roleBindingRepository.findByUserId(
                 1, 2, 2);
         updateRoleBindingEntities.getResult().forEach((r) -> {
-            Assert.assertEquals(r.getRoleBindingId(), 1);
-            Assert.assertEquals(r.getName(), "update-role-binding");
-            Assert.assertEquals(r.getRoleId(), 1);
-            Assert.assertEquals(r.getDescription(), "this is update description");
+            Assert.assertEquals(1, r.getRoleBindingId());
+            Assert.assertEquals("update-role-binding", r.getName());
+            Assert.assertEquals(1, r.getRoleId());
+            Assert.assertEquals("this is update description", r.getDescription());
         });
     }
 
diff --git a/src/test/java/org/apache/pulsar/manager/dao/RolesRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/RolesRepositoryImplTest.java
index 4688074..30148fa 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/RolesRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/RolesRepositoryImplTest.java
@@ -55,14 +55,14 @@ public class RolesRepositoryImplTest {
     }
 
     private void validateRole(RoleInfoEntity role) {
-        Assert.assertEquals(role.getRoleName(), "test-role-name");
-        Assert.assertEquals(role.getRoleSource(), "test-tenant");
-        Assert.assertEquals(role.getResourceType(), "tenants");
-        Assert.assertEquals(role.getResourceName(), "tenants");
-        Assert.assertEquals(role.getResourceVerbs(), "admin");
-        Assert.assertEquals(role.getResourceId(), 2);
-        Assert.assertEquals(role.getDescription(), "This is tenants permissions");
-        Assert.assertEquals(role.getFlag(), 0);
+        Assert.assertEquals("test-role-name", role.getRoleName());
+        Assert.assertEquals("test-tenant", role.getRoleSource());
+        Assert.assertEquals("tenants", role.getResourceType());
+        Assert.assertEquals("tenants", role.getResourceName());
+        Assert.assertEquals("admin", role.getResourceVerbs());
+        Assert.assertEquals(2, role.getResourceId());
+        Assert.assertEquals("This is tenants permissions", role.getDescription());
+        Assert.assertEquals(0, role.getFlag());
     }
 
     @Test
@@ -115,10 +115,10 @@ public class RolesRepositoryImplTest {
         Optional<RoleInfoEntity> updateRoleInfo = rolesRepository.findByRoleName(
                 roleInfoEntity.getRoleName(), roleInfoEntity.getRoleSource());
         RoleInfoEntity updateRoleInfoEntity = updateRoleInfo.get();
-        Assert.assertEquals(updateRoleInfoEntity.getResourceType(), "clusters");
-        Assert.assertEquals(updateRoleInfoEntity.getResourceVerbs(), "admin,produce,consume");
-        Assert.assertEquals(updateRoleInfoEntity.getDescription(), "This is update role");
-        Assert.assertEquals(updateRoleInfoEntity.getFlag(), 1);
+        Assert.assertEquals("clusters", updateRoleInfoEntity.getResourceType());
+        Assert.assertEquals("admin,produce,consume", updateRoleInfoEntity.getResourceVerbs());
+        Assert.assertEquals("This is update role", updateRoleInfoEntity.getDescription());
+        Assert.assertEquals(1, updateRoleInfoEntity.getFlag());
 
         rolesRepository.delete(roleInfoEntity.getRoleName(), roleInfoEntity.getRoleSource());
         Optional<RoleInfoEntity> deleteRoleInfo = rolesRepository.findByRoleName(
diff --git a/src/test/java/org/apache/pulsar/manager/dao/TenantsRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/TenantsRepositoryImplTest.java
index d37d719..57c559b 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/TenantsRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/TenantsRepositoryImplTest.java
@@ -56,13 +56,13 @@ public class TenantsRepositoryImplTest {
         Page<TenantEntity> tenantsEntities = tenantsRepository.getTenantsList(1, 10);
         tenantsEntities.count(true);
         long total = tenantsEntities.getTotal();
-        Assert.assertEquals(total, 10);
+        Assert.assertEquals(10, total);
         List<TenantEntity> tenantsEntityList = tenantsEntities.getResult();
         for (int i = 0; i < total; i ++) {
             TenantEntity tenantEntity = tenantsEntityList.get(i);
             Assert.assertEquals(tenantEntity.getTenant(), tenantEntity.getAdminRoles());
-            Assert.assertEquals(tenantEntity.getAllowedClusters(), "test-cluster");
-            Assert.assertEquals(tenantEntity.getEnvironmentName(), "test-environment");
+            Assert.assertEquals("test-cluster", tenantEntity.getAllowedClusters());
+            Assert.assertEquals("test-environment", tenantEntity.getEnvironmentName());
         }
         tenantsEntities.getResult().forEach((result) -> {
             tenantsRepository.remove(result.getTenant());
@@ -89,8 +89,8 @@ public class TenantsRepositoryImplTest {
         for (int i = 0; i < total; i ++) {
             TenantEntity tenantEntity = tenantsEntityList.get(i);
             Assert.assertEquals(tenantEntity.getTenant(), tenantEntity.getAdminRoles());
-            Assert.assertEquals(tenantEntity.getAllowedClusters(), "test-cluster");
-            Assert.assertEquals(tenantEntity.getEnvironmentName(), "test-environment");
+            Assert.assertEquals("test-cluster", tenantEntity.getAllowedClusters());
+            Assert.assertEquals("test-environment", tenantEntity.getEnvironmentName());
         }
         tenantEntityPage.getResult().forEach((result) -> {
             tenantsRepository.remove(result.getTenant());
@@ -107,10 +107,10 @@ public class TenantsRepositoryImplTest {
         tenantsRepository.save(tenantEntity);
         Optional<TenantEntity> result = tenantsRepository.findByName("test");
         TenantEntity getTenantEntity = result.get();
-        Assert.assertEquals(getTenantEntity.getTenant(), "test");
-        Assert.assertEquals(getTenantEntity.getAdminRoles(), "test-role");
-        Assert.assertEquals(getTenantEntity.getAllowedClusters(), "test-cluster");
-        Assert.assertEquals(getTenantEntity.getEnvironmentName(), "test-environment");
+        Assert.assertEquals("test", getTenantEntity.getTenant());
+        Assert.assertEquals("test-role", getTenantEntity.getAdminRoles());
+        Assert.assertEquals("test-cluster", getTenantEntity.getAllowedClusters());
+        Assert.assertEquals("test-environment", getTenantEntity.getEnvironmentName());
         tenantsRepository.remove("test");
     }
 
@@ -124,9 +124,9 @@ public class TenantsRepositoryImplTest {
         long tenantId = tenantsRepository.save(tenantEntity);
         Optional<TenantEntity> result = tenantsRepository.findByTenantId(tenantId);
         TenantEntity getTenantEntity = result.get();
-        Assert.assertEquals(getTenantEntity.getTenant(), "test");
-        Assert.assertEquals(getTenantEntity.getAdminRoles(), "test-role");
-        Assert.assertEquals(getTenantEntity.getAllowedClusters(), "test-cluster");
-        Assert.assertEquals(getTenantEntity.getEnvironmentName(), "test-environment");
+        Assert.assertEquals("test", getTenantEntity.getTenant());
+        Assert.assertEquals("test-role", getTenantEntity.getAdminRoles());
+        Assert.assertEquals("test-cluster", getTenantEntity.getAllowedClusters());
+        Assert.assertEquals("test-environment", getTenantEntity.getEnvironmentName());
     }
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/pulsar/manager/dao/UsersRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/UsersRepositoryImplTest.java
index 3d4d311..8e8583c 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/UsersRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/UsersRepositoryImplTest.java
@@ -54,15 +54,15 @@ public class UsersRepositoryImplTest {
     }
 
     private void validateUser(UserInfoEntity user, boolean list) {
-        Assert.assertEquals(user.getName(), "test-user");
-        Assert.assertEquals(user.getExpire(), 157900045678l);
-        Assert.assertEquals(user.getPhoneNumber(), "1356789023456");
-        Assert.assertEquals(user.getDescription(), "test-description");
-        Assert.assertEquals(user.getLocation(), "bj");
-        Assert.assertEquals(user.getEmail(), "test@apache.org");
-        Assert.assertEquals(user.getAccessToken(), "test-access-token");
+        Assert.assertEquals("test-user", user.getName());
+        Assert.assertEquals(157900045678l, user.getExpire());
+        Assert.assertEquals("1356789023456", user.getPhoneNumber());
+        Assert.assertEquals("test-description", user.getDescription());
+        Assert.assertEquals("bj", user.getLocation());
+        Assert.assertEquals("test@apache.org", user.getEmail());
+        Assert.assertEquals("test-access-token", user.getAccessToken());
         if (!list) {
-            Assert.assertEquals(user.getPassword(), DigestUtils.sha256Hex("hello-world"));
+            Assert.assertEquals(DigestUtils.sha256Hex("hello-world"), user.getPassword());
         }
     }
 
@@ -95,8 +95,8 @@ public class UsersRepositoryImplTest {
 
         userInfoEntityOptional = usersRepository.findByUserName(userInfoEntity.getName());
         UserInfoEntity updateUserInfoEntity = userInfoEntityOptional.get();
-        Assert.assertEquals(updateUserInfoEntity.getPhoneNumber(), "1356789023456");
-        Assert.assertEquals(updateUserInfoEntity.getEmail(), "test2@apache.org");
+        Assert.assertEquals("1356789023456", updateUserInfoEntity.getPhoneNumber());
+        Assert.assertEquals("test2@apache.org", updateUserInfoEntity.getEmail());
 
         usersRepository.delete(updateUserInfoEntity.getName());
         userInfoEntityOptional = usersRepository.findByUserName(userInfoEntity.getName());
diff --git a/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java
index 12902c8..8063ebb 100644
--- a/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java
@@ -71,8 +71,8 @@ public class BookiesServiceImplTest {
                 .thenReturn("{\"192.168.2.116:3181\" : \": {Free: 48920571904(48.92GB), Total: 250790436864(250.79GB)}," +
                         "\",\"ClusterInfo: \" : \"{Free: 48920571904(48.92GB), Total: 250790436864(250.79GB)}\" }");
         Map<String, Object> result = bookiesService.getBookiesList(1, 1, "standalone");
-        Assert.assertEquals(result.get("total"), 1);
-        Assert.assertEquals(result.get("data").toString(), "[{storage=[48920571904, 250790436864], bookie=192.168.2.116:3181, status=rw}]");
-        Assert.assertEquals(result.get("pageSize"), 1);
+        Assert.assertEquals(1, result.get("total"));
+        Assert.assertEquals("[{storage=[48920571904, 250790436864], bookie=192.168.2.116:3181, status=rw}]", result.get("data").toString());
+        Assert.assertEquals(1, result.get("pageSize"));
     }
 }
diff --git a/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
index 2049977..a15e312 100644
--- a/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
@@ -15,6 +15,9 @@ package org.apache.pulsar.manager.service;
 
 import com.github.pagehelper.Page;
 import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.manager.PulsarManagerApplication;
 import org.apache.pulsar.manager.entity.ConsumerStatsEntity;
 import org.apache.pulsar.manager.entity.ConsumersStatsRepository;
@@ -23,34 +26,28 @@ import org.apache.pulsar.manager.entity.PublishersStatsRepository;
 import org.apache.pulsar.manager.entity.ReplicationsStatsRepository;
 import org.apache.pulsar.manager.entity.SubscriptionStatsEntity;
 import org.apache.pulsar.manager.entity.SubscriptionsStatsRepository;
-import org.apache.pulsar.manager.utils.HttpUtil;
 import org.apache.pulsar.manager.entity.ReplicationStatsEntity;
 import org.apache.pulsar.manager.entity.TopicStatsEntity;
 import org.apache.pulsar.manager.entity.TopicsStatsRepository;
 import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.commons.lang3.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 import java.util.Optional;
 
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(
         classes = {
                 PulsarManagerApplication.class,
@@ -59,12 +56,17 @@ import java.util.Optional;
 )
 @ActiveProfiles("test")
 public class BrokerStatsServiceImplTest {
+    @MockBean
+    private PulsarAdminService pulsarAdminService;
 
     @Autowired
     private BrokerStatsService brokerStatsService;
 
-    @Value("${backend.jwt.token}")
-    private static String pulsarJwtToken;
+    @MockBean
+    private BrokersService brokersService;
+
+    @Mock
+    private BrokerStats stats;
 
     @Autowired
     private TopicsStatsRepository topicsStatsRepository;
@@ -151,97 +153,92 @@ public class BrokerStatsServiceImplTest {
             "}";
 
     private void checkTopicStatsResult(TopicStatsEntity topicStatsEntity) {
-        Assert.assertEquals(topicStatsEntity.getAverageMsgSize(), 0.0, 1);
-        Assert.assertEquals(topicStatsEntity.getMsgRateIn(), 0.0, 1);
-        Assert.assertEquals(topicStatsEntity.getMsgRateOut(), 0.0, 1);
-        Assert.assertEquals(topicStatsEntity.getMsgThroughputIn(), 0.0, 1);
-        Assert.assertEquals(topicStatsEntity.getMsgThroughputOut(), 0.0, 1);
-        Assert.assertEquals(topicStatsEntity.getStorageSize(), 0, 0);
-        Assert.assertEquals(topicStatsEntity.getCluster(), "standalone");
-        Assert.assertEquals(topicStatsEntity.getBroker(), "localhost:8080");
-        Assert.assertEquals(topicStatsEntity.getTenant(), "public");
-        Assert.assertEquals(topicStatsEntity.getNamespace(), "functions");
-        Assert.assertEquals(topicStatsEntity.getBundle(), "0x40000000_0x80000000");
-        Assert.assertEquals(topicStatsEntity.getPersistent(), "persistent");
-        Assert.assertEquals(topicStatsEntity.getTopic(), "metadata");
+        Assert.assertEquals(0.0, topicStatsEntity.getAverageMsgSize(), 1);
+        Assert.assertEquals(0.0, topicStatsEntity.getMsgRateIn(), 1);
+        Assert.assertEquals(0.0, topicStatsEntity.getMsgRateOut(), 1);
+        Assert.assertEquals(0.0, topicStatsEntity.getMsgThroughputIn(), 1);
+        Assert.assertEquals(0.0, topicStatsEntity.getMsgThroughputOut(), 1);
+        Assert.assertEquals(0, topicStatsEntity.getStorageSize(), 0);
+        Assert.assertEquals("standalone", topicStatsEntity.getCluster());
+        Assert.assertEquals("localhost:8080", topicStatsEntity.getBroker());
+        Assert.assertEquals("public", topicStatsEntity.getTenant());
+        Assert.assertEquals("functions", topicStatsEntity.getNamespace());
+        Assert.assertEquals("0x40000000_0x80000000", topicStatsEntity.getBundle());
+        Assert.assertEquals("persistent", topicStatsEntity.getPersistent());
+        Assert.assertEquals("metadata", topicStatsEntity.getTopic());
     }
 
     private void checkPublisherStatsResult(PublisherStatsEntity publisherStatsEntity) {
-        Assert.assertEquals(publisherStatsEntity.getMsgRateIn(), 0.0, 1);
-        Assert.assertEquals(publisherStatsEntity.getMsgThroughputIn(), 0.0, 1);
-        Assert.assertEquals(publisherStatsEntity.getAverageMsgSize(), 0.0, 1);
-        Assert.assertEquals(publisherStatsEntity.getAddress(), "/127.0.0.1:59668");
-        Assert.assertEquals(publisherStatsEntity.getProducerId(), 1);
-        Assert.assertEquals(publisherStatsEntity.getProducerName(), "standalone-1-1");
-        Assert.assertEquals(publisherStatsEntity.getConnectedSince(), "2019-08-10T11:37:22.405+08:00");
-        Assert.assertEquals(publisherStatsEntity.getClientVersion(), "2.5.0-SNAPSHOT");
-        Assert.assertEquals(publisherStatsEntity.getMetadata(), "{}");
+        Assert.assertEquals(0.0, publisherStatsEntity.getMsgRateIn(), 1);
+        Assert.assertEquals(0.0, publisherStatsEntity.getMsgThroughputIn(), 1);
+        Assert.assertEquals(0.0, publisherStatsEntity.getAverageMsgSize(), 1);
+        Assert.assertEquals("/127.0.0.1:59668", publisherStatsEntity.getAddress());
+        Assert.assertEquals(1, publisherStatsEntity.getProducerId());
+        Assert.assertEquals("standalone-1-1", publisherStatsEntity.getProducerName());
+        Assert.assertEquals("2019-08-10T11:37:22.405+08:00", publisherStatsEntity.getConnectedSince());
+        Assert.assertEquals("2.5.0-SNAPSHOT", publisherStatsEntity.getClientVersion());
+        Assert.assertEquals("{}", publisherStatsEntity.getMetadata());
     }
 
     private void checkReplicationStatsResult(ReplicationStatsEntity replicationStatsEntity) {
-        Assert.assertEquals(replicationStatsEntity.getCluster(), "test-replications");
-        Assert.assertEquals(replicationStatsEntity.getMsgRateIn(), 123, 0);
-        Assert.assertEquals(replicationStatsEntity.getMsgThroughputIn(), 456, 0);
-        Assert.assertEquals(replicationStatsEntity.getMsgRateOut(), 456, 0);
-        Assert.assertEquals(replicationStatsEntity.getMsgThroughputOut(), 789, 0);
-        Assert.assertEquals(replicationStatsEntity.getMsgRateExpired(), 990, 0);
-        Assert.assertEquals(replicationStatsEntity.getReplicationBacklog(), 100, 0);
+        Assert.assertEquals("test-replications", replicationStatsEntity.getCluster());
+        Assert.assertEquals(123, replicationStatsEntity.getMsgRateIn(), 0);
+        Assert.assertEquals(456, replicationStatsEntity.getMsgThroughputIn(), 0);
+        Assert.assertEquals(456, replicationStatsEntity.getMsgRateOut(), 0);
+        Assert.assertEquals(789, replicationStatsEntity.getMsgThroughputOut(), 0);
+        Assert.assertEquals(990, replicationStatsEntity.getMsgRateExpired(), 0);
+        Assert.assertEquals(100, replicationStatsEntity.getReplicationBacklog(), 0);
         Assert.assertFalse(replicationStatsEntity.isConnected());
-        Assert.assertEquals(replicationStatsEntity.getReplicationDelayInSeconds(), 890, 0);
-        Assert.assertEquals(replicationStatsEntity.getInboundConnection(), "test");
-        Assert.assertEquals(replicationStatsEntity.getInboundConnectedSince(), "test2");
-        Assert.assertEquals(replicationStatsEntity.getOutboundConnection(), "test3");
-        Assert.assertEquals(replicationStatsEntity.getOutboundConnectedSince(), "test4");
+        Assert.assertEquals(890, replicationStatsEntity.getReplicationDelayInSeconds(), 0);
+        Assert.assertEquals("test", replicationStatsEntity.getInboundConnection());
+        Assert.assertEquals("test2", replicationStatsEntity.getInboundConnectedSince());
+        Assert.assertEquals("test3", replicationStatsEntity.getOutboundConnection());
+        Assert.assertEquals("test4", replicationStatsEntity.getOutboundConnectedSince());
     }
 
     private void checkSubscriptionStatsResult(SubscriptionStatsEntity subscriptionStatsEntity) {
-        Assert.assertEquals(subscriptionStatsEntity.getSubscription(), "reader-1ddabdb183");
-        Assert.assertEquals(subscriptionStatsEntity.getMsgBacklog(), 0.0, 0);
-        Assert.assertEquals(subscriptionStatsEntity.getMsgRateOut(), 0.0, 0);
-        Assert.assertEquals(subscriptionStatsEntity.getMsgThroughputOut(), 0.0, 0);
-        Assert.assertEquals(subscriptionStatsEntity.getMsgRateExpired(), 0.0, 0);
-        Assert.assertEquals(subscriptionStatsEntity.getMsgRateRedeliver(), 0.0, 0);
-        Assert.assertEquals(subscriptionStatsEntity.getNumberOfEntriesSinceFirstNotAckedMessage(), 1);
-        Assert.assertEquals(subscriptionStatsEntity.getTotalNonContiguousDeletedMessagesRange(), 0);
-        Assert.assertEquals(subscriptionStatsEntity.getSubscriptionType(), "Exclusive");
+        Assert.assertEquals("reader-1ddabdb183", subscriptionStatsEntity.getSubscription());
+        Assert.assertEquals(0.0, subscriptionStatsEntity.getMsgBacklog(), 0);
+        Assert.assertEquals(0.0, subscriptionStatsEntity.getMsgRateOut(), 0);
+        Assert.assertEquals(0.0, subscriptionStatsEntity.getMsgThroughputOut(), 0);
+        Assert.assertEquals(0.0, subscriptionStatsEntity.getMsgRateExpired(), 0);
+        Assert.assertEquals( 0.0, subscriptionStatsEntity.getMsgRateRedeliver(), 0);
+        Assert.assertEquals(1, subscriptionStatsEntity.getNumberOfEntriesSinceFirstNotAckedMessage());
+        Assert.assertEquals(0, subscriptionStatsEntity.getTotalNonContiguousDeletedMessagesRange());
+        Assert.assertEquals("Exclusive", subscriptionStatsEntity.getSubscriptionType());
     }
 
     private void checkConsumerStatsResult(ConsumerStatsEntity consumerStatsEntity) {
-        Assert.assertEquals(consumerStatsEntity.getConsumer(), "543bd");
-        Assert.assertEquals(consumerStatsEntity.getAddress(), "/127.0.0.1:59668");
-        Assert.assertEquals(consumerStatsEntity.getConnectedSince(), "2019-08-10T11:37:24.306+08:00");
-        Assert.assertEquals(consumerStatsEntity.getAvailablePermits(), 1000, 0);
-        Assert.assertEquals(consumerStatsEntity.getMsgRateOut(), 0.0, 0);
-        Assert.assertEquals(consumerStatsEntity.getMsgThroughputOut(), 0.0, 0);
-        Assert.assertEquals(consumerStatsEntity.getMsgRateRedeliver(), 0.0, 0);
-        Assert.assertEquals(consumerStatsEntity.getClientVersion(), "2.5.0-SNAPSHOT");
-        Assert.assertEquals(consumerStatsEntity.getMetadata(), "{}");
+        Assert.assertEquals("543bd", consumerStatsEntity.getConsumer());
+        Assert.assertEquals("/127.0.0.1:59668", consumerStatsEntity.getAddress());
+        Assert.assertEquals("2019-08-10T11:37:24.306+08:00", consumerStatsEntity.getConnectedSince());
+        Assert.assertEquals(1000, consumerStatsEntity.getAvailablePermits(), 0);
+        Assert.assertEquals(0.0, consumerStatsEntity.getMsgRateOut(), 0);
+        Assert.assertEquals(0.0, consumerStatsEntity.getMsgThroughputOut(), 0);
+        Assert.assertEquals(0.0, consumerStatsEntity.getMsgRateRedeliver(), 0);
+        Assert.assertEquals("2.5.0-SNAPSHOT", consumerStatsEntity.getClientVersion());
+        Assert.assertEquals("{}", consumerStatsEntity.getMetadata());
     }
 
     @Test
-    public void convertStatsToDbTest() {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)){
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
-                .thenReturn("[\"standalone\"]");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
-                .thenReturn("{\n" +
-                        "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
-                        "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
-                        "}");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
-                .thenReturn("[\"localhost:8080\"]");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/broker-stats/topics", header))
-                .thenReturn(testData);
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
-                .thenReturn("{}");
-
+    public void convertStatsToDbTest() throws Exception {
         String environment = "staging";
         String cluster = "standalone";
         String serviceUrl = "http://localhost:8080";
+
+        Map<String, Object> brokersMap = new HashMap<>();
+        List<Map<String, Object>> brokersArray = new ArrayList<>();
+        Map<String, Object> brokerEntity = Maps.newHashMap();
+        brokerEntity.put("broker", "localhost:8080");
+        brokersArray.add(brokerEntity);
+        brokersMap.put("data", brokersArray);
+        Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl))
+                .thenReturn(brokersMap);
+        Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats);
+        JsonObject data = new Gson().fromJson(testData, JsonObject.class);
+        Mockito.when(stats.getTopics())
+                .thenReturn(data);
+
         brokerStatsService.collectStatsToDB(
             System.currentTimeMillis() / 1000,
             environment,
@@ -275,53 +272,49 @@ public class BrokerStatsServiceImplTest {
         replicationStatsEntities.getResult().forEach((replication) -> {
             checkReplicationStatsResult(replication);
         });
-        long unixTime = System.currentTimeMillis() / 1000L;
-        brokerStatsService.clearStats(unixTime, 0);
 
         try {
-            Thread.sleep(2000);
+            Thread.sleep(1000);
         } catch (Exception e) {
 
         }
 
+        long unixTime = System.currentTimeMillis() / 1000L;
+        brokerStatsService.clearStats(unixTime, 0);
+
         Optional<TopicStatsEntity> deleteTopicStatsEntity = topicsStatsRepository.findMaxTime();
         Assert.assertFalse(deleteTopicStatsEntity.isPresent());
 
         Page<SubscriptionStatsEntity> deleteSubscriptionStatsEntities = subscriptionsStatsRepository.findByTopicStatsId(
                 1, 1, topicStatsEntity1.getTopicStatsId(), topicStatsEntity1.getTime_stamp());
-        Assert.assertEquals(deleteSubscriptionStatsEntities.getTotal(), 0);
+        Assert.assertEquals(0, deleteSubscriptionStatsEntities.getTotal());
         Page<PublisherStatsEntity> deletePublisherStatsEntities = publishersStatsRepository.findByTopicStatsId(
                 1, 1, topicStatsEntity1.getTopicStatsId(), topicStatsEntity1.getTime_stamp());
-        Assert.assertEquals(deletePublisherStatsEntities.getTotal(), 0);
+        Assert.assertEquals(0, deletePublisherStatsEntities.getTotal());
         Page<ReplicationStatsEntity> deleteReplicationStatsEntities = replicationsStatsRepository.findByTopicStatsId(
                 1, 1, topicStatsEntity1.getTopicStatsId(), topicStatsEntity1.getTime_stamp());
-        Assert.assertEquals(deleteReplicationStatsEntities.getTotal(), 0);
+        Assert.assertEquals(0, deleteReplicationStatsEntities.getTotal());
     }
 
     @Test
-    public void findByMultiTenantOrMultiNamespace() {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)){
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
-                .thenReturn("[\"standalone\"]");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
-                .thenReturn("{\n" +
-                        "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
-                        "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
-                        "}");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
-                .thenReturn("[\"localhost:8080\"]");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/broker-stats/topics", header))
-                .thenReturn(testData);
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
-                .thenReturn("{}");
-
+    public void findByMultiTenantOrMultiNamespace() throws Exception {
         String environment = "staging";
         String cluster = "standalone";
         String serviceUrl = "http://localhost:8080";
+
+        Map<String, Object> brokersMap = new HashMap<>();
+        List<Map<String, Object>> brokersArray = new ArrayList<>();
+        Map<String, Object> brokerEntity = Maps.newHashMap();
+        brokerEntity.put("broker", "localhost:8080");
+        brokersArray.add(brokerEntity);
+        brokersMap.put("data", brokersArray);
+        Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl))
+                .thenReturn(brokersMap);
+        Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats);
+        JsonObject data = new Gson().fromJson(testData, JsonObject.class);
+        Mockito.when(stats.getTopics())
+                .thenReturn(data);
+
         brokerStatsService.collectStatsToDB(
                 System.currentTimeMillis() / 1000,
                 environment,
@@ -340,12 +333,12 @@ public class BrokerStatsServiceImplTest {
 
 
         tenantAllCountPage.getResult().forEach((result) -> {
-            Assert.assertEquals(result.getAverageMsgSize(), 0.0, 1);
-            Assert.assertEquals(result.getMsgRateIn(), 0.0, 1);
-            Assert.assertEquals(result.getMsgRateOut(), 0.0, 1);
-            Assert.assertEquals(result.getMsgThroughputIn(), 0.0, 1);
-            Assert.assertEquals(result.getMsgThroughputOut(), 0.0, 1);
-            Assert.assertEquals(result.getStorageSize(), 0, 0);
+            Assert.assertEquals(0.0, result.getAverageMsgSize(), 1);
+            Assert.assertEquals(0.0, result.getMsgRateIn(), 1);
+            Assert.assertEquals(0.0, result.getMsgRateOut(), 1);
+            Assert.assertEquals(0.0, result.getMsgThroughputIn(), 1);
+            Assert.assertEquals(0.0, result.getMsgThroughputOut(), 1);
+            Assert.assertEquals(0, result.getStorageSize(), 0);
         });
 
         ArrayList<String> namespaceList = new ArrayList<>();
@@ -360,12 +353,12 @@ public class BrokerStatsServiceImplTest {
 
 
         namespaceAllCountPage.getResult().forEach((result) -> {
-            Assert.assertEquals(result.getAverageMsgSize(), 0.0, 1);
-            Assert.assertEquals(result.getMsgRateIn(), 0.0, 1);
-            Assert.assertEquals(result.getMsgRateOut(), 0.0, 1);
-            Assert.assertEquals(result.getMsgThroughputIn(), 0.0, 1);
-            Assert.assertEquals(result.getMsgThroughputOut(), 0.0, 1);
-            Assert.assertEquals(result.getStorageSize(), 0, 0);
+            Assert.assertEquals(0.0, result.getAverageMsgSize(), 1);
+            Assert.assertEquals(0.0, result.getMsgRateIn(), 1);
+            Assert.assertEquals(0.0, result.getMsgRateOut(), 1);
+            Assert.assertEquals(0.0, result.getMsgThroughputIn(), 1);
+            Assert.assertEquals(0.0, result.getMsgThroughputOut(), 1);
+            Assert.assertEquals(0, result.getStorageSize(), 0);
         });
 
         long unixTime = System.currentTimeMillis() / 1000L;
diff --git a/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java
index 06ec665..618dee7 100644
--- a/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java
@@ -54,6 +54,6 @@ public class BrokerTokensServiceImplTest {
         String role = "test";
         String token = jwtService.createBrokerToken(role, null);
         Claims jwtBody = jwtService.validateBrokerToken(token);
-        Assert.assertEquals(jwtBody.getSubject(), role);
+        Assert.assertEquals(role, jwtBody.getSubject());
     }
 }
diff --git a/src/test/java/org/apache/pulsar/manager/service/BrokersServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BrokersServiceImplTest.java
index eea39cb..6139fc3 100644
--- a/src/test/java/org/apache/pulsar/manager/service/BrokersServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/BrokersServiceImplTest.java
@@ -13,32 +13,29 @@
  */
 package org.apache.pulsar.manager.service;
 
-import com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.Brokers;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.manager.PulsarManagerApplication;
-import org.apache.pulsar.manager.entity.EnvironmentEntity;
 import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
-import org.apache.commons.lang3.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(
     classes = {
         PulsarManagerApplication.class,
@@ -48,32 +45,36 @@ import org.springframework.test.context.junit4.SpringRunner;
 @ActiveProfiles("test")
 public class BrokersServiceImplTest {
 
-    @Value("${backend.jwt.token}")
-    private static String pulsarJwtToken;
+    @MockBean
+    private PulsarAdminService pulsarAdminService;
 
     @Autowired
     private BrokersService brokersService;
 
+    @Mock
+    private Clusters clusters;
+
+    @Mock
+    private Brokers brokers;
+
     @Test
     public void brokersServiceTest() throws Exception{
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
+        FailureDomain fdomain = new FailureDomain();
+        fdomain.setBrokers(new HashSet<String>(Arrays.asList("broker-1:8080")));
+        Map<String, FailureDomain> fMap = new HashMap<>();
+        fMap.put("fdomain-1",fdomain);
+        Mockito.when(pulsarAdminService.clusters("http://localhost:8080")).thenReturn(clusters);
+        Mockito.when(clusters.getFailureDomains("standalone"))
+                .thenReturn(fMap);
 
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
-                .thenReturn("{\"test\":{\"brokers\":[\"tengdeMBP:8080\"]}}");
+        Mockito.when(pulsarAdminService.brokers("http://localhost:8080")).thenReturn(brokers);
+        Mockito.when(brokers.getActiveBrokers("standalone"))
+                .thenReturn(Arrays.asList("broker-1:8080"));
 
-        EnvironmentEntity environmentEntity = new EnvironmentEntity();
-        environmentEntity.setName("test-environment");
-        environmentEntity.setBroker("http://localhost:8080");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
-                .thenReturn("[\"tengdeMBP:8080\"]");
         Map<String, Object> result = brokersService.getBrokersList(
                 1, 1, "standalone", "http://localhost:8080");
-        Assert.assertEquals(result.get("total"), 1);
-        Assert.assertEquals(result.get("data").toString(), "[{failureDomain=[test], broker=tengdeMBP:8080}]");
-        Assert.assertEquals(result.get("pageSize"), 1);
+        Assert.assertEquals(1, result.get("total"));
+        Assert.assertEquals("[{failureDomain=[fdomain-1], broker=broker-1:8080}]", result.get("data").toString());
+        Assert.assertEquals(1, result.get("pageSize"));
     }
 }
diff --git a/src/test/java/org/apache/pulsar/manager/service/ClustersServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/ClustersServiceImplTest.java
index 00be004..97ef0e3 100644
--- a/src/test/java/org/apache/pulsar/manager/service/ClustersServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/ClustersServiceImplTest.java
@@ -14,31 +14,30 @@
 package org.apache.pulsar.manager.service;
 
 import com.google.common.collect.Maps;
+
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.manager.PulsarManagerApplication;
 import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(
     classes = {
         PulsarManagerApplication.class,
@@ -51,47 +50,48 @@ public class ClustersServiceImplTest {
     @Autowired
     private ClustersService clustersService;
 
-    @Value("${backend.jwt.token}")
-    private static String pulsarJwtToken;
+    @MockBean
+    private BrokersService brokersService;
+
+    @MockBean
+    private PulsarAdminService pulsarAdminService;
+
+    @Mock
+    private Clusters clusters;
 
     @Test
-    public void clusterServiceImplTest() {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
-                .thenReturn("[\"standalone\"]");
+    public void clusterServiceImplTest() throws PulsarAdminException {
+        Mockito.when(pulsarAdminService.clusters("http://localhost:8080")).thenReturn(clusters);
+        Mockito.when(pulsarAdminService.clusters("http://localhost:8080").getClusters()).thenReturn(Arrays.asList("standalone"));
+        ClusterData standaloneClusterData = new ClusterData("http://broker-1:8080", null, "pulsar://broker-1:6650", null);
+        Mockito.when(pulsarAdminService.clusters("http://localhost:8080").getCluster("standalone")).thenReturn(standaloneClusterData);
 
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
-                .thenReturn("{\n" +
-                        "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
-                        "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
-                        "}");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
-                .thenReturn("{\"test\":{\"brokers\":[\"tengdeMBP:8080\"]}}");
+        Map<String, Object> brokerEntity = Maps.newHashMap();
+        brokerEntity.put("broker", "broker-1:8080");
+        brokerEntity.put("failureDomain", null);
+        List<Map<String, Object>> brokersArray = new ArrayList<>();
+        brokersArray.add(brokerEntity);
+        Map<String, Object> brokersMap = new HashMap<>();
+        brokersMap.put("isPage", false);
+        brokersMap.put("total", brokersArray.size());
+        brokersMap.put("data", brokersArray);
+        brokersMap.put("pageNum", 1);
+        brokersMap.put("pageSize", brokersArray.size());
+        Mockito.when(brokersService.getBrokersList(1, 1, "standalone", "http://localhost:8080")).thenReturn(brokersMap);
 
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
-                .thenReturn("[\"tengdeMBP:8080\"]");
         Map<String, Object> result = clustersService.getClustersList(1, 1, "http://localhost:8080", null);
-        Assert.assertEquals(result.get("data").toString(),
-                "[{cluster=standalone, serviceUrlTls=null, brokers=1, serviceUrl=http://tengdeMBP:8080, " +
-                        "brokerServiceUrlTls=null, brokerServiceUrl=pulsar://tengdeMBP:6650}]");
-        Assert.assertEquals(result.get("total"), 1);
-        Assert.assertEquals(result.get("pageSize"), 1);
+        Assert.assertEquals("[{cluster=standalone, serviceUrlTls=null, brokers=1, serviceUrl=http://broker-1:8080, " +
+                        "brokerServiceUrlTls=null, brokerServiceUrl=pulsar://broker-1:6650}]", result.get("data").toString());
+        Assert.assertEquals(1, result.get("total"));
+        Assert.assertEquals(1, result.get("pageSize"));
     }
 
     @Test
-    public void getClusterByAnyBroker() {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
-                .thenReturn("[\"standalone\"]");
+    public void getClusterByAnyBroker() throws PulsarAdminException  {
+        Mockito.when(pulsarAdminService.clusters("http://localhost:8080")).thenReturn(clusters);
+        Mockito.when(pulsarAdminService.clusters("http://localhost:8080").getClusters()).thenReturn(Arrays.asList("standalone"));
+
         List<String> clusterList = clustersService.getClusterByAnyBroker("http://localhost:8080");
-        Assert.assertEquals(clusterList.get(0), "standalone");
+        Assert.assertEquals("standalone", clusterList.get(0));
     }
 }
diff --git a/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java
index 95bef54..c67bf0c 100644
--- a/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java
@@ -15,38 +15,34 @@ package org.apache.pulsar.manager.service;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.eq;
 
-import com.google.gson.Gson;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.manager.PulsarManagerApplication;
 import org.apache.pulsar.manager.entity.EnvironmentEntity;
 import org.apache.pulsar.manager.entity.EnvironmentsRepository;
 import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
+
+import java.util.Arrays;
 import java.util.NoSuchElementException;
+
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
 /**
  * Unit test {@link EnvironmentCacheService}.
  */
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(
     classes = {
         PulsarManagerApplication.class,
@@ -57,10 +53,22 @@ import org.springframework.test.context.junit4.SpringRunner;
 public class EnvironmentCacheServiceImplTest {
 
     @Autowired
-    private EnvironmentsRepository environmentsRepository;
+    private EnvironmentCacheService environmentCacheService;
 
     @Autowired
-    private EnvironmentCacheService environmentCache;
+    private EnvironmentsRepository environmentsRepository;
+
+    @MockBean
+    private PulsarAdminService pulsarAdminService;
+
+    @Mock
+    private Clusters emptyClusters;
+
+    @Mock
+    private Clusters cluster1Clusters;
+
+    @Mock
+    private Clusters cluster2Clusters;
 
     private EnvironmentEntity environment1;
     private EnvironmentEntity environment2;
@@ -73,7 +81,7 @@ public class EnvironmentCacheServiceImplTest {
     private ClusterData cluster2_1;
 
     @Before
-    public void setup() {
+    public void setup() throws PulsarAdminException {
         // setup 3 environments
         environment1 = new EnvironmentEntity();
         environment1.setBroker("http://cluster1_0:8080");
@@ -95,36 +103,17 @@ public class EnvironmentCacheServiceImplTest {
         cluster2_1 = new ClusterData();
         cluster2_1.setServiceUrl("http://cluster2_1:8080");
 
-        PowerMockito.mockStatic(HttpUtil.class);
-        // empty environment
-        PowerMockito.when(HttpUtil.doGet(
-            eq(emptyEnvironment.getBroker() + "/admin/v2/clusters"),
-            anyMap()
-        )).thenReturn("[]");
-
-        // environment 1
-        PowerMockito.when(HttpUtil.doGet(
-            eq(cluster1_0.getServiceUrl() + "/admin/v2/clusters"),
-            anyMap()
-        )).thenReturn("[\""+ cluster1_0_name + "\"]");
-        PowerMockito.when(HttpUtil.doGet(
-            eq(cluster1_0.getServiceUrl() + "/admin/v2/clusters/" + cluster1_0_name),
-            anyMap()
-        )).thenReturn(new Gson().toJson(cluster1_0));
-
-        // environment 2
-        PowerMockito.when(HttpUtil.doGet(
-            eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters"),
-            anyMap()
-        )).thenReturn("[\""+ cluster2_0_name + "\", \"" + cluster2_1_name + "\"]");
-        PowerMockito.when(HttpUtil.doGet(
-            eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters/" + cluster2_0_name),
-            anyMap()
-        )).thenReturn(new Gson().toJson(cluster2_0));
-        PowerMockito.when(HttpUtil.doGet(
-            eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters/" + cluster2_1_name),
-            anyMap()
-        )).thenReturn(new Gson().toJson(cluster2_1));
+        Mockito.when(pulsarAdminService.clusters(emptyEnvironment.getBroker())).thenReturn(emptyClusters);
+        Mockito.when(emptyClusters.getClusters()).thenReturn(Arrays.asList());
+
+        Mockito.when(pulsarAdminService.clusters(cluster1_0.getServiceUrl())).thenReturn(cluster1Clusters);
+        Mockito.when(cluster1Clusters.getClusters()).thenReturn(Arrays.asList(cluster1_0_name));
+        Mockito.when(cluster1Clusters.getCluster(cluster1_0_name)).thenReturn(cluster1_0);
+
+        Mockito.when(pulsarAdminService.clusters(cluster2_0.getServiceUrl())).thenReturn(cluster2Clusters);
+        Mockito.when(cluster2Clusters.getClusters()).thenReturn(Arrays.asList(cluster2_0_name, cluster2_1_name));
+        Mockito.when(cluster2Clusters.getCluster(cluster2_0_name)).thenReturn(cluster2_0);
+        Mockito.when(cluster2Clusters.getCluster(cluster2_1_name)).thenReturn(cluster2_1);
     }
 
     @After
@@ -136,10 +125,10 @@ public class EnvironmentCacheServiceImplTest {
 
     @Test
     public void testEmptyEnvironments() {
-        environmentCache.reloadEnvironments();
+        environmentCacheService.reloadEnvironments();
 
         try {
-            environmentCache.getServiceUrl(environment1.getName(), null);
+            environmentCacheService.getServiceUrl(environment1.getName(), null);
             fail("Should fail to get service url if environments is empty");
         } catch (NoSuchElementException e) {
             // expected
@@ -147,11 +136,14 @@ public class EnvironmentCacheServiceImplTest {
     }
 
     @Test
-    public void testEmptyEnvironment() {
+    public void testEmptyEnvironment() throws PulsarAdminException {
         environmentsRepository.save(emptyEnvironment);
+        PulsarAdminException pulsarAdminException = new PulsarAdminException("Cluster does not exist");
+        Mockito.when(emptyClusters.getCluster(cluster1_0_name)).thenThrow(pulsarAdminException);
+        environmentCacheService.reloadEnvironments();
 
         try {
-            environmentCache.getServiceUrl(emptyEnvironment.getName(), cluster1_0_name);
+            environmentCacheService.getServiceUrl(emptyEnvironment.getName(), cluster1_0_name);
             fail("Should fail to get service url if environments is empty");
         } catch (RuntimeException e) {
             // expected
@@ -171,29 +163,29 @@ public class EnvironmentCacheServiceImplTest {
         // without cluster
 
         assertEquals(cluster1_0.getServiceUrl(),
-            environmentCache.getServiceUrl(environment1.getName(), null));
+            environmentCacheService.getServiceUrl(environment1.getName(), null));
         assertEquals(cluster2_0.getServiceUrl(),
-            environmentCache.getServiceUrl(environment2.getName(), null));
+            environmentCacheService.getServiceUrl(environment2.getName(), null));
 
         // with cluster
 
         assertEquals(cluster1_0.getServiceUrl(),
-            environmentCache.getServiceUrl(environment1.getName(), cluster1_0_name));
+            environmentCacheService.getServiceUrl(environment1.getName(), cluster1_0_name));
         assertEquals(cluster2_0.getServiceUrl(),
-            environmentCache.getServiceUrl(environment2.getName(), cluster2_0_name));
+            environmentCacheService.getServiceUrl(environment2.getName(), cluster2_0_name));
         assertEquals(cluster2_1.getServiceUrl(),
-            environmentCache.getServiceUrl(environment2.getName(), cluster2_1_name));
+            environmentCacheService.getServiceUrl(environment2.getName(), cluster2_1_name));
     }
 
     @Test
     public void testReloadEnvironmentsAddNewEnvironmentsAndRemoveOldEnvironments() {
         environmentsRepository.save(environment1);
+        environmentCacheService.reloadEnvironments();
 
-        environmentCache.reloadEnvironments();
         assertEquals(cluster1_0.getServiceUrl(),
-            environmentCache.getServiceUrl(environment1.getName(), null));
+            environmentCacheService.getServiceUrl(environment1.getName(), null));
         try {
-            environmentCache.getServiceUrl(environment2.getName(), null);
+            environmentCacheService.getServiceUrl(environment2.getName(), null);
             fail("Should fail to get service url if environments is empty");
         } catch (NoSuchElementException e) {
             // expected
@@ -201,12 +193,12 @@ public class EnvironmentCacheServiceImplTest {
 
         environmentsRepository.save(environment2);
         environmentsRepository.remove(environment1.getName());
-        environmentCache.reloadEnvironments();
+        environmentCacheService.reloadEnvironments();
 
         assertEquals(cluster2_0.getServiceUrl(),
-            environmentCache.getServiceUrl(environment2.getName(), null));
+            environmentCacheService.getServiceUrl(environment2.getName(), null));
         try {
-            environmentCache.getServiceUrl(environment1.getName(), null);
+            environmentCacheService.getServiceUrl(environment1.getName(), null);
             fail("Should fail to get service url if environments is empty");
         } catch (NoSuchElementException e) {
             // expected
@@ -214,29 +206,24 @@ public class EnvironmentCacheServiceImplTest {
     }
 
     @Test
-    public void testReloadEnvironmentsAddNewClusterAndRemoveOldCluster() {
+    public void testReloadEnvironmentsAddNewClusterAndRemoveOldCluster() throws PulsarAdminException {
         environmentsRepository.save(environment2);
-        environmentCache.reloadEnvironments();
+        environmentCacheService.reloadEnvironments();
         assertEquals(cluster2_0.getServiceUrl(),
-            environmentCache.getServiceUrl(environment2.getName(), cluster2_0_name));
+            environmentCacheService.getServiceUrl(environment2.getName(), cluster2_0_name));
         assertEquals(cluster2_1.getServiceUrl(),
-            environmentCache.getServiceUrl(environment2.getName(), cluster2_1_name));
-
-        PowerMockito.when(HttpUtil.doGet(
-            eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters"),
-            anyMap()
-        )).thenReturn("[\""+ cluster2_0_name + "\"]");
-        PowerMockito.when(HttpUtil.doGet(
-            eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters/" + cluster2_1_name),
-            anyMap()
-        )).thenReturn(null);
-
-        environmentCache.reloadEnvironments();
+            environmentCacheService.getServiceUrl(environment2.getName(), cluster2_1_name));
+
+        Mockito.when(cluster2Clusters.getClusters()).thenReturn(Arrays.asList(cluster2_0_name));
+        PulsarAdminException pulsarAdminException = new PulsarAdminException("Cluster does not exist");
+        Mockito.when(cluster2Clusters.getCluster(cluster2_1_name)).thenThrow(pulsarAdminException);
+
+        environmentCacheService.reloadEnvironments();
         assertEquals(cluster2_0.getServiceUrl(),
-            environmentCache.getServiceUrl(environment2.getName(), cluster2_0_name));
+            environmentCacheService.getServiceUrl(environment2.getName(), cluster2_0_name));
         try {
             assertEquals(cluster2_1.getServiceUrl(),
-                environmentCache.getServiceUrl(environment2.getName(), cluster2_1_name));
+                environmentCacheService.getServiceUrl(environment2.getName(), cluster2_1_name));
             fail("Should fail to get service url if cluster is not found");
         } catch (RuntimeException e) {
             // expected
diff --git a/src/test/java/org/apache/pulsar/manager/service/GithubLoginServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/GithubLoginServiceImplTest.java
index 4dadd54..2534b68 100644
--- a/src/test/java/org/apache/pulsar/manager/service/GithubLoginServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/GithubLoginServiceImplTest.java
@@ -90,7 +90,7 @@ public class GithubLoginServiceImplTest {
                     "\"token_type\": \"bearer\"" +
                 "}");
         String withCodeResult = thirdPartyLoginService.getAuthToken(parameters);
-        Assert.assertEquals(withCodeResult, "e72e16c7e42f292c6912e7710c838347ae178b4a");
+        Assert.assertEquals("e72e16c7e42f292c6912e7710c838347ae178b4a", withCodeResult);
     }
 
     @Test
@@ -99,7 +99,7 @@ public class GithubLoginServiceImplTest {
         Map<String, String> authenticationMap = Maps.newHashMap();
         UserInfoEntity noTokenUserInfoEntity = thirdPartyLoginService.getUserInfo(authenticationMap);
 
-        Assert.assertEquals(noTokenUserInfoEntity, null);
+        Assert.assertEquals(null, noTokenUserInfoEntity);
 
         authenticationMap.put("access_token", "test-user-token");
         PowerMockito.mockStatic(HttpUtil.class);
@@ -119,11 +119,11 @@ public class GithubLoginServiceImplTest {
                         "\t\"bio\": \"this is description\"" +
                         "}");
         UserInfoEntity withTokenUserInfoEntity = thirdPartyLoginService.getUserInfo(authenticationMap);
-        Assert.assertEquals(withTokenUserInfoEntity.getEmail(), "test@apache.org");
-        Assert.assertEquals(withTokenUserInfoEntity.getName(), "test1");
-        Assert.assertEquals(withTokenUserInfoEntity.getCompany(), "bj");
-        Assert.assertEquals(withTokenUserInfoEntity.getDescription(), "this is description");
-        Assert.assertEquals(withTokenUserInfoEntity.getLocation(), "nw");
-        Assert.assertEquals(withTokenUserInfoEntity.getAccessToken(), "test-user-token");
+        Assert.assertEquals("test@apache.org", withTokenUserInfoEntity.getEmail());
+        Assert.assertEquals("test1", withTokenUserInfoEntity.getName());
+        Assert.assertEquals("bj", withTokenUserInfoEntity.getCompany());
+        Assert.assertEquals("this is description", withTokenUserInfoEntity.getDescription());
+        Assert.assertEquals("nw", withTokenUserInfoEntity.getLocation());
+        Assert.assertEquals("test-user-token", withTokenUserInfoEntity.getAccessToken());
     }
 }
diff --git a/src/test/java/org/apache/pulsar/manager/service/NamespacesServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/NamespacesServiceImplTest.java
index c5684bc..bcd1f73 100644
--- a/src/test/java/org/apache/pulsar/manager/service/NamespacesServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/NamespacesServiceImplTest.java
@@ -14,30 +14,28 @@
 package org.apache.pulsar.manager.service;
 
 import com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.manager.PulsarManagerApplication;
+import org.apache.pulsar.manager.entity.TopicStatsEntity;
+import org.apache.pulsar.manager.entity.TopicsStatsRepository;
 import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
+import java.util.Arrays;
 import java.util.Map;
 
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(
     classes = {
         PulsarManagerApplication.class,
@@ -50,68 +48,59 @@ public class NamespacesServiceImplTest {
     @Autowired
     private NamespacesService namespacesService;
 
-    @Autowired
-    private BrokerStatsService brokerStatsService;
+    @MockBean
+    private PulsarAdminService pulsarAdminService;
+
+    @MockBean
+    private TopicsService topicsService;
 
-    @Value("${backend.jwt.token}")
-    private static String pulsarJwtToken;
+    @Mock
+    private Namespaces namespaces;
+
+    @Autowired
+    private TopicsStatsRepository topicsStatsRepository;
 
     @Test
-    public void namespaceServiceImplTest() {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/namespaces/public", header))
-                .thenReturn("[\"public/default\"]");
+    public void namespaceServiceImplTest() throws PulsarAdminException {
+        Mockito.when(pulsarAdminService.namespaces("http://localhost:8080")).thenReturn(namespaces);
+        Mockito.when(namespaces.getNamespaces("public")).thenReturn(Arrays.asList("public/default"));
+        Map<String, Object> topics = Maps.newHashMap();
+        topics.put("total", 1);
+        Mockito.when(topicsService.getTopicsList(0, 0, "public", "default", "http://localhost:8080"))
+                .thenReturn(topics);
 
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/persistent/public/default", header))
-                .thenReturn("[\"persistent://public/default/test789\"]");
-        PowerMockito.when(HttpUtil.doGet(
-                "http://localhost:8080/admin/v2/persistent/public/default/partitioned", header))
-                .thenReturn("[]");
         Map<String, Object> result = namespacesService.getNamespaceList(1, 1, "public", "http://localhost:8080");
-        Assert.assertEquals(result.get("total"), 1);
+        Assert.assertEquals(1, result.get("total"));
         Assert.assertFalse((Boolean) result.get("isPage"));
-        Assert.assertEquals(result.get("data").toString(), "[{topics=1, namespace=default}]");
+        Assert.assertEquals("[{topics=1, namespace=default}]", result.get("data").toString());
     }
 
     @Test
     public void getNamespaceStatsTest() {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
-                .thenReturn("[\"standalone\"]");
-
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
-                .thenReturn("[\"localhost:8080\"]");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/broker-stats/topics", header))
-                .thenReturn(BrokerStatsServiceImplTest.testData);
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
-                .thenReturn("{}");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
-                .thenReturn("{\n" +
-                        "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
-                        "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
-                        "}");
         String environment = "staging";
-        String cluster = "standalone";
-        String serviceUrl = "http://localhost:8080";
-        brokerStatsService.collectStatsToDB(
-                System.currentTimeMillis() / 1000,
-                environment,
-                cluster,
-                serviceUrl
-        );
+        String tenant = "public";
+        String namespace = "functions";
+
+        TopicStatsEntity topicStatsEntity = new TopicStatsEntity();
+        topicStatsEntity.setEnvironment(environment);
+        topicStatsEntity.setCluster("standalone");
+        topicStatsEntity.setBroker("localhost:8080");
+        topicStatsEntity.setPersistent("persistent");
+        topicStatsEntity.setTenant(tenant);
+        topicStatsEntity.setNamespace(namespace);
+        topicStatsEntity.setTopic("metadata");
+        topicStatsEntity.setBundle("0x40000000_0x80000000");
+        topicStatsEntity.setMsgRateIn(0.0);
+        topicStatsEntity.setSubscriptionCount(1);
+        topicStatsEntity.setProducerCount(1);
+        topicStatsEntity.setTime_stamp((System.currentTimeMillis() / 1000L));
+        topicsStatsRepository.save(topicStatsEntity);
+
         Map<String, Object> namespaceStats = namespacesService.getNamespaceStats(
-                environment, "public", "functions");
-        Assert.assertEquals(namespaceStats.get("outMsg"), 0.0);
-        Assert.assertEquals(namespaceStats.get("inMsg"), 0.0);
-        Assert.assertEquals(namespaceStats.get("msgThroughputIn"), 0.0);
-        Assert.assertEquals(namespaceStats.get("msgThroughputOut"), 0.0);
+                tenant, namespace, environment);
+        Assert.assertEquals(0.0, namespaceStats.get("outMsg"));
+        Assert.assertEquals(0.0, namespaceStats.get("inMsg"));
+        Assert.assertEquals(0.0, namespaceStats.get("msgThroughputIn"));
+        Assert.assertEquals(0.0, namespaceStats.get("msgThroughputOut"));
     }
 }
diff --git a/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java
new file mode 100644
index 0000000..a819e86
--- /dev/null
+++ b/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.manager.service;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pulsar.manager.PulsarManagerApplication;
+import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
+import org.apache.pulsar.manager.service.impl.PulsarAdminServiceImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(
+    classes = {
+        PulsarManagerApplication.class,
+        HerdDBTestProfile.class
+    }
+)
+@ActiveProfiles("test")
+public class PulsarAdminServiceImplTest {
+
+    @Autowired
+    private PulsarAdminService pulsarAdminService;
+
+    @After
+    public void teardown() {
+        ((PulsarAdminServiceImpl) pulsarAdminService).destroy();
+        ReflectionTestUtils.setField(pulsarAdminService, "pulsarAdmins", new HashMap<>());
+    }
+
+    @Test
+    public void getPulsarAdminTest() {
+        String serviceUrl = pulsarAdminService.getPulsarAdmin("http://localhost:8080").getServiceUrl();
+        Assert.assertEquals("http://localhost:8080", serviceUrl);
+    }
+
+    @Test
+    public void getAuthHeaderTest() {
+        ReflectionTestUtils.setField(pulsarAdminService, "authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
+        ReflectionTestUtils.setField(pulsarAdminService, "authParams", "test");
+        Map<String, String> authHeader = pulsarAdminService.getAuthHeader("http://localhost:8080");
+        Assert.assertEquals("Bearer test", authHeader.get("Authorization"));
+    }
+}
diff --git a/src/test/java/org/apache/pulsar/manager/service/PulsarEventImplTest.java b/src/test/java/org/apache/pulsar/manager/service/PulsarEventImplTest.java
index df518ac..711a618 100644
--- a/src/test/java/org/apache/pulsar/manager/service/PulsarEventImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/PulsarEventImplTest.java
@@ -167,17 +167,17 @@ public class PulsarEventImplTest {
     public void validateTenantPermission() {
         Map<String, String> result;
         result = pulsarEvent.validateTenantPermission("/admin/v2/tenants/superTenant", "super-access-token");
-        Assert.assertEquals(result.get("message"), "Validate tenant success");
+        Assert.assertEquals("Validate tenant success", result.get("message"));
         result = pulsarEvent.validateTenantPermission("/admin/v2/tenants/adminTenant", "super-access-token");
-        Assert.assertEquals(result.get("error"), "This user no include this tenant");
+        Assert.assertEquals("This user no include this tenant", result.get("error"));
 
         result = pulsarEvent.validateTenantPermission(
                 "/pulsar-manager/admin/v2/schemas/adminTenant/default/test-topic", "super-access-token");
-        Assert.assertEquals(result.get("message"), "This resource no need validate");
+        Assert.assertEquals("This resource no need validate", result.get("message"));
 
         result = pulsarEvent.validateTenantPermission(
                 "/pulsar-manager/admin/v2/namespaces/adminTenant/default", "admin-access-token");
-        Assert.assertEquals(result.get("message"), "Validate tenant success");
+        Assert.assertEquals("Validate tenant success", result.get("message"));
     }
 
     @Test
diff --git a/src/test/java/org/apache/pulsar/manager/service/RoleBindingServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/RoleBindingServiceImplTest.java
index 08e3ac6..754a608 100644
--- a/src/test/java/org/apache/pulsar/manager/service/RoleBindingServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/RoleBindingServiceImplTest.java
@@ -80,7 +80,7 @@ public class RoleBindingServiceImplTest {
 
         Map<String, String> validateErrorUser = roleBindingService.validateCurrentUser(
                 "test-error-access-token", roleBindingEntity);
-        Assert.assertEquals(validateErrorUser.get("error"), "User no exist.");
+        Assert.assertEquals("User no exist.", validateErrorUser.get("error"));
 
         TenantEntity tenantEntity = new TenantEntity();
         tenantEntity.setTenant("test-tenant");
@@ -105,12 +105,12 @@ public class RoleBindingServiceImplTest {
         roleBindingEntity.setRoleId(10);
         Map<String, String> validateIllegalUser = roleBindingService.validateCurrentUser(
                 "test-access-token", roleBindingEntity);
-        Assert.assertEquals(validateIllegalUser.get("error"), "This operation is illegal for this user");
+        Assert.assertEquals("This operation is illegal for this user", validateIllegalUser.get("error"));
 
         roleBindingEntity.setRoleId(roleId);
         Map<String, String> validateSuccessUser = roleBindingService.validateCurrentUser(
                 "test-access-token", roleBindingEntity);
-        Assert.assertEquals(validateSuccessUser.get("message"), "Validate current user success");
+        Assert.assertEquals("Validate current user success", validateSuccessUser.get("message"));
 
         roleBindingRepository.delete(roleId, userId);
         rolesRepository.delete("test-role", "test-tenant");
@@ -150,12 +150,12 @@ public class RoleBindingServiceImplTest {
         Map<String, Object> validateErrorUser = roleBindingService.validateCreateRoleBinding(
                 "test-error-access-token", "test-error-tenant",
                 "test-role-name", "test-user-name");
-        Assert.assertEquals(validateErrorUser.get("error"), "The user is not exist");
+        Assert.assertEquals("The user is not exist", validateErrorUser.get("error"));
 
         Map<String, Object> validateErrorRoleName = roleBindingService.validateCreateRoleBinding(
                 "test-access-token", "test-tenant",
                 "test-error-role", "test-user");
-        Assert.assertEquals(validateErrorRoleName.get("error"), "This role is no exist");
+        Assert.assertEquals("This role is no exist", validateErrorRoleName.get("error"));
 
         RoleInfoEntity testRoleInfoEntity = new RoleInfoEntity();
         testRoleInfoEntity.setRoleName("test-no-binding-role");
@@ -185,12 +185,12 @@ public class RoleBindingServiceImplTest {
         Map<String, Object> validateBindingRole = roleBindingService.validateCreateRoleBinding(
                 "test-access-token", "test-tenant",
                 "test-role", "test-user");
-        Assert.assertEquals(validateBindingRole.get("error"), "Role binding already exist");
+        Assert.assertEquals("Role binding already exist", validateBindingRole.get("error"));
 
         Map<String, Object> validateCreateRoleBinding = roleBindingService.validateCreateRoleBinding(
                 "test-access-token", "test-tenant",
                 "test-no-binding-role", "test-user");
-        Assert.assertEquals(validateCreateRoleBinding.get("message"), "Validate create role success");
+        Assert.assertEquals("Validate create role success", validateCreateRoleBinding.get("message"));
 
         roleBindingRepository.delete(roleId, userId);
         rolesRepository.delete("test-role", "test-tenant");
@@ -233,12 +233,12 @@ public class RoleBindingServiceImplTest {
         List<Map<String, Object>> roleBindingMap = roleBindingService.getRoleBindingList(
                 "test-access-token-binding", "test-tenant-binding");
         for (Map<String, Object> stringObjectMap : roleBindingMap) {
-            Assert.assertEquals(stringObjectMap.get("name"), "test-role-binding");
-            Assert.assertEquals(stringObjectMap.get("userId"), userId);
-            Assert.assertEquals(stringObjectMap.get("userName"), "test-user-binding");
-            Assert.assertEquals(stringObjectMap.get("roleId"), roleId);
-            Assert.assertEquals(stringObjectMap.get("roleName"), "test-role-binding");
-            Assert.assertEquals(stringObjectMap.get("description"), "test-role-binding-description");
+            Assert.assertEquals("test-role-binding", stringObjectMap.get("name"));
+            Assert.assertEquals(userId, stringObjectMap.get("userId"));
+            Assert.assertEquals("test-user-binding", stringObjectMap.get("userName"));
+            Assert.assertEquals(roleId, stringObjectMap.get("roleId"));
+            Assert.assertEquals("test-role-binding", stringObjectMap.get("roleName"));
+            Assert.assertEquals("test-role-binding-description", stringObjectMap.get("description"));
         }
 
         roleBindingRepository.delete(roleId, userId);
diff --git a/src/test/java/org/apache/pulsar/manager/service/RolesServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/RolesServiceImplTest.java
index dc6b8dc..604ec3d 100644
--- a/src/test/java/org/apache/pulsar/manager/service/RolesServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/RolesServiceImplTest.java
@@ -78,33 +78,33 @@ public class RolesServiceImplTest {
         RoleInfoEntity roleInfoEntity = new RoleInfoEntity();
 
         Map<String, String> roleNameIsEmpty = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(roleNameIsEmpty.get("error"), "Role name cannot be empty");
+        Assert.assertEquals("Role name cannot be empty", roleNameIsEmpty.get("error"));
 
         roleInfoEntity.setRoleName("------");
 
         Map<String, String> resourceNameIsEmpty = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(resourceNameIsEmpty.get("error"), "Resource name cannot be empty");
+        Assert.assertEquals("Resource name cannot be empty", resourceNameIsEmpty.get("error"));
 
         roleInfoEntity.setResourceName("===========");
 
         Map<String, String> roleNameIsIllegal = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(roleNameIsIllegal.get("error"), "Role name is illegal");
+        Assert.assertEquals("Role name is illegal", roleNameIsIllegal.get("error"));
 
         roleInfoEntity.setRoleName("testRoleName");
 
         Map<String, String> resourceNameIsIllegal = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(resourceNameIsIllegal.get("error"), "Resource Name is illegal");
+        Assert.assertEquals("Resource Name is illegal", resourceNameIsIllegal.get("error"));
 
         roleInfoEntity.setResourceName("testResourceName");
 
         roleInfoEntity.setResourceType("test-resourceType");
         Map<String, String> resourceTypeIsIllegal = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(resourceTypeIsIllegal.get("error"), "Resource type is illegal");
+        Assert.assertEquals("Resource type is illegal", resourceTypeIsIllegal.get("error"));
 
         roleInfoEntity.setResourceId(10);
         roleInfoEntity.setResourceType(ResourceType.TENANTS.name());
         Map<String, String> resourceNoExist = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(resourceNoExist.get("error"), "Tenant no exist, please check");
+        Assert.assertEquals("Tenant no exist, please check", resourceNoExist.get("error"));
 
         TenantEntity tenantEntity = new TenantEntity();
         tenantEntity.setTenant("test-tenant");
@@ -116,7 +116,7 @@ public class RolesServiceImplTest {
         roleInfoEntity.setResourceId(20);
         roleInfoEntity.setResourceType(ResourceType.NAMESPACES.name());
         Map<String, String> namespaceNoExist = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(namespaceNoExist.get("error"), "Namespace no exist, please check");
+        Assert.assertEquals("Namespace no exist, please check", namespaceNoExist.get("error"));
 
         NamespaceEntity namespaceEntity = new NamespaceEntity();
         namespaceEntity.setTenant("test-tenant");
@@ -131,18 +131,18 @@ public class RolesServiceImplTest {
         roleInfoEntity.setResourceType(ResourceType.TOPICS.name());
         roleInfoEntity.setResourceVerbs(ResourceVerbs.ADMIN.name());
         Map<String, String> stringMapTopics = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(stringMapTopics.get("error"),
-                "admin should not be excluded for resources of type topic");
+        Assert.assertEquals("admin should not be excluded for resources of type topic",
+                stringMapTopics.get("error"));
 
         roleInfoEntity.setResourceType(ResourceType.TENANTS.name());
         roleInfoEntity.setResourceVerbs(ResourceVerbs.PRODUCE.name() + "," +  ResourceVerbs.CONSUME.name());
         Map<String, String> stringMapTenants = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(stringMapTenants.get("error"), "Type TENANTS include not supported verbs");
+        Assert.assertEquals("Type TENANTS include not supported verbs", stringMapTenants.get("error"));
 
         roleInfoEntity.setResourceType(ResourceType.ALL.name());
         roleInfoEntity.setResourceVerbs(ResourceVerbs.ADMIN.name());
         Map<String, String> stringMapAll = rolesService.validateRoleInfoEntity(roleInfoEntity);
-        Assert.assertEquals(stringMapAll.get("message"), "Role validate success");
+        Assert.assertEquals("Role validate success", stringMapAll.get("message"));
     }
 
     @Test
diff --git a/src/test/java/org/apache/pulsar/manager/service/TenantsServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/TenantsServiceImplTest.java
index ab81a7a..306b9b4 100644
--- a/src/test/java/org/apache/pulsar/manager/service/TenantsServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/TenantsServiceImplTest.java
@@ -15,35 +15,31 @@ package org.apache.pulsar.manager.service;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.manager.PulsarManagerApplication;
 import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(
     classes = {
         PulsarManagerApplication.class,
@@ -56,25 +52,28 @@ public class TenantsServiceImplTest {
     @Autowired
     private TenantsService tenantsService;
 
-    @Value("${backend.jwt.token}")
-    private static String pulsarJwtToken;
+    @MockBean
+    private PulsarAdminService pulsarAdminService;
+
+    @Mock
+    private Tenants tenants;
+
+    @Mock
+    private Namespaces namespaces;
 
     @Test
-    public void tenantsServiceImplTest() {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/tenants", header)).thenReturn("[\"public\"]");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/tenants/public", header))
-                .thenReturn("{\"adminRoles\": [\"admin\"], \"allowedClusters\": [\"standalone\"]}");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/namespaces/public", header))
-                .thenReturn("[\"public/default\"]");
+    public void tenantsServiceImplTest() throws PulsarAdminException {
+        Mockito.when(pulsarAdminService.tenants("http://localhost:8080")).thenReturn(tenants);
+        Mockito.when(tenants.getTenants()).thenReturn(Arrays.asList("public"));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("admin"), Sets.newHashSet("standalone"));
+        Mockito.when(tenants.getTenantInfo("public")).thenReturn(tenantInfo);
+        Mockito.when(pulsarAdminService.namespaces("http://localhost:8080")).thenReturn(namespaces);
+        Mockito.when(namespaces.getNamespaces("public")).thenReturn(Arrays.asList("public/default"));
+
         Map<String, Object> objectMap = tenantsService.getTenantsList(1, 2, "http://localhost:8080");
-        Assert.assertEquals(objectMap.get("total"), 1);
-        Assert.assertEquals(objectMap.get("pageSize"), 1);
-        Assert.assertEquals(objectMap.get("pageNum"), 1);
+        Assert.assertEquals(1, objectMap.get("total"));
+        Assert.assertEquals(1, objectMap.get("pageSize"));
+        Assert.assertEquals(1, objectMap.get("pageNum"));
         List<Map<String, Object>> tenantsArray = new ArrayList<>();
         Map<String, Object> tenantMap = Maps.newHashMap();
         tenantMap.put("adminRoles", "admin");
@@ -82,29 +81,19 @@ public class TenantsServiceImplTest {
         tenantMap.put("tenant", "public");
         tenantMap.put("namespaces", 1);
         tenantsArray.add(tenantMap);
-        Assert.assertEquals(objectMap.get("data"), tenantsArray);
+        Assert.assertEquals(tenantsArray, objectMap.get("data"));
     }
 
     @Test
-    public void createTenantTest() throws UnsupportedEncodingException {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        header.put("Content-Type", "application/json");
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        Map<String, Object> body = Maps.newHashMap();
+    public void createTenantTest() throws PulsarAdminException {
         String tenant = "test";
         String role = "test-role";
         String cluster = "test-cluster";
-        body.put("adminRoles", Sets.newHashSet(role));
-        // Get cluster from standalone, to do
-        body.put("allowedClusters", Sets.newHashSet(cluster));
-        Gson gson = new Gson();
-        PowerMockito.when(HttpUtil.doPut(
-                "http://localhost:8080/admin/v2/tenants/" + tenant, header, gson.toJson(body))).thenReturn("");
+        Mockito.when(pulsarAdminService.tenants("http://localhost:8080")).thenReturn(tenants);
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet(role), Sets.newHashSet(cluster));
+        Mockito.doNothing().when(tenants).createTenant(tenant, tenantInfo);
         Map<String, String> createTenantResult =  tenantsService.createTenant(
                 tenant, role, cluster, "http://localhost:8080");
-        Assert.assertEquals(createTenantResult.get("message"), "Create tenant success");
+        Assert.assertEquals("Create tenant success", createTenantResult.get("message"));
     }
 }
diff --git a/src/test/java/org/apache/pulsar/manager/service/TopicsServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/TopicsServiceImplTest.java
index 2036b47..e998d07 100644
--- a/src/test/java/org/apache/pulsar/manager/service/TopicsServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/TopicsServiceImplTest.java
@@ -14,32 +14,33 @@
 package org.apache.pulsar.manager.service;
 
 import com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.manager.PulsarManagerApplication;
+import org.apache.pulsar.manager.entity.TopicStatsEntity;
+import org.apache.pulsar.manager.entity.TopicsStatsRepository;
 import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(
     classes = {
         PulsarManagerApplication.class,
@@ -49,92 +50,103 @@ import java.util.Map;
 @ActiveProfiles("test")
 public class TopicsServiceImplTest {
 
+    @MockBean
+    private PulsarAdminService pulsarAdminService; // mock bean; the tests stub topics(serviceUrl) on it
+
+    @Mock
+    private Topics topics; // mock Topics client handed back by the stubbed pulsarAdminService.topics(...)
+
     @Autowired
     private TopicsService topicsService;
 
     @Autowired
-    private BrokerStatsService brokerStatsService;
-
-    @Value("${backend.jwt.token}")
-    private static String pulsarJwtToken;
-
-    private final String topics = "[" +
-            "\"persistent://public/default/test789\"," +
-            "\"persistent://public/default/test900-partition-0\"," +
-            "\"persistent://public/default/test900-partition-1\"," +
-            "\"persistent://public/default/test900-partition-2\"]";
-
-    private final String partitionedTopics = "[\"persistent://public/default/test900\"]";
+    private TopicsStatsRepository topicsStatsRepository; // used by getTopicsStatsImplTest to save a stats row
 
     @Test
-    public void topicsServiceImplTest() {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/persistent/public/default", header))
-                .thenReturn(topics);
-        PowerMockito.when(HttpUtil.doGet(
-                "http://localhost:8080/admin/v2/persistent/public/default/partitioned", header))
-                .thenReturn(partitionedTopics);
-        PowerMockito.when(HttpUtil.doGet(
-                "http://localhost:8080/admin/v2/persistent/public/default/test900/partitions", header))
-                .thenReturn("{\"partitions\":3}");
+    public void topicsServiceImplTest() throws PulsarAdminException {
+        Mockito.when(pulsarAdminService.topics("http://localhost:8080")).thenReturn(topics); // this service URL -> mocked Topics client
+        Mockito.when(topics.getList("public/default")).thenReturn( // stubbed topic list, including the test900 partition topics
+                Arrays.asList(
+                        "persistent://public/default/test789",
+                        "persistent://public/default/test900-partition-0",
+                        "persistent://public/default/test900-partition-1",
+                        "persistent://public/default/test900-partition-2"
+                )
+        );
+        Mockito.when(topics.getPartitionedTopicList("public/default")).thenReturn( // stubbed partitioned-topic names
+                Arrays.asList(
+                        "persistent://public/default/test900"
+                )
+        );
+        Mockito.when(topics.getPartitionedTopicMetadata("persistent://public/default/test900")).thenReturn(
+                new PartitionedTopicMetadata(3) // same count as the test900-partition-N entries stubbed above
+        );
         Map<String, Object> topicsMap = topicsService.getTopicsList(
                 1, 1, "public", "default", "http://localhost:8080");
-        Assert.assertEquals(topicsMap.get("total"), 2);
+        Assert.assertEquals(2, topicsMap.get("total")); // test789 and test900, as listed in the expected string below
         Assert.assertFalse((Boolean) topicsMap.get("isPage"));
-        Assert.assertEquals(topicsMap.get("topics").toString(),
-                "[{partitions=0, topic=test789, persistent=persistent}, {partitions=3, topic=test900, persistent=persistent}]");
+        Assert.assertEquals("[{partitions=0, topic=test789, persistent=persistent}, {partitions=3, topic=test900, persistent=persistent}]", topicsMap.get("topics").toString());
     }
 
     @Test
-    public void getTopicsStatsImplTest() {
-        PowerMockito.mockStatic(HttpUtil.class);
-        Map<String, String> header = Maps.newHashMap();
-        if (StringUtils.isNotBlank(pulsarJwtToken)) {
-            header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
-        }
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
-                .thenReturn("[\"standalone\"]");
-
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
-                .thenReturn("[\"localhost:8080\"]");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/broker-stats/topics", header))
-                .thenReturn(BrokerStatsServiceImplTest.testData);
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
-                .thenReturn("{}");
-        PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
-                .thenReturn("{\n" +
-                        "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
-                        "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
-                        "}");
+    public void getTopicsStatsImplTest() throws Exception {
         String environment = "staging";
-        String cluster = "standalone";
-        String serviceUrl = "http://localhost:8080";
-        brokerStatsService.collectStatsToDB(
-            System.currentTimeMillis() / 1000,
-            environment,
-            cluster,
-            serviceUrl
-        );
+        String tenant = "public";
+        String namespace = "functions";
+        String topic = "metadata";
+        String policy = "persistent";
+
+        TopicStatsEntity topicStatsEntity = new TopicStatsEntity(); // stats row saved directly through the repository below
+        topicStatsEntity.setEnvironment(environment);
+        topicStatsEntity.setCluster("standalone");
+        topicStatsEntity.setBroker("localhost:8080");
+        topicStatsEntity.setPersistent(policy);
+        topicStatsEntity.setTenant(tenant);
+        topicStatsEntity.setNamespace(namespace);
+        topicStatsEntity.setTopic(topic);
+        topicStatsEntity.setBundle("0x40000000_0x80000000");
+        topicStatsEntity.setMsgRateIn(0.0);
+        topicStatsEntity.setSubscriptionCount(1);
+        topicStatsEntity.setProducerCount(1);
+        topicStatsEntity.setTime_stamp((System.currentTimeMillis() / 1000L)); // current time in seconds
+        topicsStatsRepository.save(topicStatsEntity); // presumably the row getTopicsStatsList reads back; verify against the service
 
         List<Map<String, String>> topics = new ArrayList<>();
-        Map<String, String> topic = Maps.newHashMap();
-        topic.put("topic", "metadata");
-        topic.put("partitions", "0");
-        topics.add(topic);
+        Map<String, String> topicMap = Maps.newHashMap(); // single query entry: topic name plus partition count
+        topicMap.put("topic", topic);
+        topicMap.put("partitions", "0");
+        topics.add(topicMap);
 
         List<Map<String, Object>> topicsList =  topicsService.getTopicsStatsList(
-                environment, "public", "functions", "persistent", topics);
+                environment, tenant, namespace, policy, topics);
         topicsList.forEach((t) -> {
-            Assert.assertEquals(t.get("partitions"), 0);
-            Assert.assertEquals(t.get("subscriptions"), 1);
-            Assert.assertEquals(t.get("inMsg"), 0.0);
-            Assert.assertEquals(t.get("producers"), 1);
-            Assert.assertEquals(t.get("persistent"), "persistent");
-            Assert.assertEquals(t.get("topic"), "metadata");
+            Assert.assertEquals(0, t.get("partitions")); // NOTE(review): these checks only run if topicsList is non-empty; no size assertion is visible here
+            Assert.assertEquals(1, t.get("subscriptions")); // subscriptionCount set on the saved entity
+            Assert.assertEquals(0.0, t.get("inMsg")); // msgRateIn set on the saved entity
+            Assert.assertEquals(1, t.get("producers")); // producerCount set on the saved entity
+            Assert.assertEquals(policy, t.get("persistent"));
+            Assert.assertEquals(topic, t.get("topic"));
+        });
+    }
+
+    /** Stubs the mocked Topics client with one message and checks the map that topicsService.peekMessages returns for it. */
+    @Test
+    public void peekMessagesTest() throws PulsarAdminException {
+        // Hand the mocked Topics client to the service for this service URL.
+        Mockito.when(pulsarAdminService.topics("http://localhost:8080")).thenReturn(topics);
+        List<Message<byte[]>> messages = new ArrayList<>();
+        messages.add(new MessageImpl<byte[]>("persistent://public/default/test", "1:1", Maps.newTreeMap(), "test".getBytes(), Schema.BYTES));
+        Mockito.when(topics.peekMessages("persistent://public/default/test", "sub-1", 1)).thenReturn(messages);
+
+        List<Map<String, Object>> result = topicsService.peekMessages(
+                "persistent", "public", "default", "test", "sub-1", 1, "http://localhost:8080");
+        Assert.assertEquals(1, result.size());
+        result.forEach((message) -> {
+            Assert.assertEquals(1L, message.get("ledgerId"));
+            Assert.assertEquals(1L, message.get("entryId"));
+            Assert.assertEquals(false, message.get("batch"));
+            // Compare the payload bytes directly instead of decoding both sides to String first.
+            Assert.assertArrayEquals("test".getBytes(), (byte[]) message.get("data"));
         });
     }
 }
diff --git a/src/test/java/org/apache/pulsar/manager/service/UsersServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/UsersServiceImplTest.java
index 353c79c..460c4ba 100644
--- a/src/test/java/org/apache/pulsar/manager/service/UsersServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/UsersServiceImplTest.java
@@ -49,31 +49,30 @@ public class UsersServiceImplTest {
         UserInfoEntity userInfoEntity = new UserInfoEntity();
         userInfoEntity.setName(" ");
         Map<String, String> validateNameEmpty = usersService.validateUserInfo(userInfoEntity);
-        Assert.assertEquals(validateNameEmpty.get("error"), "User name cannot be empty");
+        Assert.assertEquals("User name cannot be empty", validateNameEmpty.get("error"));
         userInfoEntity.setName("----====");
         Map<String, String> validateNameIllegal = usersService.validateUserInfo(userInfoEntity);
-        Assert.assertEquals(validateNameIllegal.get("error"), "User name illegal");
+        Assert.assertEquals("User name illegal", validateNameIllegal.get("error"));
 
         userInfoEntity.setName("test");
         userInfoEntity.setEmail("  ");
         Map<String, String> validateEmailEmpty = usersService.validateUserInfo(userInfoEntity);
-        Assert.assertEquals(validateEmailEmpty.get("error"), "User email cannot be empty");
+        Assert.assertEquals("User email cannot be empty", validateEmailEmpty.get("error"));
         userInfoEntity.setEmail("xxxx@");
         Map<String, String> validateEmailIllegal = usersService.validateUserInfo(userInfoEntity);
-        Assert.assertEquals(validateEmailIllegal.get("error"), "Email address illegal");
+        Assert.assertEquals("Email address illegal", validateEmailIllegal.get("error"));
 
         userInfoEntity.setEmail("test@apache.org");
         userInfoEntity.setPassword("  ");
         userInfoEntity.setAccessToken(" ");
         Map<String, String> validatePasswordAndTokenBlank = usersService.validateUserInfo(userInfoEntity);
-        Assert.assertEquals(validatePasswordAndTokenBlank.get("error"),
-                "Fields password and access token cannot be empty at the same time.");
+        Assert.assertEquals("Fields password and access token cannot be empty at the same time.",
+                validatePasswordAndTokenBlank.get("error"));
 
         userInfoEntity.setPassword("password");
         userInfoEntity.setAccessToken("token");
         Map<String, String> validateSuccess = usersService.validateUserInfo(userInfoEntity);
-        Assert.assertEquals(validateSuccess.get("message"),
-                "Validate user success");
+        Assert.assertEquals("Validate user success", validateSuccess.get("message"));
 
     }
 }