You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by gu...@apache.org on 2020/06/29 01:11:19 UTC
[pulsar-manager] branch master updated: Use Pulsar Admin instead of
HttpUtil (#286)
This is an automated email from the ASF dual-hosted git repository.
guangning pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git
The following commit(s) were added to refs/heads/master by this push:
new df69426 Use Pulsar Admin instead of HttpUtil (#286)
df69426 is described below
commit df69426972537e102bd055f457ced9b18751f4b2
Author: shustsud <51...@users.noreply.github.com>
AuthorDate: Mon Jun 29 10:11:12 2020 +0900
Use Pulsar Admin instead of HttpUtil (#286)
Master Issue: <https://github.com/apache/pulsar-manager/issues/249>
### Motivation
See <https://github.com/apache/pulsar-manager/issues/249>.
### Modifications
* Fixed <https://github.com/apache/pulsar-manager/issues/249> by replacing direct `HttpUtil` calls with the Pulsar Admin client.
* Added authentication to the [EnvironmentForward class](https://github.com/apache/pulsar-manager/blob/master/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java).
---
build.gradle | 7 +-
gradle.properties | 2 +-
.../pulsar/manager/PulsarApplicationListener.java | 46 ++--
.../manager/controller/BrokerStatsController.java | 14 -
.../manager/controller/EnvironmentsController.java | 38 +--
.../manager/controller/TopicsController.java | 72 +----
.../exception/PulsarAdminOperationException.java | 21 ++
.../pulsar/manager/service/BrokerStatsService.java | 2 -
.../pulsar/manager/service/PulsarAdminService.java | 35 +++
.../pulsar/manager/service/TopicsService.java | 8 +-
.../service/impl/BrokerStatsServiceImpl.java | 47 ++--
.../manager/service/impl/BrokersServiceImpl.java | 61 +++--
.../manager/service/impl/ClustersServiceImpl.java | 61 +++--
.../service/impl/EnvironmentCacheServiceImpl.java | 67 +++--
.../service/impl/NamespacesServiceImpl.java | 34 ++-
.../service/impl/PulsarAdminServiceImpl.java | 155 +++++++++++
.../manager/service/impl/TenantsServiceImpl.java | 90 +++---
.../manager/service/impl/TopicsServiceImpl.java | 305 +++++++++++++++------
.../pulsar/manager/zuul/EnvironmentForward.java | 28 +-
src/main/resources/application.properties | 5 +
.../dao/BrokerTokensRepositoryImplTest.java | 8 +-
.../dao/EnvironmentsRepositoryImplTest.java | 12 +-
.../manager/dao/NamespacesRepositoryImplTest.java | 16 +-
.../manager/dao/RoleBindingRepositoryImplTest.java | 16 +-
.../manager/dao/RolesRepositoryImplTest.java | 24 +-
.../manager/dao/TenantsRepositoryImplTest.java | 26 +-
.../manager/dao/UsersRepositoryImplTest.java | 20 +-
.../manager/service/BookiesServiceImplTest.java | 6 +-
.../service/BrokerStatsServiceImplTest.java | 241 ++++++++--------
.../service/BrokerTokensServiceImplTest.java | 2 +-
.../manager/service/BrokersServiceImplTest.java | 63 ++---
.../manager/service/ClustersServiceImplTest.java | 90 +++---
.../service/EnvironmentCacheServiceImplTest.java | 143 +++++-----
.../service/GithubLoginServiceImplTest.java | 16 +-
.../manager/service/NamespacesServiceImplTest.java | 115 ++++----
.../service/PulsarAdminServiceImplTest.java | 64 +++++
.../manager/service/PulsarEventImplTest.java | 8 +-
.../service/RoleBindingServiceImplTest.java | 26 +-
.../manager/service/RolesServiceImplTest.java | 22 +-
.../manager/service/TenantsServiceImplTest.java | 81 +++---
.../manager/service/TopicsServiceImplTest.java | 172 ++++++------
.../manager/service/UsersServiceImplTest.java | 15 +-
42 files changed, 1324 insertions(+), 960 deletions(-)
diff --git a/build.gradle b/build.gradle
index a6a1389..b856ed3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -36,6 +36,9 @@ plugins {
repositories {
mavenCentral()
mavenLocal()
+ maven {
+ url "https://yahoo.bintray.com/maven"
+ }
}
configurations {
@@ -120,6 +123,8 @@ dependencies {
compile group: 'com.google.code.gson', name: 'gson', version: gsonVersion
compile group: 'org.apache.pulsar', name: 'pulsar-common', version: pulsarVersion
compile group: 'org.apache.pulsar', name: 'pulsar-client-admin-original', version: pulsarVersion
+ compile group: 'org.apache.pulsar', name: 'pulsar-client-auth-athenz', version: pulsarVersion
+ compile group: 'org.apache.pulsar', name: 'pulsar-client-auth-sasl', version: pulsarVersion
compile group: 'io.springfox', name: 'springfox-swagger2', version: swagger2Version
compile group: 'io.springfox', name: 'springfox-swagger-ui', version: swaggeruiVersion
compile group: 'org.apache.pulsar', name: 'pulsar-broker', version: brokerVersion
@@ -141,6 +146,6 @@ dependencies {
compileOnly group: 'org.springframework.boot', name: 'spring-boot-devtools', version: springBootVersion
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test', version: springBootVersion
testCompile group: 'org.mockito', name: 'mockito-core', version: mockitoVersion
- testCompile group: 'org.powermock', name: 'powermock-api-mockito', version: apiMockitoVersion
+ testCompile group: 'org.powermock', name: 'powermock-api-mockito2', version: apiMockitoVersion
testCompile group: 'org.powermock', name: 'powermock-module-junit4', version: mockitoJunit4Version
}
diff --git a/gradle.properties b/gradle.properties
index 5679029..5165d5a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -8,7 +8,7 @@ jsonWebTokenApiVersion=0.10.5
jsonWebTokenImplVersion=0.10.5
lombokVersion=1.18.10
pageHelperVersion=1.2.4
-mockitoVersion=1.10.19
+mockitoVersion=2.8.47
guavaVersion=21.0
pulsarVersion=2.5.2
swagger2Version=2.9.2
diff --git a/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java b/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java
index 0391718..4cc1c40 100644
--- a/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java
+++ b/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java
@@ -14,19 +14,18 @@
package org.apache.pulsar.manager;
import com.github.pagehelper.Page;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.manager.entity.EnvironmentEntity;
import org.apache.pulsar.manager.entity.EnvironmentsRepository;
-import org.apache.pulsar.manager.utils.HttpUtil;
+import org.apache.pulsar.manager.service.PulsarAdminService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
-import java.util.Map;
import java.util.Optional;
/**
@@ -39,17 +38,18 @@ public class PulsarApplicationListener implements ApplicationListener<ContextRef
private final EnvironmentsRepository environmentsRepository;
+ private final PulsarAdminService pulsarAdminService;
+
@Value("${default.environment.name}")
private String defaultEnvironmentName;
@Value("${default.environment.service_url}")
private String defaultEnvironmentServiceUrl;
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
-
- public PulsarApplicationListener(EnvironmentsRepository environmentsRepository) {
+ @Autowired
+ public PulsarApplicationListener(EnvironmentsRepository environmentsRepository, PulsarAdminService pulsarAdminService) {
this.environmentsRepository = environmentsRepository;
+ this.pulsarAdminService = pulsarAdminService;
}
@Override
@@ -65,26 +65,24 @@ public class PulsarApplicationListener implements ApplicationListener<ContextRef
&& defaultEnvironmentName.length() > 0
&& defaultEnvironmentServiceUrl.length() > 0
&& !environmentEntityOptional.isPresent()) {
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- String httpTestResult = HttpUtil.doGet(defaultEnvironmentServiceUrl + "/metrics", header);
- if (httpTestResult != null) {
- EnvironmentEntity environmentEntity = new EnvironmentEntity();
- environmentEntity.setBroker(defaultEnvironmentServiceUrl);
- environmentEntity.setName(defaultEnvironmentName);
- environmentsRepository.save(environmentEntity);
- log.info("Successfully added a default environment: name = {}, service_url = {}.",
- defaultEnvironmentName, defaultEnvironmentServiceUrl);
- } else {
+ try {
+ pulsarAdminService.clusters(defaultEnvironmentServiceUrl).getClusters();
+ } catch (PulsarAdminException e) {
+ log.error("Failed to get clusters list.", e);
log.error("Unable to connect default environment {} via {}, " +
- "please check if `environment.default.name` " +
- "and `environment.default.broker` are set correctly, " +
- "environmentDefaultName, environmentDefaultBroker",
+ "please check if `environment.default.name` " +
+ "and `environment.default.broker` are set correctly, " +
+ "environmentDefaultName, environmentDefaultBroker",
defaultEnvironmentName, defaultEnvironmentServiceUrl);
System.exit(-1);
}
+
+ EnvironmentEntity environmentEntity = new EnvironmentEntity();
+ environmentEntity.setBroker(defaultEnvironmentServiceUrl);
+ environmentEntity.setName(defaultEnvironmentName);
+ environmentsRepository.save(environmentEntity);
+ log.info("Successfully added a default environment: name = {}, service_url = {}.",
+ defaultEnvironmentName, defaultEnvironmentServiceUrl);
} else {
log.warn("The default environment already exists.");
}
diff --git a/src/main/java/org/apache/pulsar/manager/controller/BrokerStatsController.java b/src/main/java/org/apache/pulsar/manager/controller/BrokerStatsController.java
index 5f06089..68366f6 100644
--- a/src/main/java/org/apache/pulsar/manager/controller/BrokerStatsController.java
+++ b/src/main/java/org/apache/pulsar/manager/controller/BrokerStatsController.java
@@ -64,18 +64,4 @@ public class BrokerStatsController {
String result = brokerStatsService.forwardBrokerStatsMetrics(broker, requestHost);
return ResponseEntity.ok(result);
}
-
- @ApiOperation(value = "Get the broker stats topics")
- @ApiResponses({
- @ApiResponse(code = 200, message = "ok"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
- @RequestMapping(value = "/broker-stats/topics", method = RequestMethod.GET)
- public ResponseEntity<String> getBrokerStatsTopics(
- @RequestParam() String broker) {
- String requestHost = environmentCacheService.getServiceUrl(request);
- String result = brokerStatsService.forwardBrokerStatsTopics(broker, requestHost);
- return ResponseEntity.ok(result);
- }
-
}
diff --git a/src/main/java/org/apache/pulsar/manager/controller/EnvironmentsController.java b/src/main/java/org/apache/pulsar/manager/controller/EnvironmentsController.java
index 017dcd3..0b050a0 100644
--- a/src/main/java/org/apache/pulsar/manager/controller/EnvironmentsController.java
+++ b/src/main/java/org/apache/pulsar/manager/controller/EnvironmentsController.java
@@ -14,6 +14,8 @@
package org.apache.pulsar.manager.controller;
import com.google.common.collect.Maps;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.manager.entity.EnvironmentEntity;
import org.apache.pulsar.manager.entity.EnvironmentsRepository;
import org.apache.pulsar.manager.entity.RoleBindingEntity;
@@ -25,18 +27,18 @@ import org.apache.pulsar.manager.entity.TenantsRepository;
import org.apache.pulsar.manager.entity.UserInfoEntity;
import org.apache.pulsar.manager.entity.UsersRepository;
import org.apache.pulsar.manager.service.EnvironmentCacheService;
+import org.apache.pulsar.manager.service.PulsarAdminService;
import org.apache.pulsar.manager.service.RolesService;
-import org.apache.pulsar.manager.utils.HttpUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.manager.utils.ResourceType;
import org.hibernate.validator.constraints.Range;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
@@ -61,8 +63,7 @@ import java.util.Optional;
@RestController
public class EnvironmentsController {
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
+ private static final Logger log = LoggerFactory.getLogger(EnvironmentsController.class);
private final EnvironmentsRepository environmentsRepository;
@@ -78,8 +79,11 @@ public class EnvironmentsController {
private final RolesService rolesService;
+ private final PulsarAdminService pulsarAdminService;
+
private final HttpServletRequest request;
+ @Autowired
public EnvironmentsController(
HttpServletRequest request,
EnvironmentsRepository environmentsRepository,
@@ -88,7 +92,8 @@ public class EnvironmentsController {
TenantsRepository tenantsRepository,
RolesRepository rolesRepository,
RoleBindingRepository roleBindingRepository,
- RolesService rolesService) {
+ RolesService rolesService,
+ PulsarAdminService pulsarAdminService) {
this.environmentsRepository = environmentsRepository;
this.environmentCacheService = environmentCacheService;
this.request = request;
@@ -97,6 +102,7 @@ public class EnvironmentsController {
this.rolesRepository = rolesRepository;
this.roleBindingRepository = roleBindingRepository;
this.rolesService = rolesService;
+ this.pulsarAdminService = pulsarAdminService;
}
@ApiOperation(value = "Get the list of existing environments, support paging, the default is 10 per page")
@@ -183,12 +189,10 @@ public class EnvironmentsController {
result.put("error", "Environment is exist");
return ResponseEntity.ok(result);
}
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- String httpTestResult = HttpUtil.doGet(environmentEntity.getBroker() + "/metrics", header);
- if (httpTestResult == null) {
+ try {
+ pulsarAdminService.clusters(environmentEntity.getBroker()).getClusters();
+ } catch (PulsarAdminException e) {
+ log.error("Failed to get clusters list.", e);
result.put("error", "This environment is error. Please check it");
return ResponseEntity.ok(result);
}
@@ -217,12 +221,10 @@ public class EnvironmentsController {
result.put("error", "Environment no exist");
return ResponseEntity.ok(result);
}
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- String httpTestResult = HttpUtil.doGet(environmentEntity.getBroker() + "/metrics", header);
- if (httpTestResult == null) {
+ try {
+ pulsarAdminService.clusters(environmentEntity.getBroker()).getClusters();
+ } catch (PulsarAdminException e) {
+ log.error("Failed to get clusters list.", e);
result.put("error", "This environment is error. Please check it");
return ResponseEntity.ok(result);
}
diff --git a/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java b/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
index fd1db13..30da4f5 100644
--- a/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
+++ b/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
@@ -14,14 +14,6 @@
package org.apache.pulsar.manager.controller;
import com.google.common.collect.Maps;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminBuilder;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.manager.service.EnvironmentCacheService;
import org.apache.pulsar.manager.service.TopicsService;
import io.swagger.annotations.Api;
@@ -43,7 +35,6 @@ import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.Min;
import javax.validation.constraints.Size;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -60,21 +51,9 @@ public class TopicsController {
private final EnvironmentCacheService environmentCacheService;
private final HttpServletRequest request;
- @Value("${backend.jwt.token}")
- private String token;
-
@Value("${pulsar.peek.message}")
private boolean peekMessage;
- @Value("${tls.enabled}")
- private boolean tlsEnabled;
-
- @Value("${tls.hostname.verifier}")
- private boolean tlsHostnameVerifier;
-
- @Value("${tls.pulsar.admin.ca-certs}")
- private String tlsPulsarAdminCaCerts;
-
@Autowired
public TopicsController(
TopicsService topicsService,
@@ -161,53 +140,10 @@ public class TopicsController {
"turn on option pulsar.peek.message in file application.properties");
return ResponseEntity.ok(result);
}
- PulsarAdmin pulsarAdmin = null;
// to do check permission for non super, waiting for https://github.com/apache/pulsar-manager/pull/238
- try {
- PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
- if (tlsEnabled) {
- pulsarAdminBuilder
- .tlsTrustCertsFilePath(tlsPulsarAdminCaCerts)
- .enableTlsHostnameVerification(tlsHostnameVerifier);
- }
- if (token != null && token.length() > 0) {
- pulsarAdminBuilder.authentication(AuthenticationFactory.token(token));
- }
- pulsarAdmin = pulsarAdminBuilder.serviceHttpUrl(requestHost).build();
- String topicFullPath = persistent + "://" + tenant + "/" + namespace + "/" + topic;
- List<Message<byte[]>> messages = pulsarAdmin.topics().peekMessages(topicFullPath, subName, messagePosition);
- List<Map<String, Object>> mapList = new ArrayList<>();
- for (Message<byte[]> msg: messages) {
- Map<String, Object> message = Maps.newHashMap();
- if (msg.getMessageId() instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
- message.put("ledgerId", msgId.getLedgerId());
- message.put("entryId", msgId.getEntryId());
- message.put("batchIndex", msgId.getBatchIndex());
- message.put("batch", true);
- } else {
- MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
- message.put("batch", false);
- message.put("ledgerId", msgId.getLedgerId());
- message.put("entryId", msgId.getEntryId());
- }
- if (msg.getProperties().size() > 0) {
- msg.getProperties().forEach((k, v) -> {
- message.put(k, v);
- });
- }
- message.put("data", msg.getData());
- mapList.add(message);
- }
- result.put("data", mapList);
- } catch (PulsarClientException clientException) {
- result.put("error", clientException.getMessage());
- } catch (PulsarAdminException adminException) {
- result.put("error", adminException.getMessage());
- }
- if (pulsarAdmin != null) {
- pulsarAdmin.close();
- }
+ List<Map<String, Object>> mapList = topicsService.peekMessages(
+ persistent, tenant, namespace, topic, subName, messagePosition, requestHost);
+ result.put("data", mapList);
return ResponseEntity.ok(result);
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/pulsar/manager/controller/exception/PulsarAdminOperationException.java b/src/main/java/org/apache/pulsar/manager/controller/exception/PulsarAdminOperationException.java
new file mode 100644
index 0000000..577b09f
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/manager/controller/exception/PulsarAdminOperationException.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.manager.controller.exception;
+
+@SuppressWarnings("serial")
+public class PulsarAdminOperationException extends RuntimeException {
+ public PulsarAdminOperationException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/org/apache/pulsar/manager/service/BrokerStatsService.java b/src/main/java/org/apache/pulsar/manager/service/BrokerStatsService.java
index 0d298ce..ed9f075 100644
--- a/src/main/java/org/apache/pulsar/manager/service/BrokerStatsService.java
+++ b/src/main/java/org/apache/pulsar/manager/service/BrokerStatsService.java
@@ -22,8 +22,6 @@ public interface BrokerStatsService {
String forwardBrokerStatsMetrics(String broker, String requestHost);
- String forwardBrokerStatsTopics(String broker, String requestHost);
-
void collectStatsToDB(long unixTime, String environment, String cluster, String serviceUrl);
void clearStats(long nowTime, long timeInterval);
diff --git a/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java b/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java
new file mode 100644
index 0000000..ba2838f
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.manager.service;
+
+import java.util.Map;
+
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.Brokers;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.client.admin.Topics;
+
+public interface PulsarAdminService {
+ PulsarAdmin getPulsarAdmin(String url);
+ BrokerStats brokerStats(String url);
+ Clusters clusters(String url);
+ Brokers brokers(String url);
+ Tenants tenants(String url);
+ Namespaces namespaces(String url);
+ Topics topics(String url);
+ Map<String, String> getAuthHeader(String url);
+}
diff --git a/src/main/java/org/apache/pulsar/manager/service/TopicsService.java b/src/main/java/org/apache/pulsar/manager/service/TopicsService.java
index 4ae0cdc..08094cd 100644
--- a/src/main/java/org/apache/pulsar/manager/service/TopicsService.java
+++ b/src/main/java/org/apache/pulsar/manager/service/TopicsService.java
@@ -28,4 +28,10 @@ public interface TopicsService {
List<Map<String, Object>> getTopicsStatsList(String env, String tenant, String namespace,
String persistent, List<Map<String, String>> topics);
-}
\ No newline at end of file
+
+ List<Map<String, Object>> peekMessages(
+ String persistent, String tenant,
+ String namespace, String topic,
+ String subName, Integer messagePosition,
+ String requestHost);
+}
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
index c7a1037..99fa696 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
@@ -15,14 +15,17 @@ package org.apache.pulsar.manager.service.impl;
import com.github.pagehelper.Page;
import com.google.gson.Gson;
+import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import java.text.DecimalFormat;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
import org.apache.pulsar.manager.service.BrokerStatsService;
import org.apache.pulsar.manager.service.BrokersService;
import org.apache.pulsar.manager.service.ClustersService;
-import org.apache.pulsar.manager.service.EnvironmentCacheService;
-import org.apache.pulsar.manager.utils.HttpUtil;
+import org.apache.pulsar.manager.service.PulsarAdminService;
import org.apache.pulsar.manager.entity.ConsumerStatsEntity;
import org.apache.pulsar.manager.entity.ConsumersStatsRepository;
import org.apache.pulsar.manager.entity.EnvironmentEntity;
@@ -40,7 +43,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,9 +66,6 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
@Value("${backend.directRequestHost}")
private String directRequestHost;
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
-
@Value("${clear.stats.interval}")
private Long clearStatsInterval;
@@ -78,8 +77,7 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
private final PublishersStatsRepository publishersStatsRepository;
private final ReplicationsStatsRepository replicationsStatsRepository;
private final ConsumersStatsRepository consumersStatsRepository;
-
- private static final Map<String, String> header = new HashMap<String, String>();
+ private final PulsarAdminService pulsarAdminService;
@Autowired
public BrokerStatsServiceImpl(
@@ -91,7 +89,7 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
PublishersStatsRepository publishersStatsRepository,
ReplicationsStatsRepository replicationsStatsRepository,
ConsumersStatsRepository consumersStatsRepository,
- EnvironmentCacheService environmentCache) {
+ PulsarAdminService pulsarAdminService) {
this.environmentsRepository = environmentsRepository;
this.clustersService = clustersService;
this.brokersService = brokersService;
@@ -100,23 +98,19 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
this.publishersStatsRepository = publishersStatsRepository;
this.replicationsStatsRepository = replicationsStatsRepository;
this.consumersStatsRepository = consumersStatsRepository;
+ this.pulsarAdminService = pulsarAdminService;
}
public String forwardBrokerStatsMetrics(String broker, String requestHost) {
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
-
broker = checkServiceUrl(broker, requestHost);
- return HttpUtil.doGet(broker + "/admin/v2/broker-stats/metrics", header);
- }
-
- public String forwardBrokerStatsTopics(String broker, String requestHost) {
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+ try {
+ return pulsarAdminService.brokerStats(broker).getMetrics().toString();
+ } catch(PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get broker metrics.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
}
- broker = checkServiceUrl(broker, requestHost);
- return HttpUtil.doGet(broker + "/admin/v2/broker-stats/topics", header);
}
@Scheduled(initialDelayString = "${init.delay.interval}", fixedDelayString = "${insert.stats.interval}")
@@ -165,16 +159,19 @@ public class BrokerStatsServiceImpl implements BrokerStatsService {
}
public void collectStatsToDB(long unixTime, String env, String cluster, String serviceUrl) {
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
Map<String, Object> brokerObject = brokersService.getBrokersList(0, 0, cluster, serviceUrl);
List<HashMap<String, Object>> brokerLists = (List<HashMap<String, Object>>) brokerObject.get("data");
brokerLists.forEach((brokerMap) -> {
String tempBroker = (String) brokerMap.get("broker");
// TODO: handle other protocols
String broker = "http://" + tempBroker;
- String result = HttpUtil.doGet(broker + "/admin/v2/broker-stats/topics", header);
+ JsonObject result;
+ try {
+ result = pulsarAdminService.brokerStats(broker).getTopics();
+ } catch(PulsarAdminException e) {
+ log.error("Failed to get broker metrics.", e);
+ return;
+ }
Gson gson = new Gson();
HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>> brokerStatsTopicEntity = gson.fromJson(result,
new TypeToken<HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>>>() {
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/BrokersServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/BrokersServiceImpl.java
index 536d428..e139695 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/BrokersServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/BrokersServiceImpl.java
@@ -14,11 +14,14 @@
package org.apache.pulsar.manager.service.impl;
import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
import org.apache.pulsar.manager.service.BrokersService;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.manager.service.PulsarAdminService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -29,28 +32,43 @@ import java.util.Map;
@Service
public class BrokersServiceImpl implements BrokersService {
+ private static final Logger log = LoggerFactory.getLogger(BrokersServiceImpl.class);
+
@Value("${backend.directRequestBroker}")
private boolean directRequestBroker;
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
+ private final PulsarAdminService pulsarAdminService;
+
+ @Autowired
+ public BrokersServiceImpl(PulsarAdminService pulsarAdminService) {
+ this.pulsarAdminService = pulsarAdminService;
+ }
public Map<String, Object> getBrokersList(Integer pageNum, Integer pageSize, String cluster, String requestHost) {
Map<String, Object> brokersMap = Maps.newHashMap();
List<Map<String, Object>> brokersArray = new ArrayList<>();
if (directRequestBroker) {
- Gson gson = new Gson();
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+ Map<String, FailureDomain> failureDomains;
+ try {
+ failureDomains = pulsarAdminService.clusters(requestHost).getFailureDomains(cluster);
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get failureDomains list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
}
- String failureDomainsResult = HttpUtil.doGet(
- requestHost + "/admin/v2/clusters/" + cluster + "/failureDomains", header);
- Map<String, Map<String, List<String>>> failureDomains = gson.fromJson(
- failureDomainsResult, new TypeToken<Map<String, Map<String, List<String>>>>() {}.getType());
- String result = HttpUtil.doGet(requestHost + "/admin/v2/brokers/" + cluster, header);
- List<String> brokersList = gson.fromJson(result, new TypeToken<List<String>>() {}.getType());
+
+ List<String> brokersList;
+ try {
+ brokersList = pulsarAdminService.brokers(requestHost).getActiveBrokers(cluster);
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get brokers list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
+ }
+
for (String broker: brokersList) {
Map<String, Object> brokerEntity = Maps.newHashMap();
List<String> failureDomain = this.getFailureDomain(broker, failureDomains);
@@ -67,15 +85,12 @@ public class BrokersServiceImpl implements BrokersService {
return brokersMap;
}
- private List<String> getFailureDomain(String broker, Map<String, Map<String, List<String>>> failureDomains) {
+ private List<String> getFailureDomain(String broker, Map<String, FailureDomain> failureDomains) {
List<String> failureDomainsList = new ArrayList<>();
for (String failureDomain: failureDomains.keySet()) {
- Map<String, List<String>> domains = failureDomains.get(failureDomain);
- for (String domain: domains.keySet()) {
- List<String> domainList = domains.get(domain);
- if (domainList.contains(broker)) {
- failureDomainsList.add(failureDomain);
- }
+ FailureDomain domain = failureDomains.get(failureDomain);
+ if (domain.getBrokers().contains(broker)) {
+ failureDomainsList.add(failureDomain);
}
}
return failureDomainsList;
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/ClustersServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/ClustersServiceImpl.java
index 0162658..636cc77 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/ClustersServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/ClustersServiceImpl.java
@@ -14,15 +14,17 @@
package org.apache.pulsar.manager.service.impl;
import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
import org.apache.pulsar.manager.service.BrokersService;
import org.apache.pulsar.manager.service.ClustersService;
-import org.apache.pulsar.manager.utils.HttpUtil;
+import org.apache.pulsar.manager.service.PulsarAdminService;
import java.util.function.Function;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -34,20 +36,21 @@ import java.util.Map;
@Service
public class ClustersServiceImpl implements ClustersService {
+ private static final Logger log = LoggerFactory.getLogger(ClustersServiceImpl.class);
+
@Value("${backend.directRequestBroker}")
private boolean directRequestBroker;
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
-
private final BrokersService brokersService;
+ private final PulsarAdminService pulsarAdminService;
+
@Autowired
- public ClustersServiceImpl(BrokersService brokersService) {
+ public ClustersServiceImpl(BrokersService brokersService, PulsarAdminService pulsarAdminService) {
this.brokersService = brokersService;
+ this.pulsarAdminService = pulsarAdminService;
}
-
public Map<String, Object> getClustersList(Integer pageNum,
Integer pageSize,
String envServiceUrl,
@@ -55,13 +58,15 @@ public class ClustersServiceImpl implements ClustersService {
Map<String, Object> clustersMap = Maps.newHashMap();
List<Map<String, Object>> clustersArray = new ArrayList<>();
if (directRequestBroker) {
- Gson gson = new Gson();
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+ List<String> clustersList;
+ try {
+ clustersList = pulsarAdminService.clusters(envServiceUrl).getClusters();
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get clusters list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
}
- String result = HttpUtil.doGet(envServiceUrl + "/admin/v2/clusters", header);
- List<String> clustersList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
for (String cluster: clustersList) {
String clusterServiceUrl =
serviceUrlProvider == null ? envServiceUrl : serviceUrlProvider.apply(cluster);
@@ -70,8 +75,15 @@ public class ClustersServiceImpl implements ClustersService {
brokersService.getBrokersList(1, 1, cluster, clusterServiceUrl);
clusterEntity.put("brokers", brokers.get("total"));
clusterEntity.put("cluster", cluster);
- String clusterInfo = HttpUtil.doGet(clusterServiceUrl + "/admin/v2/clusters/" + cluster, header);
- ClusterData clusterData = gson.fromJson(clusterInfo, ClusterData.class);
+ ClusterData clusterData;
+ try {
+ clusterData = pulsarAdminService.clusters(clusterServiceUrl).getCluster(cluster);
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get cluster data.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
+ }
clusterEntity.put("serviceUrl", clusterData.getServiceUrl());
clusterEntity.put("serviceUrlTls", clusterData.getServiceUrlTls());
clusterEntity.put("brokerServiceUrl", clusterData.getBrokerServiceUrl());
@@ -88,13 +100,16 @@ public class ClustersServiceImpl implements ClustersService {
}
public List<String> getClusterByAnyBroker(String requestHost) {
- Gson gson = new Gson();
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+ List<String> clustersList;
+ try {
+ clustersList = pulsarAdminService.clusters(requestHost).getClusters();
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get clusters list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
}
- String result = HttpUtil.doGet(requestHost + "/admin/v2/clusters", header);
- List<String> clustersList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
+
return clustersList;
}
}
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
index a0f4b5d..b1290fb 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
@@ -14,14 +14,15 @@
package org.apache.pulsar.manager.service.impl;
import com.github.pagehelper.Page;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
import org.apache.pulsar.manager.entity.EnvironmentEntity;
import org.apache.pulsar.manager.entity.EnvironmentsRepository;
import org.apache.pulsar.manager.service.EnvironmentCacheService;
-import org.apache.pulsar.manager.utils.HttpUtil;
+import org.apache.pulsar.manager.service.PulsarAdminService;
+
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -33,7 +34,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -44,16 +44,17 @@ import org.springframework.stereotype.Service;
@Service
public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
-
private final EnvironmentsRepository environmentsRepository;
+
private final Map<String, Map<String, ClusterData>> environments;
+ private final PulsarAdminService pulsarAdminService;
+
@Autowired
- public EnvironmentCacheServiceImpl(EnvironmentsRepository environmentsRepository) {
+ public EnvironmentCacheServiceImpl(EnvironmentsRepository environmentsRepository, PulsarAdminService pulsarAdminService) {
this.environmentsRepository = environmentsRepository;
this.environments = new ConcurrentHashMap<>();
+ this.pulsarAdminService = pulsarAdminService;
}
@Override
@@ -100,14 +101,6 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
return clusterData.getServiceUrl();
}
- private Map<String, String> jsonHeader() {
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- return header;
- }
-
@Scheduled(
initialDelay = 0L,
fixedDelayString = "${cluster.cache.reload.interval.ms}")
@@ -120,7 +113,11 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
List<EnvironmentEntity> environmentList = environmentPage.getResult();
while (!environmentList.isEmpty()) {
environmentList.forEach(env -> {
- reloadEnvironment(env);
+ try {
+ reloadEnvironment(env);
+ } catch (PulsarAdminOperationException e) {
+ log.error(e.getMessage(), e);
+ }
newEnvironments.add(env.getName());
});
++pageNum;
@@ -143,13 +140,15 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
}
public void reloadEnvironment(EnvironmentEntity environment) {
- Gson gson = new Gson();
- String result = HttpUtil.doGet(
- environment.getBroker() + "/admin/v2/clusters",
- jsonHeader()
- );
- List<String> clustersList =
- gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
+ List<String> clustersList;
+ try {
+ clustersList = pulsarAdminService.clusters(environment.getBroker()).getClusters();
+ } catch(PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get clusters list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
+ }
log.info("Reload cluster list for environment {} : {}", environment.getName(), clustersList);
Set<String> newClusters = Sets.newHashSet(clustersList);
Map<String, ClusterData> clusterDataMap = environments.computeIfAbsent(
@@ -176,19 +175,15 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
private ClusterData reloadCluster(EnvironmentEntity environment, String cluster) {
log.info("Reloading cluster data for cluster {} @ environment {} ...",
cluster, environment.getName());
- Gson gson = new Gson();
- String clusterInfoUrl = environment.getBroker() + "/admin/v2/clusters/" + cluster;
- String result = HttpUtil.doGet(
- clusterInfoUrl,
- jsonHeader()
- );
- if (null == result) {
- // fail to fetch the cluster data or the cluster is not found
+ ClusterData clusterData;
+ try {
+ clusterData = pulsarAdminService.clusters(environment.getBroker()).getCluster(cluster);
+ } catch(PulsarAdminException e) {
+ log.error("Failed to get cluster data.", e);
return null;
}
- log.info("Loaded cluster data for cluster {} @ environment {} from {} : {}",
- cluster, environment.getName(), clusterInfoUrl, result);
- ClusterData clusterData = gson.fromJson(result, ClusterData.class);
+ log.info("Loaded cluster data for cluster {} @ environment {} : {}",
+ cluster, environment.getName(), clusterData.toString());
Map<String, ClusterData> clusters = environments.computeIfAbsent(
environment.getName(),
(e) -> new ConcurrentHashMap<>());
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/NamespacesServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/NamespacesServiceImpl.java
index 630b34d..f5d1eea 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/NamespacesServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/NamespacesServiceImpl.java
@@ -15,15 +15,17 @@ package org.apache.pulsar.manager.service.impl;
import com.github.pagehelper.Page;
import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
import org.apache.pulsar.manager.entity.TopicStatsEntity;
import org.apache.pulsar.manager.entity.TopicsStatsRepository;
import org.apache.pulsar.manager.service.BrokerStatsService;
import org.apache.pulsar.manager.service.NamespacesService;
+import org.apache.pulsar.manager.service.PulsarAdminService;
import org.apache.pulsar.manager.service.TopicsService;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -37,38 +39,42 @@ public class NamespacesServiceImpl implements NamespacesService {
@Value("${backend.directRequestBroker}")
private boolean directRequestBroker;
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
+ private static final Logger log = LoggerFactory.getLogger(NamespacesServiceImpl.class);
private final TopicsStatsRepository topicsStatsRepository;
private final TopicsService topicsService;
private final HttpServletRequest request;
private final BrokerStatsService brokerStatsService;
+ private final PulsarAdminService pulsarAdminService;
@Autowired
public NamespacesServiceImpl(
TopicsStatsRepository topicsStatsRepository,
TopicsService topicsService,
HttpServletRequest request,
- BrokerStatsService brokerStatsService) {
+ BrokerStatsService brokerStatsService,
+ PulsarAdminService pulsarAdminService) {
this.topicsStatsRepository = topicsStatsRepository;
this.topicsService = topicsService;
this.request = request;
this.brokerStatsService = brokerStatsService;
+ this.pulsarAdminService = pulsarAdminService;
}
public Map<String, Object> getNamespaceList(Integer pageNum, Integer pageSize, String tenant, String requestHost) {
Map<String, Object> namespacesMap = Maps.newHashMap();
List<Map<String, Object>> namespacesArray = new ArrayList<>();
if (directRequestBroker) {
- Gson gson = new Gson();
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+ List<String> namespacesList;
+ try {
+ namespacesList = pulsarAdminService.namespaces(requestHost).getNamespaces(tenant);
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get namespaces list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
}
- String result = HttpUtil.doGet(requestHost + "/admin/v2/namespaces/" + tenant, header);
- if (result != null) {
- List<String> namespacesList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
+ if (!namespacesList.isEmpty()) {
Optional<TopicStatsEntity> topicStatsEntityOptional = topicsStatsRepository.findMaxTime();
Map<String, TopicStatsEntity> topicStatsEntityMap = Maps.newHashMap();
if (topicStatsEntityOptional.isPresent()) {
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java
new file mode 100644
index 0000000..1b79c1f
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.manager.service.impl;
+
+import javax.annotation.PreDestroy;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.Brokers;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
+import org.apache.pulsar.manager.service.PulsarAdminService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+/**
+ * {@link PulsarAdminService} implementation that caches one {@link PulsarAdmin} client per
+ * service URL and configures each client from the {@code backend.*} properties below.
+ */
+@Service
+public class PulsarAdminServiceImpl implements PulsarAdminService {
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarAdminServiceImpl.class);
+
+    @Value("${backend.broker.pulsarAdmin.authPlugin:}")
+    private String authPlugin;
+
+    @Value("${backend.broker.pulsarAdmin.authParams:}")
+    private String authParams;
+
+    @Value("${backend.broker.pulsarAdmin.tlsAllowInsecureConnection:false}")
+    private Boolean tlsAllowInsecureConnection;
+
+    @Value("${backend.broker.pulsarAdmin.tlsTrustCertsFilePath:}")
+    private String tlsTrustCertsFilePath;
+
+    @Value("${backend.broker.pulsarAdmin.tlsEnableHostnameVerification:false}")
+    private Boolean tlsEnableHostnameVerification;
+
+    @Value("${backend.jwt.token:}")
+    private String pulsarJwtToken;
+
+    // One client per service URL; only touched from synchronized methods.
+    private final Map<String, PulsarAdmin> pulsarAdmins = new HashMap<>();
+
+    /** Closes every cached client; a failing close does not prevent closing the others. */
+    @PreDestroy
+    public synchronized void destroy() {
+        for (Map.Entry<String, PulsarAdmin> entry : pulsarAdmins.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (RuntimeException e) {
+                log.warn("Failed to close Pulsar Admin instance. url={}", entry.getKey(), e);
+            }
+        }
+    }
+
+    /**
+     * Returns the cached client for {@code url}, creating and caching it on first use.
+     *
+     * @param url HTTP service URL the client talks to
+     * @return the client bound to {@code url}
+     * @throws PulsarAdminOperationException if the client cannot be created
+     */
+    public synchronized PulsarAdmin getPulsarAdmin(String url) {
+        PulsarAdmin pulsarAdmin = pulsarAdmins.get(url);
+        if (pulsarAdmin == null) {
+            pulsarAdmin = this.createPulsarAdmin(url);
+            pulsarAdmins.put(url, pulsarAdmin);
+        }
+        return pulsarAdmin;
+    }
+
+    /** Returns the broker-stats API of the client for {@code url}. */
+    public BrokerStats brokerStats(String url) {
+        return getPulsarAdmin(url).brokerStats();
+    }
+
+    /** Returns the clusters API of the client for {@code url}. */
+    public Clusters clusters(String url) {
+        return getPulsarAdmin(url).clusters();
+    }
+
+    /** Returns the brokers API of the client for {@code url}. */
+    public Brokers brokers(String url) {
+        return getPulsarAdmin(url).brokers();
+    }
+
+    /** Returns the tenants API of the client for {@code url}. */
+    public Tenants tenants(String url) {
+        return getPulsarAdmin(url).tenants();
+    }
+
+    /** Returns the namespaces API of the client for {@code url}. */
+    public Namespaces namespaces(String url) {
+        return getPulsarAdmin(url).namespaces();
+    }
+
+    /** Returns the topics API of the client for {@code url}. */
+    public Topics topics(String url) {
+        return getPulsarAdmin(url).topics();
+    }
+
+    /**
+     * Builds the HTTP authentication headers used by the client for {@code url}.
+     * Errors raised while resolving the headers are logged rather than thrown.
+     *
+     * @param url HTTP service URL the client talks to
+     * @return header names mapped to values; empty when there is no auth data for HTTP
+     * @throws PulsarAdminOperationException if the client for {@code url} cannot be created
+     */
+    public Map<String, String> getAuthHeader(String url) {
+        Authentication authentication = getPulsarAdmin(url).getClientConfigData().getAuthentication();
+        Map<String, String> result = new HashMap<>();
+
+        try {
+            CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
+            AuthenticationDataProvider authData = authentication.getAuthData(new URL(url).getHost());
+            if (!authData.hasDataForHttp()) {
+                return result;
+            }
+            authentication.authenticationStage(url, authData, null, authFuture);
+
+            try {
+                Map<String, String> responseHeader = authFuture.get();
+                Set<Map.Entry<String, String>> headers =
+                        authentication.newRequestHeader(url, authData, responseHeader);
+                if (headers != null) {
+                    headers.forEach(entry -> result.put(entry.getKey(), entry.getValue()));
+                }
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so code further up the stack can still observe it.
+                Thread.currentThread().interrupt();
+                log.error("Failed to get headers", e);
+            } catch (Exception e) {
+                log.error("Failed to get headers", e);
+            }
+        } catch (Exception e) {
+            log.error("Failed to run getAuthHeader", e);
+        }
+
+        return result;
+    }
+
+    /**
+     * Builds a new client for {@code url}. The JWT token is used when it is set; otherwise
+     * the authPlugin/authParams pair is used. The TLS settings are applied in both cases.
+     */
+    private PulsarAdmin createPulsarAdmin(String url) {
+        try {
+            // authParams may carry credentials, so it is deliberately left out of the log line.
+            log.info("Create Pulsar Admin instance. url={}, authPlugin={}, tlsAllowInsecureConnection={}, tlsTrustCertsFilePath={}, tlsEnableHostnameVerification={}",
+                    url, authPlugin, tlsAllowInsecureConnection, tlsTrustCertsFilePath, tlsEnableHostnameVerification);
+            PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
+            pulsarAdminBuilder.serviceHttpUrl(url);
+            if (StringUtils.isNotBlank(pulsarJwtToken)) {
+                pulsarAdminBuilder.authentication(AuthenticationFactory.token(pulsarJwtToken));
+            } else {
+                pulsarAdminBuilder.authentication(authPlugin, authParams);
+            }
+            pulsarAdminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
+            pulsarAdminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
+            pulsarAdminBuilder.enableTlsHostnameVerification(tlsEnableHostnameVerification);
+            return pulsarAdminBuilder.build();
+        } catch (PulsarClientException e) {
+            PulsarAdminOperationException pulsarAdminOperationException = new PulsarAdminOperationException("Failed to create Pulsar Admin instance.");
+            log.error(pulsarAdminOperationException.getMessage(), e);
+            throw pulsarAdminOperationException;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/TenantsServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/TenantsServiceImpl.java
index 9eeeda6..ee2bdd6 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/TenantsServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/TenantsServiceImpl.java
@@ -16,14 +16,14 @@ package org.apache.pulsar.manager.service.impl;
import com.github.pagehelper.Page;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
import org.apache.pulsar.manager.entity.TopicStatsEntity;
import org.apache.pulsar.manager.entity.TopicsStatsRepository;
import org.apache.pulsar.manager.service.BrokerStatsService;
+import org.apache.pulsar.manager.service.PulsarAdminService;
import org.apache.pulsar.manager.service.TenantsService;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +32,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.servlet.http.HttpServletRequest;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -43,34 +42,40 @@ public class TenantsServiceImpl implements TenantsService {
private static final Logger log = LoggerFactory.getLogger(TenantsServiceImpl.class);
-
@Value("${backend.directRequestBroker}")
private boolean directRequestBroker;
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
+ private final BrokerStatsService brokerStatsService;
- @Autowired
- private BrokerStatsService brokerStatsService;
+ private final TopicsStatsRepository topicsStatsRepository;
- @Autowired
- private TopicsStatsRepository topicsStatsRepository;
+ private final PulsarAdminService pulsarAdminService;
+
+ private final HttpServletRequest request;
@Autowired
- private HttpServletRequest request;
+ public TenantsServiceImpl(BrokerStatsService brokerStatsService, TopicsStatsRepository topicsStatsRepository, PulsarAdminService pulsarAdminService, HttpServletRequest request) {
+ this.brokerStatsService = brokerStatsService;
+ this.topicsStatsRepository = topicsStatsRepository;
+ this.pulsarAdminService = pulsarAdminService;
+ this.request = request;
+ }
public Map<String, Object> getTenantsList(Integer pageNum, Integer pageSize, String requestHost) {
Map<String, Object> tenantsMap = Maps.newHashMap();
List<Map<String, Object>> tenantsArray = new ArrayList<>();
if (directRequestBroker) {
- Gson gson = new Gson();
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+ List<String> tenantsList;
+ try {
+ tenantsList = pulsarAdminService.tenants(requestHost).getTenants();
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get tenants list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
}
- String result = HttpUtil.doGet( requestHost + "/admin/v2/tenants", header);
- if (result != null) {
- List<String> tenantsList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
+
+ if (!tenantsList.isEmpty()) {
Optional<TopicStatsEntity> topicStatsEntityOptional = topicsStatsRepository.findMaxTime();
Map<String, TopicStatsEntity> topicStatsEntityMap = Maps.newHashMap();
if (topicStatsEntityOptional.isPresent()) {
@@ -86,19 +91,29 @@ public class TenantsServiceImpl implements TenantsService {
}
}
for (String tenant : tenantsList) {
+ TenantInfo tenantInfo;
+ try {
+ tenantInfo = pulsarAdminService.tenants(requestHost).getTenantInfo(tenant);
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get tenant info.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
+ }
Map<String, Object> tenantEntity = Maps.newHashMap();
- String info = HttpUtil.doGet( requestHost + "/admin/v2/tenants/" + tenant, header);
- TenantInfo tenantInfo = gson.fromJson(info, TenantInfo.class);
tenantEntity.put("tenant", tenant);
tenantEntity.put("adminRoles", String.join(",", tenantInfo.getAdminRoles()));
tenantEntity.put("allowedClusters", String.join(",", tenantInfo.getAllowedClusters()));
- String namespace = HttpUtil.doGet(requestHost + "/admin/v2/namespaces/" + tenant, header);
- if (namespace != null) {
- List<String> namespacesList = gson.fromJson(namespace, new TypeToken<List<String>>(){}.getType());
- tenantEntity.put("namespaces", namespacesList.size());
- } else {
- tenantEntity.put("namespaces", 0);
+ List<String> namespacesList;
+ try {
+ namespacesList = pulsarAdminService.namespaces(requestHost).getNamespaces(tenant);
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get namespaces list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
}
+ tenantEntity.put("namespaces", namespacesList.size());
if (topicStatsEntityMap.get(tenant) != null ) {
TopicStatsEntity topicStatsEntity = topicStatsEntityMap.get(tenant);
tenantEntity.put("inMsg", topicStatsEntity.getMsgRateIn());
@@ -122,22 +137,15 @@ public class TenantsServiceImpl implements TenantsService {
}
public Map<String, String> createTenant(String tenant, String role, String cluster, String requestHost) {
- Map<String, String> header = Maps.newHashMap();
- header.put("Content-Type", "application/json");
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- Map<String, Object> body = Maps.newHashMap();
- body.put("adminRoles", Sets.newHashSet(role));
- // Get cluster from standalone, to do
- body.put("allowedClusters", Sets.newHashSet(cluster));
- Gson gson = new Gson();
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet(role), Sets.newHashSet(cluster));
Map<String, String> result = Maps.newHashMap();
try {
- HttpUtil.doPut( requestHost + "/admin/v2/tenants/" + tenant, header, gson.toJson(body));
+ pulsarAdminService.tenants(requestHost).createTenant(tenant, tenantInfo);
result.put("message", "Create tenant success");
- } catch (UnsupportedEncodingException e) {
- log.error("Init tenant failed for user: {}", tenant);
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to create tenant.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
result.put("error", "Create tenant failed");
}
return result;
diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/TopicsServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/TopicsServiceImpl.java
index 2503747..6013b21 100644
--- a/src/main/java/org/apache/pulsar/manager/service/impl/TopicsServiceImpl.java
+++ b/src/main/java/org/apache/pulsar/manager/service/impl/TopicsServiceImpl.java
@@ -15,14 +15,24 @@ package org.apache.pulsar.manager.service.impl;
import com.github.pagehelper.Page;
import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.manager.controller.exception.PulsarAdminOperationException;
import org.apache.pulsar.manager.entity.TopicStatsEntity;
import org.apache.pulsar.manager.entity.TopicStatsEntity.TopicStatsSummary;
import org.apache.pulsar.manager.entity.TopicsStatsRepository;
+import org.apache.pulsar.manager.service.PulsarAdminService;
import org.apache.pulsar.manager.service.TopicsService;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,19 +44,21 @@ import java.util.*;
@Service
public class TopicsServiceImpl implements TopicsService {
+ private static final Logger log = LoggerFactory.getLogger(TopicsServiceImpl.class);
+
public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";
@Value("${backend.directRequestBroker}")
private boolean directRequestBroker;
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
-
private final TopicsStatsRepository topicsStatsRepository;
+ private final PulsarAdminService pulsarAdminService;
+
@Autowired
- public TopicsServiceImpl(TopicsStatsRepository topicsStatsRepository) {
+ public TopicsServiceImpl(TopicsStatsRepository topicsStatsRepository, PulsarAdminService pulsarAdminService) {
this.topicsStatsRepository = topicsStatsRepository;
+ this.pulsarAdminService = pulsarAdminService;
}
private boolean isPartitionedTopic(List<String> topics, String topic) {
@@ -67,23 +79,52 @@ public class TopicsServiceImpl implements TopicsService {
String env,
String serviceUrl) {
Map<String, Object> topicsMap = Maps.newHashMap();
- List<Map<String, String>> persistentTopics = this.getTopicListByHttp(
- tenant, namespace, "persistent", serviceUrl);
- List<Map<String, Object>> persistentTopicsArray = this.getTopicsStatsList(
- env, tenant, namespace, "persistent", persistentTopics);
- List<Map<String, String>> nonPersistentTopics = this.getTopicListByHttp(
- tenant, namespace, "non-persistent", serviceUrl);
- List<Map<String, Object>> nonPersistentTopicsArray = this.getTopicsStatsList(
- env, tenant, namespace, "non-persistent", nonPersistentTopics);
- persistentTopicsArray.addAll(nonPersistentTopicsArray);
- topicsMap.put("topics", persistentTopicsArray);
+ List<Map<String, Object>> allTopics = this.getTopicStats(env, tenant, namespace, serviceUrl);
+ topicsMap.put("topics", allTopics);
topicsMap.put("isPage", false);
- topicsMap.put("total", persistentTopicsArray.size());
+ topicsMap.put("total", allTopics.size());
topicsMap.put("pageNum", 1);
- topicsMap.put("pageSize", persistentTopicsArray.size());
+ topicsMap.put("pageSize", allTopics.size());
return topicsMap;
}
+ private List<Map<String, Object>> getTopicStats(
+ String env, String tenant, String namespace, String requestHost) {
+ List<Map<String, Object>> result = new ArrayList<>();
+
+ Map<String, List<String>> allTopics
+ = this.getTopicListByPulsarAdmin(tenant, namespace, requestHost);
+ Map<String, List<String>> allPartitionedTopics
+ = this.getPartitionedTopicListByPulsarAdmin(tenant, namespace, requestHost);
+
+ result.addAll(this.getTopicsStatsList(
+ env,
+ tenant,
+ namespace,
+ TopicDomain.persistent.toString(),
+ this.convertTopicList(
+ allTopics.get(TopicDomain.persistent.toString()),
+ allPartitionedTopics.get(TopicDomain.persistent.toString()),
+ TopicDomain.persistent.toString(),
+ requestHost
+ )
+ ));
+ result.addAll(this.getTopicsStatsList(
+ env,
+ tenant,
+ namespace,
+ TopicDomain.non_persistent.toString(),
+ this.convertTopicList(
+ allTopics.get(TopicDomain.non_persistent.toString()),
+ allPartitionedTopics.get(TopicDomain.non_persistent.toString()),
+ TopicDomain.non_persistent.toString(),
+ requestHost
+ )
+ ));
+
+ return result;
+ }
+
public List<Map<String, Object>> getTopicsStatsList(String env,
String tenant,
String namespace,
@@ -179,84 +220,126 @@ public class TopicsServiceImpl implements TopicsService {
public Map<String, Object> getTopicsList(
Integer pageNum, Integer pageSize, String tenant, String namespace, String requestHost) {
Map<String, Object> topicsMap = Maps.newHashMap();
- List<Map<String, String>> persistentTopic = this.getTopicListByHttp(
- tenant, namespace, "persistent", requestHost);
- List<Map<String, String>> nonPersistentTopic = this.getTopicListByHttp(
- tenant, namespace, "non-persistent", requestHost);
- persistentTopic.addAll(nonPersistentTopic);
- topicsMap.put("topics", persistentTopic);
+ List<Map<String, String>> allTopics = this.getTopicsList(tenant, namespace, requestHost);
+ topicsMap.put("topics", allTopics);
topicsMap.put("isPage", false);
- topicsMap.put("total", persistentTopic.size());
+ topicsMap.put("total", allTopics.size());
topicsMap.put("pageNum", 1);
- topicsMap.put("pageSize", persistentTopic.size());
+ topicsMap.put("pageSize", allTopics.size());
return topicsMap;
}
- private List<Map<String, String>> getTopicListByHttp(
- String tenant, String namespace, String persistent, String requestHost) {
- List<Map<String, String>> topicsArray = new ArrayList<>();
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
+ @Override
+ public List<Map<String, Object>> peekMessages(String persistent,
+ String tenant,
+ String namespace,
+ String topic,
+ String subName,
+ Integer messagePosition,
+ String requestHost) {
+ List<Message<byte[]>> messages;
+ String topicFullPath = persistent + "://" + tenant + "/" + namespace + "/" + topic;
+ try {
+ messages = pulsarAdminService.topics(requestHost).peekMessages(topicFullPath, subName, messagePosition);
+ } catch(PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to peek messages.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
}
- String prefix = "/admin/v2/" + persistent + "/" + tenant + "/" + namespace;
- Gson gson = new Gson();
- String partitionedUrl = requestHost + prefix + "/partitioned";
- String partitionedTopic = HttpUtil.doGet(partitionedUrl, header);
- List<String> partitionedTopicsList = Arrays.asList();
- Map<String, List<String>> partitionedMap = Maps.newHashMap();
- if (partitionedTopic != null) {
- partitionedTopicsList = gson.fromJson(
- partitionedTopic, new TypeToken<List<String>>(){}.getType());
- for (String p : partitionedTopicsList) {
- if (p.startsWith(persistent)) {
- partitionedMap.put(this.getTopicName(p), new ArrayList<>());
- }
+ List<Map<String, Object>> mapList = new ArrayList<>();
+ for (Message<byte[]> msg: messages) {
+ Map<String, Object> message = Maps.newHashMap();
+ if (msg.getMessageId() instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
+ message.put("ledgerId", msgId.getLedgerId());
+ message.put("entryId", msgId.getEntryId());
+ message.put("batchIndex", msgId.getBatchIndex());
+ message.put("batch", true);
+ } else {
+ MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ message.put("ledgerId", msgId.getLedgerId());
+ message.put("entryId", msgId.getEntryId());
+ message.put("batch", false);
}
+ if (msg.getProperties().size() > 0) {
+ msg.getProperties().forEach((k, v) -> message.put(k, v));
+ }
+ message.put("data", msg.getData());
+ mapList.add(message);
}
+ return mapList;
+ }
- String topicUrl = requestHost + prefix;
- String topics = HttpUtil.doGet(topicUrl, header);
- if (topics != null) {
- List<String> topicsList = gson.fromJson(
- topics, new TypeToken<List<String>>(){}.getType());
- for (String topic: topicsList) {
- if (topic.startsWith(persistent)) {
- String topicName = this.getTopicName(topic);
- Map<String, String> topicEntity = Maps.newHashMap();
- if (isPartitionedTopic(partitionedTopicsList, topic)) {
- String[] name = topicName.split(PARTITIONED_TOPIC_SUFFIX);
- partitionedMap.get(name[0]).add(topicName);
- } else {
- topicEntity.put("topic", topicName);
- topicEntity.put("partitions", "0");
- topicEntity.put("persistent", persistent);
- topicsArray.add(topicEntity);
- }
- }
+ private List<Map<String, String>> getTopicsList(
+ String tenant, String namespace, String requestHost) {
+ List<Map<String, String>> result = new ArrayList<>();
+
+ Map<String, List<String>> allTopics
+ = this.getTopicListByPulsarAdmin(tenant, namespace, requestHost);
+ Map<String, List<String>> allPartitionedTopics
+ = this.getPartitionedTopicListByPulsarAdmin(tenant, namespace, requestHost);
+
+ result.addAll(this.convertTopicList(
+ allTopics.get(TopicDomain.persistent.toString()),
+ allPartitionedTopics.get(TopicDomain.persistent.toString()),
+ TopicDomain.persistent.toString(),
+ requestHost
+ ));
+ result.addAll(this.convertTopicList(
+ allTopics.get(TopicDomain.non_persistent.toString()),
+ allPartitionedTopics.get(TopicDomain.non_persistent.toString()),
+ TopicDomain.non_persistent.toString(),
+ requestHost
+ ));
+
+ return result;
+ }
+
+ private List<Map<String, String>> convertTopicList(
+ List<String> topics, List<String> partitionedTopics, String persistent, String requestHost) {
+ List<Map<String, String>> topicsArray = new ArrayList<>();
+
+ Map<String, List<String>> partitionedMap = Maps.newHashMap();
+ for (String p : partitionedTopics) {
+ if (p.startsWith(persistent)) {
+ partitionedMap.put(this.getTopicName(p), new ArrayList<>());
}
}
- if (partitionedTopicsList != null) {
- for (String s : partitionedTopicsList) {
- String topicName = this.getTopicName(s);
+
+ for (String topic: topics) {
+ if (topic.startsWith(persistent)) {
+ String topicName = this.getTopicName(topic);
Map<String, String> topicEntity = Maps.newHashMap();
- List<String> partitionedTopicList = partitionedMap.get(s);
- if (partitionedTopicList != null && partitionedTopicList.size() > 0) {
- topicEntity.put("topic", topicName);
- topicEntity.put("partitions", String.valueOf(partitionedTopicList.size()));
- topicEntity.put("persistent", persistent);
+ if (isPartitionedTopic(partitionedTopics, topic)) {
+ String[] name = topicName.split(PARTITIONED_TOPIC_SUFFIX);
+ partitionedMap.get(name[0]).add(topicName);
} else {
topicEntity.put("topic", topicName);
- String metadataTopicUrl = requestHost + prefix + "/" + topicName + "/partitions";
- String metadataTopic = HttpUtil.doGet(metadataTopicUrl, header);
- Map<String, Integer> metadata = gson.fromJson(
- metadataTopic, new TypeToken<Map<String, Integer>>(){}.getType());
- topicEntity.put("partitions", String.valueOf(metadata.get("partitions")));
+ topicEntity.put("partitions", "0");
topicEntity.put("persistent", persistent);
+ topicsArray.add(topicEntity);
}
- topicsArray.add(topicEntity);
}
}
+
+ for (String s : partitionedTopics) {
+ String topicName = this.getTopicName(s);
+ Map<String, String> topicEntity = Maps.newHashMap();
+ List<String> partitionedTopicList = partitionedMap.get(s);
+ if (partitionedTopicList != null && partitionedTopicList.size() > 0) {
+ topicEntity.put("topic", topicName);
+ topicEntity.put("partitions", String.valueOf(partitionedTopicList.size()));
+ topicEntity.put("persistent", persistent);
+ } else {
+ topicEntity.put("topic", topicName);
+ PartitionedTopicMetadata partitionedTopicMetadata
+ = this.getPartitionedTopicMetadataByPulsarAdmin(s, requestHost);
+ topicEntity.put("partitions", String.valueOf(partitionedTopicMetadata.partitions));
+ topicEntity.put("persistent", persistent);
+ }
+ topicsArray.add(topicEntity);
+ }
return topicsArray;
}
@@ -265,4 +348,66 @@ public class TopicsServiceImpl implements TopicsService {
String topicName = tntPath.split("/")[2];
return topicName;
}
+
+ private Map<String, List<String>> getTopicListByPulsarAdmin(
+ String tenant, String namespace, String requestHost) {
+ try {
+ return parseTopics(
+ pulsarAdminService.topics(requestHost).
+ getList(tenant + "/" + namespace)
+ );
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get topic list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
+ }
+ }
+
+ private Map<String, List<String>> getPartitionedTopicListByPulsarAdmin(
+ String tenant, String namespace, String requestHost) {
+ try {
+ return parseTopics(
+ pulsarAdminService.topics(requestHost).
+ getPartitionedTopicList(tenant + "/" + namespace)
+ );
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get partitioned topic list.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
+ }
+ }
+
+ private PartitionedTopicMetadata getPartitionedTopicMetadataByPulsarAdmin(
+ String topic, String requestHost) {
+ try {
+ return pulsarAdminService.topics(requestHost).
+ getPartitionedTopicMetadata(topic);
+ } catch (PulsarAdminException e) {
+ PulsarAdminOperationException pulsarAdminOperationException
+ = new PulsarAdminOperationException("Failed to get partitioned topic metadata.");
+ log.error(pulsarAdminOperationException.getMessage(), e);
+ throw pulsarAdminOperationException;
+ }
+ }
+
+ private Map<String, List<String>> parseTopics(List<String> topics) {
+ Map<String, List<String>> result = new HashMap<>();
+ List<String> persistentTopics = new ArrayList<>();
+ List<String> nonPersistentTopics = new ArrayList<>();
+
+ for (String topic : topics) {
+ TopicName topicName = TopicName.get(topic);
+ if (TopicDomain.persistent.equals(topicName.getDomain())) {
+ persistentTopics.add(topic);
+ } else {
+ nonPersistentTopics.add(topic);
+ }
+ }
+
+ result.put(TopicDomain.persistent.toString(), persistentTopics);
+ result.put(TopicDomain.non_persistent.toString(), nonPersistentTopics);
+ return result;
+ }
}
diff --git a/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java b/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
index d1fb323..d3336c0 100644
--- a/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
+++ b/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
@@ -13,24 +13,25 @@
*/
package org.apache.pulsar.manager.zuul;
-import org.apache.pulsar.manager.service.EnvironmentCacheService;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+import javax.servlet.http.HttpServletRequest;
+
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.manager.service.EnvironmentCacheService;
+import org.apache.pulsar.manager.service.PulsarAdminService;
import org.apache.pulsar.manager.service.PulsarEvent;
import org.apache.pulsar.manager.service.RolesService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.netflix.zuul.filters.support.FilterConstants;
import org.springframework.stereotype.Component;
-import javax.servlet.http.HttpServletRequest;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.PRE_TYPE;
import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.REQUEST_URI_KEY;
@@ -42,21 +43,22 @@ public class EnvironmentForward extends ZuulFilter {
private static final Logger log = LoggerFactory.getLogger(EnvironmentForward.class);
- @Value("${backend.jwt.token}")
- private String pulsarJwtToken;
-
private final EnvironmentCacheService environmentCacheService;
private final PulsarEvent pulsarEvent;
private final RolesService rolesService;
+ private final PulsarAdminService pulsarAdminService;
+
@Autowired
public EnvironmentForward(
- EnvironmentCacheService environmentCacheService, PulsarEvent pulsarEvent, RolesService rolesService) {
+ EnvironmentCacheService environmentCacheService, PulsarEvent pulsarEvent,
+ RolesService rolesService, PulsarAdminService pulsarAdminService) {
this.environmentCacheService = environmentCacheService;
this.pulsarEvent = pulsarEvent;
this.rolesService = rolesService;
+ this.pulsarAdminService = pulsarAdminService;
}
@Override
@@ -129,7 +131,8 @@ public class EnvironmentForward extends ZuulFilter {
private Object forwardRequest(RequestContext ctx, HttpServletRequest request, String serviceUrl) {
ctx.put(REQUEST_URI_KEY, request.getRequestURI());
try {
- ctx.addZuulRequestHeader("Authorization", String.format("Bearer %s", pulsarJwtToken));
+ Map<String, String> authHeader = pulsarAdminService.getAuthHeader(serviceUrl);
+ authHeader.entrySet().forEach(entry -> ctx.addZuulRequestHeader(entry.getKey(), entry.getValue()));
ctx.setRouteHost(new URL(serviceUrl));
pulsarEvent.parsePulsarEvent(request.getRequestURI(), request);
log.info("Forward request to {} @ path {}",
@@ -140,5 +143,4 @@ public class EnvironmentForward extends ZuulFilter {
}
return null;
}
-
}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 9305e60..46e0462 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -76,6 +76,11 @@ pagehelper.helperDialect=mysql
backend.directRequestBroker=true
backend.directRequestHost=http://localhost:8080
backend.jwt.token=
+backend.broker.pulsarAdmin.authPlugin=
+backend.broker.pulsarAdmin.authParams=
+backend.broker.pulsarAdmin.tlsAllowInsecureConnection=false
+backend.broker.pulsarAdmin.tlsTrustCertsFilePath=
+backend.broker.pulsarAdmin.tlsEnableHostnameVerification=false
jwt.secret=dab1c8ba-b01b-11e9-b384-186590e06885
jwt.sessionTime=2592000
diff --git a/src/test/java/org/apache/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java
index caedd68..2c73e1c 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java
@@ -51,8 +51,8 @@ public class BrokerTokensRepositoryImplTest {
Page<BrokerTokenEntity> brokerTokenEntityPage = brokerTokensRepository.getBrokerTokensList(1, 1);
brokerTokenEntityPage.count(true);
brokerTokenEntityPage.getResult().forEach((result) -> {
- Assert.assertEquals(result.getRole(), brokerTokenEntity.getRole());
- Assert.assertEquals(result.getDescription(), brokerTokenEntity.getDescription());
+ Assert.assertEquals(brokerTokenEntity.getRole(), result.getRole());
+ Assert.assertEquals(brokerTokenEntity.getDescription(), result.getDescription());
});
brokerTokenEntity.setDescription("This role for update test");
@@ -60,8 +60,8 @@ public class BrokerTokensRepositoryImplTest {
brokerTokensRepository.update(brokerTokenEntity);
Optional<BrokerTokenEntity> optionalBrokerTokenEntity = brokerTokensRepository.findTokenByRole(brokerTokenEntity.getRole());
BrokerTokenEntity updatedBrokerTokenEntity = optionalBrokerTokenEntity.get();
- Assert.assertEquals(updatedBrokerTokenEntity.getRole(), brokerTokenEntity.getRole());
- Assert.assertEquals(updatedBrokerTokenEntity.getDescription(), brokerTokenEntity.getDescription());
+ Assert.assertEquals(brokerTokenEntity.getRole(), updatedBrokerTokenEntity.getRole());
+ Assert.assertEquals(brokerTokenEntity.getDescription(), updatedBrokerTokenEntity.getDescription());
brokerTokensRepository.remove(brokerTokenEntity.getRole());
Assert.assertFalse(brokerTokensRepository.findTokenByRole(brokerTokenEntity.getRole()).isPresent());
diff --git a/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java
index e2553cd..6eab859 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java
@@ -50,8 +50,8 @@ public class EnvironmentsRepositoryImplTest {
Page<EnvironmentEntity> environmentEntityPage = environmentsRepository.getEnvironmentsList(1, 1);
environmentEntityPage.count(true);
environmentEntityPage.getResult().forEach((result) -> {
- Assert.assertEquals(result.getName(), "test-environment");
- Assert.assertEquals(result.getBroker(), "http://localhost:8080");
+ Assert.assertEquals("test-environment", result.getName());
+ Assert.assertEquals("http://localhost:8080", result.getBroker());
environmentsRepository.remove(result.getName());
});
}
@@ -65,16 +65,16 @@ public class EnvironmentsRepositoryImplTest {
Optional<EnvironmentEntity> environmentEntityOptionalGet = environmentsRepository
.findByBroker("https://localhost:8080");
EnvironmentEntity environmentEntityGet = environmentEntityOptionalGet.get();
- Assert.assertEquals(environmentEntityGet.getName(), "test-environment");
- Assert.assertEquals(environmentEntityGet.getBroker(), "https://localhost:8080");
+ Assert.assertEquals("test-environment", environmentEntityGet.getName());
+ Assert.assertEquals("https://localhost:8080", environmentEntityGet.getBroker());
environmentEntity.setBroker("https://localhost:8081");
environmentsRepository.update(environmentEntity);
Optional<EnvironmentEntity> environmentEntityOptionalUpdate = environmentsRepository
.findByName("test-environment");
EnvironmentEntity environmentEntityUpdate = environmentEntityOptionalUpdate.get();
- Assert.assertEquals(environmentEntityUpdate.getName(), "test-environment");
- Assert.assertEquals(environmentEntityUpdate.getBroker(), "https://localhost:8081");
+ Assert.assertEquals("test-environment", environmentEntityUpdate.getName());
+ Assert.assertEquals("https://localhost:8081", environmentEntityUpdate.getBroker());
environmentsRepository.remove(environmentEntityUpdate.getName());
}
diff --git a/src/test/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImplTest.java
index c8e2efe..c8eb360 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImplTest.java
@@ -57,16 +57,16 @@ public class NamespacesRepositoryImplTest {
public void checkResult(Page<NamespaceEntity> namespacesEntityPage) {
long total = namespacesEntityPage.getTotal();
- Assert.assertEquals(total, 1);
+ Assert.assertEquals(1, total);
namespacesEntityPage.getResult().forEach((result) -> {
- Assert.assertEquals(result.getTenant(), "test-namespace-public");
- Assert.assertEquals(result.getNamespace(), "test-namespace-default");
+ Assert.assertEquals("test-namespace-public", result.getTenant());
+ Assert.assertEquals("test-namespace-default", result.getNamespace());
});
}
public void checkDeleteResult(Page<NamespaceEntity> namespacesEntityPage) {
long total = namespacesEntityPage.getTotal();
- Assert.assertEquals(total, 0);
+ Assert.assertEquals(0, total);
}
@Before
@@ -168,8 +168,8 @@ public class NamespacesRepositoryImplTest {
idList.add(namespaceId);
Optional<NamespaceEntity> namespacesEntityOptional = namespacesRepository.findByTenantNamespace(
namespacesEntity.getTenant(), namespacesEntity.getNamespace());
- Assert.assertEquals(namespacesEntityOptional.get().getTenant(), "test-namespace-public");
- Assert.assertEquals(namespacesEntityOptional.get().getNamespace(), "test-namespace-default");
+ Assert.assertEquals("test-namespace-public", namespacesEntityOptional.get().getTenant());
+ Assert.assertEquals("test-namespace-default", namespacesEntityOptional.get().getNamespace());
namespacesRepository.remove(namespacesEntity.getTenant(), namespacesEntity.getNamespace());
}
@@ -179,8 +179,8 @@ public class NamespacesRepositoryImplTest {
initNamespaceEntity(namespacesEntity);
long namespaceId = namespacesRepository.save(namespacesEntity);
Optional<NamespaceEntity> namespacesEntityOptional = namespacesRepository.findByNamespaceId(namespaceId);
- Assert.assertEquals(namespacesEntityOptional.get().getTenant(), "test-namespace-public");
- Assert.assertEquals(namespacesEntityOptional.get().getNamespace(), "test-namespace-default");
+ Assert.assertEquals("test-namespace-public", namespacesEntityOptional.get().getTenant());
+ Assert.assertEquals("test-namespace-default", namespacesEntityOptional.get().getNamespace());
namespacesRepository.remove(namespacesEntity.getTenant(), namespacesEntity.getNamespace());
}
diff --git a/src/test/java/org/apache/pulsar/manager/dao/RoleBindingRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/RoleBindingRepositoryImplTest.java
index 6a95a08..ed71bdf 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/RoleBindingRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/RoleBindingRepositoryImplTest.java
@@ -52,10 +52,10 @@ public class RoleBindingRepositoryImplTest {
Page<RoleBindingEntity> roleBindingEntities = roleBindingRepository.findByUserId(
1, 2, 2);
roleBindingEntities.getResult().forEach((r) -> {
- Assert.assertEquals(r.getRoleBindingId(), 1);
- Assert.assertEquals(r.getName(), "test-role-binding");
- Assert.assertEquals(r.getRoleId(), 1);
- Assert.assertEquals(r.getDescription(), "this is description");
+ Assert.assertEquals(1, r.getRoleBindingId());
+ Assert.assertEquals("test-role-binding", r.getName());
+ Assert.assertEquals(1, r.getRoleId());
+ Assert.assertEquals("this is description", r.getDescription());
});
roleBindingEntity.setName("update-role-binding");
@@ -64,10 +64,10 @@ public class RoleBindingRepositoryImplTest {
Page<RoleBindingEntity> updateRoleBindingEntities = roleBindingRepository.findByUserId(
1, 2, 2);
updateRoleBindingEntities.getResult().forEach((r) -> {
- Assert.assertEquals(r.getRoleBindingId(), 1);
- Assert.assertEquals(r.getName(), "update-role-binding");
- Assert.assertEquals(r.getRoleId(), 1);
- Assert.assertEquals(r.getDescription(), "this is update description");
+ Assert.assertEquals(1, r.getRoleBindingId());
+ Assert.assertEquals("update-role-binding", r.getName());
+ Assert.assertEquals(1, r.getRoleId());
+ Assert.assertEquals("this is update description", r.getDescription());
});
}
diff --git a/src/test/java/org/apache/pulsar/manager/dao/RolesRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/RolesRepositoryImplTest.java
index 4688074..30148fa 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/RolesRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/RolesRepositoryImplTest.java
@@ -55,14 +55,14 @@ public class RolesRepositoryImplTest {
}
private void validateRole(RoleInfoEntity role) {
- Assert.assertEquals(role.getRoleName(), "test-role-name");
- Assert.assertEquals(role.getRoleSource(), "test-tenant");
- Assert.assertEquals(role.getResourceType(), "tenants");
- Assert.assertEquals(role.getResourceName(), "tenants");
- Assert.assertEquals(role.getResourceVerbs(), "admin");
- Assert.assertEquals(role.getResourceId(), 2);
- Assert.assertEquals(role.getDescription(), "This is tenants permissions");
- Assert.assertEquals(role.getFlag(), 0);
+ Assert.assertEquals("test-role-name", role.getRoleName());
+ Assert.assertEquals("test-tenant", role.getRoleSource());
+ Assert.assertEquals("tenants", role.getResourceType());
+ Assert.assertEquals("tenants", role.getResourceName());
+ Assert.assertEquals("admin", role.getResourceVerbs());
+ Assert.assertEquals(2, role.getResourceId());
+ Assert.assertEquals("This is tenants permissions", role.getDescription());
+ Assert.assertEquals(0, role.getFlag());
}
@Test
@@ -115,10 +115,10 @@ public class RolesRepositoryImplTest {
Optional<RoleInfoEntity> updateRoleInfo = rolesRepository.findByRoleName(
roleInfoEntity.getRoleName(), roleInfoEntity.getRoleSource());
RoleInfoEntity updateRoleInfoEntity = updateRoleInfo.get();
- Assert.assertEquals(updateRoleInfoEntity.getResourceType(), "clusters");
- Assert.assertEquals(updateRoleInfoEntity.getResourceVerbs(), "admin,produce,consume");
- Assert.assertEquals(updateRoleInfoEntity.getDescription(), "This is update role");
- Assert.assertEquals(updateRoleInfoEntity.getFlag(), 1);
+ Assert.assertEquals("clusters", updateRoleInfoEntity.getResourceType());
+ Assert.assertEquals("admin,produce,consume", updateRoleInfoEntity.getResourceVerbs());
+ Assert.assertEquals("This is update role", updateRoleInfoEntity.getDescription());
+ Assert.assertEquals(1, updateRoleInfoEntity.getFlag());
rolesRepository.delete(roleInfoEntity.getRoleName(), roleInfoEntity.getRoleSource());
Optional<RoleInfoEntity> deleteRoleInfo = rolesRepository.findByRoleName(
diff --git a/src/test/java/org/apache/pulsar/manager/dao/TenantsRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/TenantsRepositoryImplTest.java
index d37d719..57c559b 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/TenantsRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/TenantsRepositoryImplTest.java
@@ -56,13 +56,13 @@ public class TenantsRepositoryImplTest {
Page<TenantEntity> tenantsEntities = tenantsRepository.getTenantsList(1, 10);
tenantsEntities.count(true);
long total = tenantsEntities.getTotal();
- Assert.assertEquals(total, 10);
+ Assert.assertEquals(10, total);
List<TenantEntity> tenantsEntityList = tenantsEntities.getResult();
for (int i = 0; i < total; i ++) {
TenantEntity tenantEntity = tenantsEntityList.get(i);
Assert.assertEquals(tenantEntity.getTenant(), tenantEntity.getAdminRoles());
- Assert.assertEquals(tenantEntity.getAllowedClusters(), "test-cluster");
- Assert.assertEquals(tenantEntity.getEnvironmentName(), "test-environment");
+ Assert.assertEquals("test-cluster", tenantEntity.getAllowedClusters());
+ Assert.assertEquals("test-environment", tenantEntity.getEnvironmentName());
}
tenantsEntities.getResult().forEach((result) -> {
tenantsRepository.remove(result.getTenant());
@@ -89,8 +89,8 @@ public class TenantsRepositoryImplTest {
for (int i = 0; i < total; i ++) {
TenantEntity tenantEntity = tenantsEntityList.get(i);
Assert.assertEquals(tenantEntity.getTenant(), tenantEntity.getAdminRoles());
- Assert.assertEquals(tenantEntity.getAllowedClusters(), "test-cluster");
- Assert.assertEquals(tenantEntity.getEnvironmentName(), "test-environment");
+ Assert.assertEquals("test-cluster", tenantEntity.getAllowedClusters());
+ Assert.assertEquals("test-environment", tenantEntity.getEnvironmentName());
}
tenantEntityPage.getResult().forEach((result) -> {
tenantsRepository.remove(result.getTenant());
@@ -107,10 +107,10 @@ public class TenantsRepositoryImplTest {
tenantsRepository.save(tenantEntity);
Optional<TenantEntity> result = tenantsRepository.findByName("test");
TenantEntity getTenantEntity = result.get();
- Assert.assertEquals(getTenantEntity.getTenant(), "test");
- Assert.assertEquals(getTenantEntity.getAdminRoles(), "test-role");
- Assert.assertEquals(getTenantEntity.getAllowedClusters(), "test-cluster");
- Assert.assertEquals(getTenantEntity.getEnvironmentName(), "test-environment");
+ Assert.assertEquals("test", getTenantEntity.getTenant());
+ Assert.assertEquals("test-role", getTenantEntity.getAdminRoles());
+ Assert.assertEquals("test-cluster", getTenantEntity.getAllowedClusters());
+ Assert.assertEquals("test-environment", getTenantEntity.getEnvironmentName());
tenantsRepository.remove("test");
}
@@ -124,9 +124,9 @@ public class TenantsRepositoryImplTest {
long tenantId = tenantsRepository.save(tenantEntity);
Optional<TenantEntity> result = tenantsRepository.findByTenantId(tenantId);
TenantEntity getTenantEntity = result.get();
- Assert.assertEquals(getTenantEntity.getTenant(), "test");
- Assert.assertEquals(getTenantEntity.getAdminRoles(), "test-role");
- Assert.assertEquals(getTenantEntity.getAllowedClusters(), "test-cluster");
- Assert.assertEquals(getTenantEntity.getEnvironmentName(), "test-environment");
+ Assert.assertEquals("test", getTenantEntity.getTenant());
+ Assert.assertEquals("test-role", getTenantEntity.getAdminRoles());
+ Assert.assertEquals("test-cluster", getTenantEntity.getAllowedClusters());
+ Assert.assertEquals("test-environment", getTenantEntity.getEnvironmentName());
}
}
\ No newline at end of file
diff --git a/src/test/java/org/apache/pulsar/manager/dao/UsersRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/UsersRepositoryImplTest.java
index 3d4d311..8e8583c 100644
--- a/src/test/java/org/apache/pulsar/manager/dao/UsersRepositoryImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/dao/UsersRepositoryImplTest.java
@@ -54,15 +54,15 @@ public class UsersRepositoryImplTest {
}
private void validateUser(UserInfoEntity user, boolean list) {
- Assert.assertEquals(user.getName(), "test-user");
- Assert.assertEquals(user.getExpire(), 157900045678l);
- Assert.assertEquals(user.getPhoneNumber(), "1356789023456");
- Assert.assertEquals(user.getDescription(), "test-description");
- Assert.assertEquals(user.getLocation(), "bj");
- Assert.assertEquals(user.getEmail(), "test@apache.org");
- Assert.assertEquals(user.getAccessToken(), "test-access-token");
+ Assert.assertEquals("test-user", user.getName());
+ Assert.assertEquals(157900045678l, user.getExpire());
+ Assert.assertEquals("1356789023456", user.getPhoneNumber());
+ Assert.assertEquals("test-description", user.getDescription());
+ Assert.assertEquals("bj", user.getLocation());
+ Assert.assertEquals("test@apache.org", user.getEmail());
+ Assert.assertEquals("test-access-token", user.getAccessToken());
if (!list) {
- Assert.assertEquals(user.getPassword(), DigestUtils.sha256Hex("hello-world"));
+ Assert.assertEquals(DigestUtils.sha256Hex("hello-world"), user.getPassword());
}
}
@@ -95,8 +95,8 @@ public class UsersRepositoryImplTest {
userInfoEntityOptional = usersRepository.findByUserName(userInfoEntity.getName());
UserInfoEntity updateUserInfoEntity = userInfoEntityOptional.get();
- Assert.assertEquals(updateUserInfoEntity.getPhoneNumber(), "1356789023456");
- Assert.assertEquals(updateUserInfoEntity.getEmail(), "test2@apache.org");
+ Assert.assertEquals("1356789023456", updateUserInfoEntity.getPhoneNumber());
+ Assert.assertEquals("test2@apache.org", updateUserInfoEntity.getEmail());
usersRepository.delete(updateUserInfoEntity.getName());
userInfoEntityOptional = usersRepository.findByUserName(userInfoEntity.getName());
diff --git a/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java
index 12902c8..8063ebb 100644
--- a/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java
@@ -71,8 +71,8 @@ public class BookiesServiceImplTest {
.thenReturn("{\"192.168.2.116:3181\" : \": {Free: 48920571904(48.92GB), Total: 250790436864(250.79GB)}," +
"\",\"ClusterInfo: \" : \"{Free: 48920571904(48.92GB), Total: 250790436864(250.79GB)}\" }");
Map<String, Object> result = bookiesService.getBookiesList(1, 1, "standalone");
- Assert.assertEquals(result.get("total"), 1);
- Assert.assertEquals(result.get("data").toString(), "[{storage=[48920571904, 250790436864], bookie=192.168.2.116:3181, status=rw}]");
- Assert.assertEquals(result.get("pageSize"), 1);
+ Assert.assertEquals(1, result.get("total"));
+ Assert.assertEquals("[{storage=[48920571904, 250790436864], bookie=192.168.2.116:3181, status=rw}]", result.get("data").toString());
+ Assert.assertEquals(1, result.get("pageSize"));
}
}
diff --git a/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
index 2049977..a15e312 100644
--- a/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
@@ -15,6 +15,9 @@ package org.apache.pulsar.manager.service;
import com.github.pagehelper.Page;
import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.manager.PulsarManagerApplication;
import org.apache.pulsar.manager.entity.ConsumerStatsEntity;
import org.apache.pulsar.manager.entity.ConsumersStatsRepository;
@@ -23,34 +26,28 @@ import org.apache.pulsar.manager.entity.PublishersStatsRepository;
import org.apache.pulsar.manager.entity.ReplicationsStatsRepository;
import org.apache.pulsar.manager.entity.SubscriptionStatsEntity;
import org.apache.pulsar.manager.entity.SubscriptionsStatsRepository;
-import org.apache.pulsar.manager.utils.HttpUtil;
import org.apache.pulsar.manager.entity.ReplicationStatsEntity;
import org.apache.pulsar.manager.entity.TopicStatsEntity;
import org.apache.pulsar.manager.entity.TopicsStatsRepository;
import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Map;
+import java.util.List;
import java.util.Optional;
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
@SpringBootTest(
classes = {
PulsarManagerApplication.class,
@@ -59,12 +56,17 @@ import java.util.Optional;
)
@ActiveProfiles("test")
public class BrokerStatsServiceImplTest {
+ @MockBean
+ private PulsarAdminService pulsarAdminService;
@Autowired
private BrokerStatsService brokerStatsService;
- @Value("${backend.jwt.token}")
- private static String pulsarJwtToken;
+ @MockBean
+ private BrokersService brokersService;
+
+ @Mock
+ private BrokerStats stats;
@Autowired
private TopicsStatsRepository topicsStatsRepository;
@@ -151,97 +153,92 @@ public class BrokerStatsServiceImplTest {
"}";
private void checkTopicStatsResult(TopicStatsEntity topicStatsEntity) {
- Assert.assertEquals(topicStatsEntity.getAverageMsgSize(), 0.0, 1);
- Assert.assertEquals(topicStatsEntity.getMsgRateIn(), 0.0, 1);
- Assert.assertEquals(topicStatsEntity.getMsgRateOut(), 0.0, 1);
- Assert.assertEquals(topicStatsEntity.getMsgThroughputIn(), 0.0, 1);
- Assert.assertEquals(topicStatsEntity.getMsgThroughputOut(), 0.0, 1);
- Assert.assertEquals(topicStatsEntity.getStorageSize(), 0, 0);
- Assert.assertEquals(topicStatsEntity.getCluster(), "standalone");
- Assert.assertEquals(topicStatsEntity.getBroker(), "localhost:8080");
- Assert.assertEquals(topicStatsEntity.getTenant(), "public");
- Assert.assertEquals(topicStatsEntity.getNamespace(), "functions");
- Assert.assertEquals(topicStatsEntity.getBundle(), "0x40000000_0x80000000");
- Assert.assertEquals(topicStatsEntity.getPersistent(), "persistent");
- Assert.assertEquals(topicStatsEntity.getTopic(), "metadata");
+ Assert.assertEquals(0.0, topicStatsEntity.getAverageMsgSize(), 1);
+ Assert.assertEquals(0.0, topicStatsEntity.getMsgRateIn(), 1);
+ Assert.assertEquals(0.0, topicStatsEntity.getMsgRateOut(), 1);
+ Assert.assertEquals(0.0, topicStatsEntity.getMsgThroughputIn(), 1);
+ Assert.assertEquals(0.0, topicStatsEntity.getMsgThroughputOut(), 1);
+ Assert.assertEquals(0, topicStatsEntity.getStorageSize(), 0);
+ Assert.assertEquals("standalone", topicStatsEntity.getCluster());
+ Assert.assertEquals("localhost:8080", topicStatsEntity.getBroker());
+ Assert.assertEquals("public", topicStatsEntity.getTenant());
+ Assert.assertEquals("functions", topicStatsEntity.getNamespace());
+ Assert.assertEquals("0x40000000_0x80000000", topicStatsEntity.getBundle());
+ Assert.assertEquals("persistent", topicStatsEntity.getPersistent());
+ Assert.assertEquals("metadata", topicStatsEntity.getTopic());
}
private void checkPublisherStatsResult(PublisherStatsEntity publisherStatsEntity) {
- Assert.assertEquals(publisherStatsEntity.getMsgRateIn(), 0.0, 1);
- Assert.assertEquals(publisherStatsEntity.getMsgThroughputIn(), 0.0, 1);
- Assert.assertEquals(publisherStatsEntity.getAverageMsgSize(), 0.0, 1);
- Assert.assertEquals(publisherStatsEntity.getAddress(), "/127.0.0.1:59668");
- Assert.assertEquals(publisherStatsEntity.getProducerId(), 1);
- Assert.assertEquals(publisherStatsEntity.getProducerName(), "standalone-1-1");
- Assert.assertEquals(publisherStatsEntity.getConnectedSince(), "2019-08-10T11:37:22.405+08:00");
- Assert.assertEquals(publisherStatsEntity.getClientVersion(), "2.5.0-SNAPSHOT");
- Assert.assertEquals(publisherStatsEntity.getMetadata(), "{}");
+ Assert.assertEquals(0.0, publisherStatsEntity.getMsgRateIn(), 1);
+ Assert.assertEquals(0.0, publisherStatsEntity.getMsgThroughputIn(), 1);
+ Assert.assertEquals(0.0, publisherStatsEntity.getAverageMsgSize(), 1);
+ Assert.assertEquals("/127.0.0.1:59668", publisherStatsEntity.getAddress());
+ Assert.assertEquals(1, publisherStatsEntity.getProducerId());
+ Assert.assertEquals("standalone-1-1", publisherStatsEntity.getProducerName());
+ Assert.assertEquals("2019-08-10T11:37:22.405+08:00", publisherStatsEntity.getConnectedSince());
+ Assert.assertEquals("2.5.0-SNAPSHOT", publisherStatsEntity.getClientVersion());
+ Assert.assertEquals("{}", publisherStatsEntity.getMetadata());
}
private void checkReplicationStatsResult(ReplicationStatsEntity replicationStatsEntity) {
- Assert.assertEquals(replicationStatsEntity.getCluster(), "test-replications");
- Assert.assertEquals(replicationStatsEntity.getMsgRateIn(), 123, 0);
- Assert.assertEquals(replicationStatsEntity.getMsgThroughputIn(), 456, 0);
- Assert.assertEquals(replicationStatsEntity.getMsgRateOut(), 456, 0);
- Assert.assertEquals(replicationStatsEntity.getMsgThroughputOut(), 789, 0);
- Assert.assertEquals(replicationStatsEntity.getMsgRateExpired(), 990, 0);
- Assert.assertEquals(replicationStatsEntity.getReplicationBacklog(), 100, 0);
+ Assert.assertEquals("test-replications", replicationStatsEntity.getCluster());
+ Assert.assertEquals(123, replicationStatsEntity.getMsgRateIn(), 0);
+ Assert.assertEquals(456, replicationStatsEntity.getMsgThroughputIn(), 0);
+ Assert.assertEquals(456, replicationStatsEntity.getMsgRateOut(), 0);
+ Assert.assertEquals(789, replicationStatsEntity.getMsgThroughputOut(), 0);
+ Assert.assertEquals(990, replicationStatsEntity.getMsgRateExpired(), 0);
+ Assert.assertEquals(100, replicationStatsEntity.getReplicationBacklog(), 0);
Assert.assertFalse(replicationStatsEntity.isConnected());
- Assert.assertEquals(replicationStatsEntity.getReplicationDelayInSeconds(), 890, 0);
- Assert.assertEquals(replicationStatsEntity.getInboundConnection(), "test");
- Assert.assertEquals(replicationStatsEntity.getInboundConnectedSince(), "test2");
- Assert.assertEquals(replicationStatsEntity.getOutboundConnection(), "test3");
- Assert.assertEquals(replicationStatsEntity.getOutboundConnectedSince(), "test4");
+ Assert.assertEquals(890, replicationStatsEntity.getReplicationDelayInSeconds(), 0);
+ Assert.assertEquals("test", replicationStatsEntity.getInboundConnection());
+ Assert.assertEquals("test2", replicationStatsEntity.getInboundConnectedSince());
+ Assert.assertEquals("test3", replicationStatsEntity.getOutboundConnection());
+ Assert.assertEquals("test4", replicationStatsEntity.getOutboundConnectedSince());
}
private void checkSubscriptionStatsResult(SubscriptionStatsEntity subscriptionStatsEntity) {
- Assert.assertEquals(subscriptionStatsEntity.getSubscription(), "reader-1ddabdb183");
- Assert.assertEquals(subscriptionStatsEntity.getMsgBacklog(), 0.0, 0);
- Assert.assertEquals(subscriptionStatsEntity.getMsgRateOut(), 0.0, 0);
- Assert.assertEquals(subscriptionStatsEntity.getMsgThroughputOut(), 0.0, 0);
- Assert.assertEquals(subscriptionStatsEntity.getMsgRateExpired(), 0.0, 0);
- Assert.assertEquals(subscriptionStatsEntity.getMsgRateRedeliver(), 0.0, 0);
- Assert.assertEquals(subscriptionStatsEntity.getNumberOfEntriesSinceFirstNotAckedMessage(), 1);
- Assert.assertEquals(subscriptionStatsEntity.getTotalNonContiguousDeletedMessagesRange(), 0);
- Assert.assertEquals(subscriptionStatsEntity.getSubscriptionType(), "Exclusive");
+ Assert.assertEquals("reader-1ddabdb183", subscriptionStatsEntity.getSubscription());
+ Assert.assertEquals(0.0, subscriptionStatsEntity.getMsgBacklog(), 0);
+ Assert.assertEquals(0.0, subscriptionStatsEntity.getMsgRateOut(), 0);
+ Assert.assertEquals(0.0, subscriptionStatsEntity.getMsgThroughputOut(), 0);
+ Assert.assertEquals(0.0, subscriptionStatsEntity.getMsgRateExpired(), 0);
+ Assert.assertEquals( 0.0, subscriptionStatsEntity.getMsgRateRedeliver(), 0);
+ Assert.assertEquals(1, subscriptionStatsEntity.getNumberOfEntriesSinceFirstNotAckedMessage());
+ Assert.assertEquals(0, subscriptionStatsEntity.getTotalNonContiguousDeletedMessagesRange());
+ Assert.assertEquals("Exclusive", subscriptionStatsEntity.getSubscriptionType());
}
private void checkConsumerStatsResult(ConsumerStatsEntity consumerStatsEntity) {
- Assert.assertEquals(consumerStatsEntity.getConsumer(), "543bd");
- Assert.assertEquals(consumerStatsEntity.getAddress(), "/127.0.0.1:59668");
- Assert.assertEquals(consumerStatsEntity.getConnectedSince(), "2019-08-10T11:37:24.306+08:00");
- Assert.assertEquals(consumerStatsEntity.getAvailablePermits(), 1000, 0);
- Assert.assertEquals(consumerStatsEntity.getMsgRateOut(), 0.0, 0);
- Assert.assertEquals(consumerStatsEntity.getMsgThroughputOut(), 0.0, 0);
- Assert.assertEquals(consumerStatsEntity.getMsgRateRedeliver(), 0.0, 0);
- Assert.assertEquals(consumerStatsEntity.getClientVersion(), "2.5.0-SNAPSHOT");
- Assert.assertEquals(consumerStatsEntity.getMetadata(), "{}");
+ Assert.assertEquals("543bd", consumerStatsEntity.getConsumer());
+ Assert.assertEquals("/127.0.0.1:59668", consumerStatsEntity.getAddress());
+ Assert.assertEquals("2019-08-10T11:37:24.306+08:00", consumerStatsEntity.getConnectedSince());
+ Assert.assertEquals(1000, consumerStatsEntity.getAvailablePermits(), 0);
+ Assert.assertEquals(0.0, consumerStatsEntity.getMsgRateOut(), 0);
+ Assert.assertEquals(0.0, consumerStatsEntity.getMsgThroughputOut(), 0);
+ Assert.assertEquals(0.0, consumerStatsEntity.getMsgRateRedeliver(), 0);
+ Assert.assertEquals("2.5.0-SNAPSHOT", consumerStatsEntity.getClientVersion());
+ Assert.assertEquals("{}", consumerStatsEntity.getMetadata());
}
@Test
- public void convertStatsToDbTest() {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)){
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
- .thenReturn("[\"standalone\"]");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
- .thenReturn("{\n" +
- "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
- "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
- "}");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
- .thenReturn("[\"localhost:8080\"]");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/broker-stats/topics", header))
- .thenReturn(testData);
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
- .thenReturn("{}");
-
+ public void convertStatsToDbTest() throws Exception {
String environment = "staging";
String cluster = "standalone";
String serviceUrl = "http://localhost:8080";
+
+ Map<String, Object> brokersMap = new HashMap<>();
+ List<Map<String, Object>> brokersArray = new ArrayList<>();
+ Map<String, Object> brokerEntity = Maps.newHashMap();
+ brokerEntity.put("broker", "localhost:8080");
+ brokersArray.add(brokerEntity);
+ brokersMap.put("data", brokersArray);
+ Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl))
+ .thenReturn(brokersMap);
+ Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats);
+ JsonObject data = new Gson().fromJson(testData, JsonObject.class);
+ Mockito.when(stats.getTopics())
+ .thenReturn(data);
+
brokerStatsService.collectStatsToDB(
System.currentTimeMillis() / 1000,
environment,
@@ -275,53 +272,49 @@ public class BrokerStatsServiceImplTest {
replicationStatsEntities.getResult().forEach((replication) -> {
checkReplicationStatsResult(replication);
});
- long unixTime = System.currentTimeMillis() / 1000L;
- brokerStatsService.clearStats(unixTime, 0);
try {
- Thread.sleep(2000);
+ Thread.sleep(1000);
} catch (Exception e) {
}
+ long unixTime = System.currentTimeMillis() / 1000L;
+ brokerStatsService.clearStats(unixTime, 0);
+
Optional<TopicStatsEntity> deleteTopicStatsEntity = topicsStatsRepository.findMaxTime();
Assert.assertFalse(deleteTopicStatsEntity.isPresent());
Page<SubscriptionStatsEntity> deleteSubscriptionStatsEntities = subscriptionsStatsRepository.findByTopicStatsId(
1, 1, topicStatsEntity1.getTopicStatsId(), topicStatsEntity1.getTime_stamp());
- Assert.assertEquals(deleteSubscriptionStatsEntities.getTotal(), 0);
+ Assert.assertEquals(0, deleteSubscriptionStatsEntities.getTotal());
Page<PublisherStatsEntity> deletePublisherStatsEntities = publishersStatsRepository.findByTopicStatsId(
1, 1, topicStatsEntity1.getTopicStatsId(), topicStatsEntity1.getTime_stamp());
- Assert.assertEquals(deletePublisherStatsEntities.getTotal(), 0);
+ Assert.assertEquals(0, deletePublisherStatsEntities.getTotal());
Page<ReplicationStatsEntity> deleteReplicationStatsEntities = replicationsStatsRepository.findByTopicStatsId(
1, 1, topicStatsEntity1.getTopicStatsId(), topicStatsEntity1.getTime_stamp());
- Assert.assertEquals(deleteReplicationStatsEntities.getTotal(), 0);
+ Assert.assertEquals(0, deleteReplicationStatsEntities.getTotal());
}
@Test
- public void findByMultiTenantOrMultiNamespace() {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)){
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
- .thenReturn("[\"standalone\"]");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
- .thenReturn("{\n" +
- "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
- "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
- "}");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
- .thenReturn("[\"localhost:8080\"]");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/broker-stats/topics", header))
- .thenReturn(testData);
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
- .thenReturn("{}");
-
+ public void findByMultiTenantOrMultiNamespace() throws Exception {
String environment = "staging";
String cluster = "standalone";
String serviceUrl = "http://localhost:8080";
+
+ Map<String, Object> brokersMap = new HashMap<>();
+ List<Map<String, Object>> brokersArray = new ArrayList<>();
+ Map<String, Object> brokerEntity = Maps.newHashMap();
+ brokerEntity.put("broker", "localhost:8080");
+ brokersArray.add(brokerEntity);
+ brokersMap.put("data", brokersArray);
+ Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl))
+ .thenReturn(brokersMap);
+ Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats);
+ JsonObject data = new Gson().fromJson(testData, JsonObject.class);
+ Mockito.when(stats.getTopics())
+ .thenReturn(data);
+
brokerStatsService.collectStatsToDB(
System.currentTimeMillis() / 1000,
environment,
@@ -340,12 +333,12 @@ public class BrokerStatsServiceImplTest {
tenantAllCountPage.getResult().forEach((result) -> {
- Assert.assertEquals(result.getAverageMsgSize(), 0.0, 1);
- Assert.assertEquals(result.getMsgRateIn(), 0.0, 1);
- Assert.assertEquals(result.getMsgRateOut(), 0.0, 1);
- Assert.assertEquals(result.getMsgThroughputIn(), 0.0, 1);
- Assert.assertEquals(result.getMsgThroughputOut(), 0.0, 1);
- Assert.assertEquals(result.getStorageSize(), 0, 0);
+ Assert.assertEquals(0.0, result.getAverageMsgSize(), 1);
+ Assert.assertEquals(0.0, result.getMsgRateIn(), 1);
+ Assert.assertEquals(0.0, result.getMsgRateOut(), 1);
+ Assert.assertEquals(0.0, result.getMsgThroughputIn(), 1);
+ Assert.assertEquals(0.0, result.getMsgThroughputOut(), 1);
+ Assert.assertEquals(0, result.getStorageSize(), 0);
});
ArrayList<String> namespaceList = new ArrayList<>();
@@ -360,12 +353,12 @@ public class BrokerStatsServiceImplTest {
namespaceAllCountPage.getResult().forEach((result) -> {
- Assert.assertEquals(result.getAverageMsgSize(), 0.0, 1);
- Assert.assertEquals(result.getMsgRateIn(), 0.0, 1);
- Assert.assertEquals(result.getMsgRateOut(), 0.0, 1);
- Assert.assertEquals(result.getMsgThroughputIn(), 0.0, 1);
- Assert.assertEquals(result.getMsgThroughputOut(), 0.0, 1);
- Assert.assertEquals(result.getStorageSize(), 0, 0);
+ Assert.assertEquals(0.0, result.getAverageMsgSize(), 1);
+ Assert.assertEquals(0.0, result.getMsgRateIn(), 1);
+ Assert.assertEquals(0.0, result.getMsgRateOut(), 1);
+ Assert.assertEquals(0.0, result.getMsgThroughputIn(), 1);
+ Assert.assertEquals(0.0, result.getMsgThroughputOut(), 1);
+ Assert.assertEquals(0, result.getStorageSize(), 0);
});
long unixTime = System.currentTimeMillis() / 1000L;
diff --git a/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java
index 06ec665..618dee7 100644
--- a/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java
@@ -54,6 +54,6 @@ public class BrokerTokensServiceImplTest {
String role = "test";
String token = jwtService.createBrokerToken(role, null);
Claims jwtBody = jwtService.validateBrokerToken(token);
- Assert.assertEquals(jwtBody.getSubject(), role);
+ Assert.assertEquals(role, jwtBody.getSubject());
}
}
diff --git a/src/test/java/org/apache/pulsar/manager/service/BrokersServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BrokersServiceImplTest.java
index eea39cb..6139fc3 100644
--- a/src/test/java/org/apache/pulsar/manager/service/BrokersServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/BrokersServiceImplTest.java
@@ -13,32 +13,29 @@
*/
package org.apache.pulsar.manager.service;
-import com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.Brokers;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.manager.PulsarManagerApplication;
-import org.apache.pulsar.manager.entity.EnvironmentEntity;
import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
@SpringBootTest(
classes = {
PulsarManagerApplication.class,
@@ -48,32 +45,36 @@ import org.springframework.test.context.junit4.SpringRunner;
@ActiveProfiles("test")
public class BrokersServiceImplTest {
- @Value("${backend.jwt.token}")
- private static String pulsarJwtToken;
+ @MockBean
+ private PulsarAdminService pulsarAdminService;
@Autowired
private BrokersService brokersService;
+ @Mock
+ private Clusters clusters;
+
+ @Mock
+ private Brokers brokers;
+
@Test
public void brokersServiceTest() throws Exception{
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
+ FailureDomain fdomain = new FailureDomain();
+ fdomain.setBrokers(new HashSet<String>(Arrays.asList("broker-1:8080")));
+ Map<String, FailureDomain> fMap = new HashMap<>();
+ fMap.put("fdomain-1",fdomain);
+ Mockito.when(pulsarAdminService.clusters("http://localhost:8080")).thenReturn(clusters);
+ Mockito.when(clusters.getFailureDomains("standalone"))
+ .thenReturn(fMap);
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
- .thenReturn("{\"test\":{\"brokers\":[\"tengdeMBP:8080\"]}}");
+ Mockito.when(pulsarAdminService.brokers("http://localhost:8080")).thenReturn(brokers);
+ Mockito.when(brokers.getActiveBrokers("standalone"))
+ .thenReturn(Arrays.asList("broker-1:8080"));
- EnvironmentEntity environmentEntity = new EnvironmentEntity();
- environmentEntity.setName("test-environment");
- environmentEntity.setBroker("http://localhost:8080");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
- .thenReturn("[\"tengdeMBP:8080\"]");
Map<String, Object> result = brokersService.getBrokersList(
1, 1, "standalone", "http://localhost:8080");
- Assert.assertEquals(result.get("total"), 1);
- Assert.assertEquals(result.get("data").toString(), "[{failureDomain=[test], broker=tengdeMBP:8080}]");
- Assert.assertEquals(result.get("pageSize"), 1);
+ Assert.assertEquals(1, result.get("total"));
+ Assert.assertEquals("[{failureDomain=[fdomain-1], broker=broker-1:8080}]", result.get("data").toString());
+ Assert.assertEquals(1, result.get("pageSize"));
}
}
diff --git a/src/test/java/org/apache/pulsar/manager/service/ClustersServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/ClustersServiceImplTest.java
index 00be004..97ef0e3 100644
--- a/src/test/java/org/apache/pulsar/manager/service/ClustersServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/ClustersServiceImplTest.java
@@ -14,31 +14,30 @@
package org.apache.pulsar.manager.service;
import com.google.common.collect.Maps;
+
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.manager.PulsarManagerApplication;
import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
@SpringBootTest(
classes = {
PulsarManagerApplication.class,
@@ -51,47 +50,48 @@ public class ClustersServiceImplTest {
@Autowired
private ClustersService clustersService;
- @Value("${backend.jwt.token}")
- private static String pulsarJwtToken;
+ @MockBean
+ private BrokersService brokersService;
+
+ @MockBean
+ private PulsarAdminService pulsarAdminService;
+
+ @Mock
+ private Clusters clusters;
@Test
- public void clusterServiceImplTest() {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
- .thenReturn("[\"standalone\"]");
+ public void clusterServiceImplTest() throws PulsarAdminException {
+ Mockito.when(pulsarAdminService.clusters("http://localhost:8080")).thenReturn(clusters);
+ Mockito.when(pulsarAdminService.clusters("http://localhost:8080").getClusters()).thenReturn(Arrays.asList("standalone"));
+ ClusterData standaloneClusterData = new ClusterData("http://broker-1:8080", null, "pulsar://broker-1:6650", null);
+ Mockito.when(pulsarAdminService.clusters("http://localhost:8080").getCluster("standalone")).thenReturn(standaloneClusterData);
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
- .thenReturn("{\n" +
- "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
- "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
- "}");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
- .thenReturn("{\"test\":{\"brokers\":[\"tengdeMBP:8080\"]}}");
+ Map<String, Object> brokerEntity = Maps.newHashMap();
+ brokerEntity.put("broker", "broker-1:8080");
+ brokerEntity.put("failureDomain", null);
+ List<Map<String, Object>> brokersArray = new ArrayList<>();
+ brokersArray.add(brokerEntity);
+ Map<String, Object> brokersMap = new HashMap<>();
+ brokersMap.put("isPage", false);
+ brokersMap.put("total", brokersArray.size());
+ brokersMap.put("data", brokersArray);
+ brokersMap.put("pageNum", 1);
+ brokersMap.put("pageSize", brokersArray.size());
+ Mockito.when(brokersService.getBrokersList(1, 1, "standalone", "http://localhost:8080")).thenReturn(brokersMap);
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
- .thenReturn("[\"tengdeMBP:8080\"]");
Map<String, Object> result = clustersService.getClustersList(1, 1, "http://localhost:8080", null);
- Assert.assertEquals(result.get("data").toString(),
- "[{cluster=standalone, serviceUrlTls=null, brokers=1, serviceUrl=http://tengdeMBP:8080, " +
- "brokerServiceUrlTls=null, brokerServiceUrl=pulsar://tengdeMBP:6650}]");
- Assert.assertEquals(result.get("total"), 1);
- Assert.assertEquals(result.get("pageSize"), 1);
+ Assert.assertEquals("[{cluster=standalone, serviceUrlTls=null, brokers=1, serviceUrl=http://broker-1:8080, " +
+ "brokerServiceUrlTls=null, brokerServiceUrl=pulsar://broker-1:6650}]", result.get("data").toString());
+ Assert.assertEquals(1, result.get("total"));
+ Assert.assertEquals(1, result.get("pageSize"));
}
@Test
- public void getClusterByAnyBroker() {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
- .thenReturn("[\"standalone\"]");
+ public void getClusterByAnyBroker() throws PulsarAdminException {
+ Mockito.when(pulsarAdminService.clusters("http://localhost:8080")).thenReturn(clusters);
+ Mockito.when(pulsarAdminService.clusters("http://localhost:8080").getClusters()).thenReturn(Arrays.asList("standalone"));
+
List<String> clusterList = clustersService.getClusterByAnyBroker("http://localhost:8080");
- Assert.assertEquals(clusterList.get(0), "standalone");
+ Assert.assertEquals("standalone", clusterList.get(0));
}
}
diff --git a/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java
index 95bef54..c67bf0c 100644
--- a/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java
@@ -15,38 +15,34 @@ package org.apache.pulsar.manager.service;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.eq;
-import com.google.gson.Gson;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.manager.PulsarManagerApplication;
import org.apache.pulsar.manager.entity.EnvironmentEntity;
import org.apache.pulsar.manager.entity.EnvironmentsRepository;
import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
+
+import java.util.Arrays;
import java.util.NoSuchElementException;
+
import org.apache.pulsar.common.policies.data.ClusterData;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Unit test {@link EnvironmentCacheService}.
*/
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
@SpringBootTest(
classes = {
PulsarManagerApplication.class,
@@ -57,10 +53,22 @@ import org.springframework.test.context.junit4.SpringRunner;
public class EnvironmentCacheServiceImplTest {
@Autowired
- private EnvironmentsRepository environmentsRepository;
+ private EnvironmentCacheService environmentCacheService;
@Autowired
- private EnvironmentCacheService environmentCache;
+ private EnvironmentsRepository environmentsRepository;
+
+ @MockBean
+ private PulsarAdminService pulsarAdminService;
+
+ @Mock
+ private Clusters emptyClusters;
+
+ @Mock
+ private Clusters cluster1Clusters;
+
+ @Mock
+ private Clusters cluster2Clusters;
private EnvironmentEntity environment1;
private EnvironmentEntity environment2;
@@ -73,7 +81,7 @@ public class EnvironmentCacheServiceImplTest {
private ClusterData cluster2_1;
@Before
- public void setup() {
+ public void setup() throws PulsarAdminException {
// setup 3 environments
environment1 = new EnvironmentEntity();
environment1.setBroker("http://cluster1_0:8080");
@@ -95,36 +103,17 @@ public class EnvironmentCacheServiceImplTest {
cluster2_1 = new ClusterData();
cluster2_1.setServiceUrl("http://cluster2_1:8080");
- PowerMockito.mockStatic(HttpUtil.class);
- // empty environment
- PowerMockito.when(HttpUtil.doGet(
- eq(emptyEnvironment.getBroker() + "/admin/v2/clusters"),
- anyMap()
- )).thenReturn("[]");
-
- // environment 1
- PowerMockito.when(HttpUtil.doGet(
- eq(cluster1_0.getServiceUrl() + "/admin/v2/clusters"),
- anyMap()
- )).thenReturn("[\""+ cluster1_0_name + "\"]");
- PowerMockito.when(HttpUtil.doGet(
- eq(cluster1_0.getServiceUrl() + "/admin/v2/clusters/" + cluster1_0_name),
- anyMap()
- )).thenReturn(new Gson().toJson(cluster1_0));
-
- // environment 2
- PowerMockito.when(HttpUtil.doGet(
- eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters"),
- anyMap()
- )).thenReturn("[\""+ cluster2_0_name + "\", \"" + cluster2_1_name + "\"]");
- PowerMockito.when(HttpUtil.doGet(
- eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters/" + cluster2_0_name),
- anyMap()
- )).thenReturn(new Gson().toJson(cluster2_0));
- PowerMockito.when(HttpUtil.doGet(
- eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters/" + cluster2_1_name),
- anyMap()
- )).thenReturn(new Gson().toJson(cluster2_1));
+ Mockito.when(pulsarAdminService.clusters(emptyEnvironment.getBroker())).thenReturn(emptyClusters);
+ Mockito.when(emptyClusters.getClusters()).thenReturn(Arrays.asList());
+
+ Mockito.when(pulsarAdminService.clusters(cluster1_0.getServiceUrl())).thenReturn(cluster1Clusters);
+ Mockito.when(cluster1Clusters.getClusters()).thenReturn(Arrays.asList(cluster1_0_name));
+ Mockito.when(cluster1Clusters.getCluster(cluster1_0_name)).thenReturn(cluster1_0);
+
+ Mockito.when(pulsarAdminService.clusters(cluster2_0.getServiceUrl())).thenReturn(cluster2Clusters);
+ Mockito.when(cluster2Clusters.getClusters()).thenReturn(Arrays.asList(cluster2_0_name, cluster2_1_name));
+ Mockito.when(cluster2Clusters.getCluster(cluster2_0_name)).thenReturn(cluster2_0);
+ Mockito.when(cluster2Clusters.getCluster(cluster2_1_name)).thenReturn(cluster2_1);
}
@After
@@ -136,10 +125,10 @@ public class EnvironmentCacheServiceImplTest {
@Test
public void testEmptyEnvironments() {
- environmentCache.reloadEnvironments();
+ environmentCacheService.reloadEnvironments();
try {
- environmentCache.getServiceUrl(environment1.getName(), null);
+ environmentCacheService.getServiceUrl(environment1.getName(), null);
fail("Should fail to get service url if environments is empty");
} catch (NoSuchElementException e) {
// expected
@@ -147,11 +136,14 @@ public class EnvironmentCacheServiceImplTest {
}
@Test
- public void testEmptyEnvironment() {
+ public void testEmptyEnvironment() throws PulsarAdminException {
environmentsRepository.save(emptyEnvironment);
+ PulsarAdminException pulsarAdminException = new PulsarAdminException("Cluster does not exist");
+ Mockito.when(emptyClusters.getCluster(cluster1_0_name)).thenThrow(pulsarAdminException);
+ environmentCacheService.reloadEnvironments();
try {
- environmentCache.getServiceUrl(emptyEnvironment.getName(), cluster1_0_name);
+ environmentCacheService.getServiceUrl(emptyEnvironment.getName(), cluster1_0_name);
fail("Should fail to get service url if environments is empty");
} catch (RuntimeException e) {
// expected
@@ -171,29 +163,29 @@ public class EnvironmentCacheServiceImplTest {
// without cluster
assertEquals(cluster1_0.getServiceUrl(),
- environmentCache.getServiceUrl(environment1.getName(), null));
+ environmentCacheService.getServiceUrl(environment1.getName(), null));
assertEquals(cluster2_0.getServiceUrl(),
- environmentCache.getServiceUrl(environment2.getName(), null));
+ environmentCacheService.getServiceUrl(environment2.getName(), null));
// with cluster
assertEquals(cluster1_0.getServiceUrl(),
- environmentCache.getServiceUrl(environment1.getName(), cluster1_0_name));
+ environmentCacheService.getServiceUrl(environment1.getName(), cluster1_0_name));
assertEquals(cluster2_0.getServiceUrl(),
- environmentCache.getServiceUrl(environment2.getName(), cluster2_0_name));
+ environmentCacheService.getServiceUrl(environment2.getName(), cluster2_0_name));
assertEquals(cluster2_1.getServiceUrl(),
- environmentCache.getServiceUrl(environment2.getName(), cluster2_1_name));
+ environmentCacheService.getServiceUrl(environment2.getName(), cluster2_1_name));
}
@Test
public void testReloadEnvironmentsAddNewEnvironmentsAndRemoveOldEnvironments() {
environmentsRepository.save(environment1);
+ environmentCacheService.reloadEnvironments();
- environmentCache.reloadEnvironments();
assertEquals(cluster1_0.getServiceUrl(),
- environmentCache.getServiceUrl(environment1.getName(), null));
+ environmentCacheService.getServiceUrl(environment1.getName(), null));
try {
- environmentCache.getServiceUrl(environment2.getName(), null);
+ environmentCacheService.getServiceUrl(environment2.getName(), null);
fail("Should fail to get service url if environments is empty");
} catch (NoSuchElementException e) {
// expected
@@ -201,12 +193,12 @@ public class EnvironmentCacheServiceImplTest {
environmentsRepository.save(environment2);
environmentsRepository.remove(environment1.getName());
- environmentCache.reloadEnvironments();
+ environmentCacheService.reloadEnvironments();
assertEquals(cluster2_0.getServiceUrl(),
- environmentCache.getServiceUrl(environment2.getName(), null));
+ environmentCacheService.getServiceUrl(environment2.getName(), null));
try {
- environmentCache.getServiceUrl(environment1.getName(), null);
+ environmentCacheService.getServiceUrl(environment1.getName(), null);
fail("Should fail to get service url if environments is empty");
} catch (NoSuchElementException e) {
// expected
@@ -214,29 +206,24 @@ public class EnvironmentCacheServiceImplTest {
}
@Test
- public void testReloadEnvironmentsAddNewClusterAndRemoveOldCluster() {
+ public void testReloadEnvironmentsAddNewClusterAndRemoveOldCluster() throws PulsarAdminException {
environmentsRepository.save(environment2);
- environmentCache.reloadEnvironments();
+ environmentCacheService.reloadEnvironments();
assertEquals(cluster2_0.getServiceUrl(),
- environmentCache.getServiceUrl(environment2.getName(), cluster2_0_name));
+ environmentCacheService.getServiceUrl(environment2.getName(), cluster2_0_name));
assertEquals(cluster2_1.getServiceUrl(),
- environmentCache.getServiceUrl(environment2.getName(), cluster2_1_name));
-
- PowerMockito.when(HttpUtil.doGet(
- eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters"),
- anyMap()
- )).thenReturn("[\""+ cluster2_0_name + "\"]");
- PowerMockito.when(HttpUtil.doGet(
- eq(cluster2_0.getServiceUrl() + "/admin/v2/clusters/" + cluster2_1_name),
- anyMap()
- )).thenReturn(null);
-
- environmentCache.reloadEnvironments();
+ environmentCacheService.getServiceUrl(environment2.getName(), cluster2_1_name));
+
+ Mockito.when(cluster2Clusters.getClusters()).thenReturn(Arrays.asList(cluster2_0_name));
+ PulsarAdminException pulsarAdminException = new PulsarAdminException("Cluster does not exist");
+ Mockito.when(cluster2Clusters.getCluster(cluster2_1_name)).thenThrow(pulsarAdminException);
+
+ environmentCacheService.reloadEnvironments();
assertEquals(cluster2_0.getServiceUrl(),
- environmentCache.getServiceUrl(environment2.getName(), cluster2_0_name));
+ environmentCacheService.getServiceUrl(environment2.getName(), cluster2_0_name));
try {
assertEquals(cluster2_1.getServiceUrl(),
- environmentCache.getServiceUrl(environment2.getName(), cluster2_1_name));
+ environmentCacheService.getServiceUrl(environment2.getName(), cluster2_1_name));
fail("Should fail to get service url if cluster is not found");
} catch (RuntimeException e) {
// expected
diff --git a/src/test/java/org/apache/pulsar/manager/service/GithubLoginServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/GithubLoginServiceImplTest.java
index 4dadd54..2534b68 100644
--- a/src/test/java/org/apache/pulsar/manager/service/GithubLoginServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/GithubLoginServiceImplTest.java
@@ -90,7 +90,7 @@ public class GithubLoginServiceImplTest {
"\"token_type\": \"bearer\"" +
"}");
String withCodeResult = thirdPartyLoginService.getAuthToken(parameters);
- Assert.assertEquals(withCodeResult, "e72e16c7e42f292c6912e7710c838347ae178b4a");
+ Assert.assertEquals("e72e16c7e42f292c6912e7710c838347ae178b4a", withCodeResult);
}
@Test
@@ -99,7 +99,7 @@ public class GithubLoginServiceImplTest {
Map<String, String> authenticationMap = Maps.newHashMap();
UserInfoEntity noTokenUserInfoEntity = thirdPartyLoginService.getUserInfo(authenticationMap);
- Assert.assertEquals(noTokenUserInfoEntity, null);
+ Assert.assertEquals(null, noTokenUserInfoEntity);
authenticationMap.put("access_token", "test-user-token");
PowerMockito.mockStatic(HttpUtil.class);
@@ -119,11 +119,11 @@ public class GithubLoginServiceImplTest {
"\t\"bio\": \"this is description\"" +
"}");
UserInfoEntity withTokenUserInfoEntity = thirdPartyLoginService.getUserInfo(authenticationMap);
- Assert.assertEquals(withTokenUserInfoEntity.getEmail(), "test@apache.org");
- Assert.assertEquals(withTokenUserInfoEntity.getName(), "test1");
- Assert.assertEquals(withTokenUserInfoEntity.getCompany(), "bj");
- Assert.assertEquals(withTokenUserInfoEntity.getDescription(), "this is description");
- Assert.assertEquals(withTokenUserInfoEntity.getLocation(), "nw");
- Assert.assertEquals(withTokenUserInfoEntity.getAccessToken(), "test-user-token");
+ Assert.assertEquals("test@apache.org", withTokenUserInfoEntity.getEmail());
+ Assert.assertEquals("test1", withTokenUserInfoEntity.getName());
+ Assert.assertEquals("bj", withTokenUserInfoEntity.getCompany());
+ Assert.assertEquals("this is description", withTokenUserInfoEntity.getDescription());
+ Assert.assertEquals("nw", withTokenUserInfoEntity.getLocation());
+ Assert.assertEquals("test-user-token", withTokenUserInfoEntity.getAccessToken());
}
}
diff --git a/src/test/java/org/apache/pulsar/manager/service/NamespacesServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/NamespacesServiceImplTest.java
index c5684bc..bcd1f73 100644
--- a/src/test/java/org/apache/pulsar/manager/service/NamespacesServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/NamespacesServiceImplTest.java
@@ -14,30 +14,28 @@
package org.apache.pulsar.manager.service;
import com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.manager.PulsarManagerApplication;
+import org.apache.pulsar.manager.entity.TopicStatsEntity;
+import org.apache.pulsar.manager.entity.TopicsStatsRepository;
import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
+import java.util.Arrays;
import java.util.Map;
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
@SpringBootTest(
classes = {
PulsarManagerApplication.class,
@@ -50,68 +48,59 @@ public class NamespacesServiceImplTest {
@Autowired
private NamespacesService namespacesService;
- @Autowired
- private BrokerStatsService brokerStatsService;
+ @MockBean
+ private PulsarAdminService pulsarAdminService;
+
+ @MockBean
+ private TopicsService topicsService;
- @Value("${backend.jwt.token}")
- private static String pulsarJwtToken;
+ @Mock
+ private Namespaces namespaces;
+
+ @Autowired
+ private TopicsStatsRepository topicsStatsRepository;
@Test
- public void namespaceServiceImplTest() {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/namespaces/public", header))
- .thenReturn("[\"public/default\"]");
+ public void namespaceServiceImplTest() throws PulsarAdminException {
+ Mockito.when(pulsarAdminService.namespaces("http://localhost:8080")).thenReturn(namespaces);
+ Mockito.when(namespaces.getNamespaces("public")).thenReturn(Arrays.asList("public/default"));
+ Map<String, Object> topics = Maps.newHashMap();
+ topics.put("total", 1);
+ Mockito.when(topicsService.getTopicsList(0, 0, "public", "default", "http://localhost:8080"))
+ .thenReturn(topics);
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/persistent/public/default", header))
- .thenReturn("[\"persistent://public/default/test789\"]");
- PowerMockito.when(HttpUtil.doGet(
- "http://localhost:8080/admin/v2/persistent/public/default/partitioned", header))
- .thenReturn("[]");
Map<String, Object> result = namespacesService.getNamespaceList(1, 1, "public", "http://localhost:8080");
- Assert.assertEquals(result.get("total"), 1);
+ Assert.assertEquals(1, result.get("total"));
Assert.assertFalse((Boolean) result.get("isPage"));
- Assert.assertEquals(result.get("data").toString(), "[{topics=1, namespace=default}]");
+ Assert.assertEquals("[{topics=1, namespace=default}]", result.get("data").toString());
}
@Test
public void getNamespaceStatsTest() {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
- .thenReturn("[\"standalone\"]");
-
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
- .thenReturn("[\"localhost:8080\"]");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/broker-stats/topics", header))
- .thenReturn(BrokerStatsServiceImplTest.testData);
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
- .thenReturn("{}");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
- .thenReturn("{\n" +
- "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
- "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
- "}");
String environment = "staging";
- String cluster = "standalone";
- String serviceUrl = "http://localhost:8080";
- brokerStatsService.collectStatsToDB(
- System.currentTimeMillis() / 1000,
- environment,
- cluster,
- serviceUrl
- );
+ String tenant = "public";
+ String namespace = "functions";
+
+ TopicStatsEntity topicStatsEntity = new TopicStatsEntity();
+ topicStatsEntity.setEnvironment(environment);
+ topicStatsEntity.setCluster("standalone");
+ topicStatsEntity.setBroker("localhost:8080");
+ topicStatsEntity.setPersistent("persistent");
+ topicStatsEntity.setTenant(tenant);
+ topicStatsEntity.setNamespace(namespace);
+ topicStatsEntity.setTopic("metadata");
+ topicStatsEntity.setBundle("0x40000000_0x80000000");
+ topicStatsEntity.setMsgRateIn(0.0);
+ topicStatsEntity.setSubscriptionCount(1);
+ topicStatsEntity.setProducerCount(1);
+ topicStatsEntity.setTime_stamp((System.currentTimeMillis() / 1000L));
+ topicsStatsRepository.save(topicStatsEntity);
+
Map<String, Object> namespaceStats = namespacesService.getNamespaceStats(
- environment, "public", "functions");
- Assert.assertEquals(namespaceStats.get("outMsg"), 0.0);
- Assert.assertEquals(namespaceStats.get("inMsg"), 0.0);
- Assert.assertEquals(namespaceStats.get("msgThroughputIn"), 0.0);
- Assert.assertEquals(namespaceStats.get("msgThroughputOut"), 0.0);
+ tenant, namespace, environment);
+ Assert.assertEquals(0.0, namespaceStats.get("outMsg"));
+ Assert.assertEquals(0.0, namespaceStats.get("inMsg"));
+ Assert.assertEquals(0.0, namespaceStats.get("msgThroughputIn"));
+ Assert.assertEquals(0.0, namespaceStats.get("msgThroughputOut"));
}
}
diff --git a/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java
new file mode 100644
index 0000000..a819e86
--- /dev/null
+++ b/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.manager.service;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pulsar.manager.PulsarManagerApplication;
+import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
+import org.apache.pulsar.manager.service.impl.PulsarAdminServiceImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(
+ classes = {
+ PulsarManagerApplication.class,
+ HerdDBTestProfile.class
+ }
+)
+@ActiveProfiles("test")
+public class PulsarAdminServiceImplTest {
+
+ @Autowired
+ private PulsarAdminService pulsarAdminService;
+
+ @After
+ public void teardown() {
+ ((PulsarAdminServiceImpl) pulsarAdminService).destroy();
+ ReflectionTestUtils.setField(pulsarAdminService, "pulsarAdmins", new HashMap<>());
+ }
+
+ @Test
+ public void getPulsarAdminTest() {
+ String serviceUrl = pulsarAdminService.getPulsarAdmin("http://localhost:8080").getServiceUrl();
+ Assert.assertEquals("http://localhost:8080", serviceUrl);
+ }
+
+ @Test
+ public void getAuthHeaderTest() {
+ ReflectionTestUtils.setField(pulsarAdminService, "authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
+ ReflectionTestUtils.setField(pulsarAdminService, "authParams", "test");
+ Map<String, String> authHeader = pulsarAdminService.getAuthHeader("http://localhost:8080");
+ Assert.assertEquals("Bearer test", authHeader.get("Authorization"));
+ }
+}
diff --git a/src/test/java/org/apache/pulsar/manager/service/PulsarEventImplTest.java b/src/test/java/org/apache/pulsar/manager/service/PulsarEventImplTest.java
index df518ac..711a618 100644
--- a/src/test/java/org/apache/pulsar/manager/service/PulsarEventImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/PulsarEventImplTest.java
@@ -167,17 +167,17 @@ public class PulsarEventImplTest {
public void validateTenantPermission() {
Map<String, String> result;
result = pulsarEvent.validateTenantPermission("/admin/v2/tenants/superTenant", "super-access-token");
- Assert.assertEquals(result.get("message"), "Validate tenant success");
+ Assert.assertEquals("Validate tenant success", result.get("message"));
result = pulsarEvent.validateTenantPermission("/admin/v2/tenants/adminTenant", "super-access-token");
- Assert.assertEquals(result.get("error"), "This user no include this tenant");
+ Assert.assertEquals("This user no include this tenant", result.get("error"));
result = pulsarEvent.validateTenantPermission(
"/pulsar-manager/admin/v2/schemas/adminTenant/default/test-topic", "super-access-token");
- Assert.assertEquals(result.get("message"), "This resource no need validate");
+ Assert.assertEquals("This resource no need validate", result.get("message"));
result = pulsarEvent.validateTenantPermission(
"/pulsar-manager/admin/v2/namespaces/adminTenant/default", "admin-access-token");
- Assert.assertEquals(result.get("message"), "Validate tenant success");
+ Assert.assertEquals("Validate tenant success", result.get("message"));
}
@Test
diff --git a/src/test/java/org/apache/pulsar/manager/service/RoleBindingServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/RoleBindingServiceImplTest.java
index 08e3ac6..754a608 100644
--- a/src/test/java/org/apache/pulsar/manager/service/RoleBindingServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/RoleBindingServiceImplTest.java
@@ -80,7 +80,7 @@ public class RoleBindingServiceImplTest {
Map<String, String> validateErrorUser = roleBindingService.validateCurrentUser(
"test-error-access-token", roleBindingEntity);
- Assert.assertEquals(validateErrorUser.get("error"), "User no exist.");
+ Assert.assertEquals("User no exist.", validateErrorUser.get("error"));
TenantEntity tenantEntity = new TenantEntity();
tenantEntity.setTenant("test-tenant");
@@ -105,12 +105,12 @@ public class RoleBindingServiceImplTest {
roleBindingEntity.setRoleId(10);
Map<String, String> validateIllegalUser = roleBindingService.validateCurrentUser(
"test-access-token", roleBindingEntity);
- Assert.assertEquals(validateIllegalUser.get("error"), "This operation is illegal for this user");
+ Assert.assertEquals("This operation is illegal for this user", validateIllegalUser.get("error"));
roleBindingEntity.setRoleId(roleId);
Map<String, String> validateSuccessUser = roleBindingService.validateCurrentUser(
"test-access-token", roleBindingEntity);
- Assert.assertEquals(validateSuccessUser.get("message"), "Validate current user success");
+ Assert.assertEquals("Validate current user success", validateSuccessUser.get("message"));
roleBindingRepository.delete(roleId, userId);
rolesRepository.delete("test-role", "test-tenant");
@@ -150,12 +150,12 @@ public class RoleBindingServiceImplTest {
Map<String, Object> validateErrorUser = roleBindingService.validateCreateRoleBinding(
"test-error-access-token", "test-error-tenant",
"test-role-name", "test-user-name");
- Assert.assertEquals(validateErrorUser.get("error"), "The user is not exist");
+ Assert.assertEquals("The user is not exist", validateErrorUser.get("error"));
Map<String, Object> validateErrorRoleName = roleBindingService.validateCreateRoleBinding(
"test-access-token", "test-tenant",
"test-error-role", "test-user");
- Assert.assertEquals(validateErrorRoleName.get("error"), "This role is no exist");
+ Assert.assertEquals("This role is no exist", validateErrorRoleName.get("error"));
RoleInfoEntity testRoleInfoEntity = new RoleInfoEntity();
testRoleInfoEntity.setRoleName("test-no-binding-role");
@@ -185,12 +185,12 @@ public class RoleBindingServiceImplTest {
Map<String, Object> validateBindingRole = roleBindingService.validateCreateRoleBinding(
"test-access-token", "test-tenant",
"test-role", "test-user");
- Assert.assertEquals(validateBindingRole.get("error"), "Role binding already exist");
+ Assert.assertEquals("Role binding already exist", validateBindingRole.get("error"));
Map<String, Object> validateCreateRoleBinding = roleBindingService.validateCreateRoleBinding(
"test-access-token", "test-tenant",
"test-no-binding-role", "test-user");
- Assert.assertEquals(validateCreateRoleBinding.get("message"), "Validate create role success");
+ Assert.assertEquals("Validate create role success", validateCreateRoleBinding.get("message"));
roleBindingRepository.delete(roleId, userId);
rolesRepository.delete("test-role", "test-tenant");
@@ -233,12 +233,12 @@ public class RoleBindingServiceImplTest {
List<Map<String, Object>> roleBindingMap = roleBindingService.getRoleBindingList(
"test-access-token-binding", "test-tenant-binding");
for (Map<String, Object> stringObjectMap : roleBindingMap) {
- Assert.assertEquals(stringObjectMap.get("name"), "test-role-binding");
- Assert.assertEquals(stringObjectMap.get("userId"), userId);
- Assert.assertEquals(stringObjectMap.get("userName"), "test-user-binding");
- Assert.assertEquals(stringObjectMap.get("roleId"), roleId);
- Assert.assertEquals(stringObjectMap.get("roleName"), "test-role-binding");
- Assert.assertEquals(stringObjectMap.get("description"), "test-role-binding-description");
+ Assert.assertEquals("test-role-binding", stringObjectMap.get("name"));
+ Assert.assertEquals(userId, stringObjectMap.get("userId"));
+ Assert.assertEquals("test-user-binding", stringObjectMap.get("userName"));
+ Assert.assertEquals(roleId, stringObjectMap.get("roleId"));
+ Assert.assertEquals("test-role-binding", stringObjectMap.get("roleName"));
+ Assert.assertEquals("test-role-binding-description", stringObjectMap.get("description"));
}
roleBindingRepository.delete(roleId, userId);
diff --git a/src/test/java/org/apache/pulsar/manager/service/RolesServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/RolesServiceImplTest.java
index dc6b8dc..604ec3d 100644
--- a/src/test/java/org/apache/pulsar/manager/service/RolesServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/RolesServiceImplTest.java
@@ -78,33 +78,33 @@ public class RolesServiceImplTest {
RoleInfoEntity roleInfoEntity = new RoleInfoEntity();
Map<String, String> roleNameIsEmpty = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(roleNameIsEmpty.get("error"), "Role name cannot be empty");
+ Assert.assertEquals("Role name cannot be empty", roleNameIsEmpty.get("error"));
roleInfoEntity.setRoleName("------");
Map<String, String> resourceNameIsEmpty = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(resourceNameIsEmpty.get("error"), "Resource name cannot be empty");
+ Assert.assertEquals("Resource name cannot be empty", resourceNameIsEmpty.get("error"));
roleInfoEntity.setResourceName("===========");
Map<String, String> roleNameIsIllegal = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(roleNameIsIllegal.get("error"), "Role name is illegal");
+ Assert.assertEquals("Role name is illegal", roleNameIsIllegal.get("error"));
roleInfoEntity.setRoleName("testRoleName");
Map<String, String> resourceNameIsIllegal = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(resourceNameIsIllegal.get("error"), "Resource Name is illegal");
+ Assert.assertEquals("Resource Name is illegal", resourceNameIsIllegal.get("error"));
roleInfoEntity.setResourceName("testResourceName");
roleInfoEntity.setResourceType("test-resourceType");
Map<String, String> resourceTypeIsIllegal = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(resourceTypeIsIllegal.get("error"), "Resource type is illegal");
+ Assert.assertEquals("Resource type is illegal", resourceTypeIsIllegal.get("error"));
roleInfoEntity.setResourceId(10);
roleInfoEntity.setResourceType(ResourceType.TENANTS.name());
Map<String, String> resourceNoExist = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(resourceNoExist.get("error"), "Tenant no exist, please check");
+ Assert.assertEquals("Tenant no exist, please check", resourceNoExist.get("error"));
TenantEntity tenantEntity = new TenantEntity();
tenantEntity.setTenant("test-tenant");
@@ -116,7 +116,7 @@ public class RolesServiceImplTest {
roleInfoEntity.setResourceId(20);
roleInfoEntity.setResourceType(ResourceType.NAMESPACES.name());
Map<String, String> namespaceNoExist = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(namespaceNoExist.get("error"), "Namespace no exist, please check");
+ Assert.assertEquals("Namespace no exist, please check", namespaceNoExist.get("error"));
NamespaceEntity namespaceEntity = new NamespaceEntity();
namespaceEntity.setTenant("test-tenant");
@@ -131,18 +131,18 @@ public class RolesServiceImplTest {
roleInfoEntity.setResourceType(ResourceType.TOPICS.name());
roleInfoEntity.setResourceVerbs(ResourceVerbs.ADMIN.name());
Map<String, String> stringMapTopics = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(stringMapTopics.get("error"),
- "admin should not be excluded for resources of type topic");
+ Assert.assertEquals("admin should not be excluded for resources of type topic",
+ stringMapTopics.get("error"));
roleInfoEntity.setResourceType(ResourceType.TENANTS.name());
roleInfoEntity.setResourceVerbs(ResourceVerbs.PRODUCE.name() + "," + ResourceVerbs.CONSUME.name());
Map<String, String> stringMapTenants = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(stringMapTenants.get("error"), "Type TENANTS include not supported verbs");
+ Assert.assertEquals("Type TENANTS include not supported verbs", stringMapTenants.get("error"));
roleInfoEntity.setResourceType(ResourceType.ALL.name());
roleInfoEntity.setResourceVerbs(ResourceVerbs.ADMIN.name());
Map<String, String> stringMapAll = rolesService.validateRoleInfoEntity(roleInfoEntity);
- Assert.assertEquals(stringMapAll.get("message"), "Role validate success");
+ Assert.assertEquals("Role validate success", stringMapAll.get("message"));
}
@Test
diff --git a/src/test/java/org/apache/pulsar/manager/service/TenantsServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/TenantsServiceImplTest.java
index ab81a7a..306b9b4 100644
--- a/src/test/java/org/apache/pulsar/manager/service/TenantsServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/TenantsServiceImplTest.java
@@ -15,35 +15,31 @@ package org.apache.pulsar.manager.service;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.gson.Gson;
+
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.manager.PulsarManagerApplication;
import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
@SpringBootTest(
classes = {
PulsarManagerApplication.class,
@@ -56,25 +52,28 @@ public class TenantsServiceImplTest {
@Autowired
private TenantsService tenantsService;
- @Value("${backend.jwt.token}")
- private static String pulsarJwtToken;
+ @MockBean
+ private PulsarAdminService pulsarAdminService;
+
+ @Mock
+ private Tenants tenants;
+
+ @Mock
+ private Namespaces namespaces;
@Test
- public void tenantsServiceImplTest() {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/tenants", header)).thenReturn("[\"public\"]");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/tenants/public", header))
- .thenReturn("{\"adminRoles\": [\"admin\"], \"allowedClusters\": [\"standalone\"]}");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/namespaces/public", header))
- .thenReturn("[\"public/default\"]");
+ public void tenantsServiceImplTest() throws PulsarAdminException {
+ Mockito.when(pulsarAdminService.tenants("http://localhost:8080")).thenReturn(tenants);
+ Mockito.when(tenants.getTenants()).thenReturn(Arrays.asList("public"));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("admin"), Sets.newHashSet("standalone"));
+ Mockito.when(tenants.getTenantInfo("public")).thenReturn(tenantInfo);
+ Mockito.when(pulsarAdminService.namespaces("http://localhost:8080")).thenReturn(namespaces);
+ Mockito.when(namespaces.getNamespaces("public")).thenReturn(Arrays.asList("public/default"));
+
Map<String, Object> objectMap = tenantsService.getTenantsList(1, 2, "http://localhost:8080");
- Assert.assertEquals(objectMap.get("total"), 1);
- Assert.assertEquals(objectMap.get("pageSize"), 1);
- Assert.assertEquals(objectMap.get("pageNum"), 1);
+ Assert.assertEquals(1, objectMap.get("total"));
+ Assert.assertEquals(1, objectMap.get("pageSize"));
+ Assert.assertEquals(1, objectMap.get("pageNum"));
List<Map<String, Object>> tenantsArray = new ArrayList<>();
Map<String, Object> tenantMap = Maps.newHashMap();
tenantMap.put("adminRoles", "admin");
@@ -82,29 +81,19 @@ public class TenantsServiceImplTest {
tenantMap.put("tenant", "public");
tenantMap.put("namespaces", 1);
tenantsArray.add(tenantMap);
- Assert.assertEquals(objectMap.get("data"), tenantsArray);
+ Assert.assertEquals(tenantsArray, objectMap.get("data"));
}
@Test
- public void createTenantTest() throws UnsupportedEncodingException {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- header.put("Content-Type", "application/json");
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- Map<String, Object> body = Maps.newHashMap();
+ public void createTenantTest() throws PulsarAdminException {
String tenant = "test";
String role = "test-role";
String cluster = "test-cluster";
- body.put("adminRoles", Sets.newHashSet(role));
- // Get cluster from standalone, to do
- body.put("allowedClusters", Sets.newHashSet(cluster));
- Gson gson = new Gson();
- PowerMockito.when(HttpUtil.doPut(
- "http://localhost:8080/admin/v2/tenants/" + tenant, header, gson.toJson(body))).thenReturn("");
+ Mockito.when(pulsarAdminService.tenants("http://localhost:8080")).thenReturn(tenants);
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet(role), Sets.newHashSet(cluster));
+ Mockito.doNothing().when(tenants).createTenant(tenant, tenantInfo);
Map<String, String> createTenantResult = tenantsService.createTenant(
tenant, role, cluster, "http://localhost:8080");
- Assert.assertEquals(createTenantResult.get("message"), "Create tenant success");
+ Assert.assertEquals("Create tenant success", createTenantResult.get("message"));
}
}
diff --git a/src/test/java/org/apache/pulsar/manager/service/TopicsServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/TopicsServiceImplTest.java
index 2036b47..e998d07 100644
--- a/src/test/java/org/apache/pulsar/manager/service/TopicsServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/TopicsServiceImplTest.java
@@ -14,32 +14,33 @@
package org.apache.pulsar.manager.service;
import com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.manager.PulsarManagerApplication;
+import org.apache.pulsar.manager.entity.TopicStatsEntity;
+import org.apache.pulsar.manager.entity.TopicsStatsRepository;
import org.apache.pulsar.manager.profiles.HerdDBTestProfile;
-import org.apache.pulsar.manager.utils.HttpUtil;
-import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.mockito.Mock;
+import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
-@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
-@PrepareForTest(HttpUtil.class)
+@RunWith(SpringRunner.class)
@SpringBootTest(
classes = {
PulsarManagerApplication.class,
@@ -49,92 +50,103 @@ import java.util.Map;
@ActiveProfiles("test")
public class TopicsServiceImplTest {
+ @MockBean
+ private PulsarAdminService pulsarAdminService;
+
+ @Mock
+ private Topics topics;
+
@Autowired
private TopicsService topicsService;
@Autowired
- private BrokerStatsService brokerStatsService;
-
- @Value("${backend.jwt.token}")
- private static String pulsarJwtToken;
-
- private final String topics = "[" +
- "\"persistent://public/default/test789\"," +
- "\"persistent://public/default/test900-partition-0\"," +
- "\"persistent://public/default/test900-partition-1\"," +
- "\"persistent://public/default/test900-partition-2\"]";
-
- private final String partitionedTopics = "[\"persistent://public/default/test900\"]";
+ private TopicsStatsRepository topicsStatsRepository;
@Test
- public void topicsServiceImplTest() {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/persistent/public/default", header))
- .thenReturn(topics);
- PowerMockito.when(HttpUtil.doGet(
- "http://localhost:8080/admin/v2/persistent/public/default/partitioned", header))
- .thenReturn(partitionedTopics);
- PowerMockito.when(HttpUtil.doGet(
- "http://localhost:8080/admin/v2/persistent/public/default/test900/partitions", header))
- .thenReturn("{\"partitions\":3}");
+ public void topicsServiceImplTest() throws PulsarAdminException {
+ Mockito.when(pulsarAdminService.topics("http://localhost:8080")).thenReturn(topics);
+ Mockito.when(topics.getList("public/default")).thenReturn(
+ Arrays.asList(
+ "persistent://public/default/test789",
+ "persistent://public/default/test900-partition-0",
+ "persistent://public/default/test900-partition-1",
+ "persistent://public/default/test900-partition-2"
+ )
+ );
+ Mockito.when(topics.getPartitionedTopicList("public/default")).thenReturn(
+ Arrays.asList(
+ "persistent://public/default/test900"
+ )
+ );
+ Mockito.when(topics.getPartitionedTopicMetadata("persistent://public/default/test900")).thenReturn(
+ new PartitionedTopicMetadata(3)
+ );
Map<String, Object> topicsMap = topicsService.getTopicsList(
1, 1, "public", "default", "http://localhost:8080");
- Assert.assertEquals(topicsMap.get("total"), 2);
+ Assert.assertEquals(2, topicsMap.get("total"));
Assert.assertFalse((Boolean) topicsMap.get("isPage"));
- Assert.assertEquals(topicsMap.get("topics").toString(),
- "[{partitions=0, topic=test789, persistent=persistent}, {partitions=3, topic=test900, persistent=persistent}]");
+ Assert.assertEquals("[{partitions=0, topic=test789, persistent=persistent}, {partitions=3, topic=test900, persistent=persistent}]", topicsMap.get("topics").toString());
}
@Test
- public void getTopicsStatsImplTest() {
- PowerMockito.mockStatic(HttpUtil.class);
- Map<String, String> header = Maps.newHashMap();
- if (StringUtils.isNotBlank(pulsarJwtToken)) {
- header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
- }
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters", header))
- .thenReturn("[\"standalone\"]");
-
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
- .thenReturn("[\"localhost:8080\"]");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/broker-stats/topics", header))
- .thenReturn(BrokerStatsServiceImplTest.testData);
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
- .thenReturn("{}");
- PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone", header))
- .thenReturn("{\n" +
- "\"serviceUrl\" : \"http://tengdeMBP:8080\",\n" +
- "\"brokerServiceUrl\" : \"pulsar://tengdeMBP:6650\"\n" +
- "}");
+ public void getTopicsStatsImplTest() throws Exception {
String environment = "staging";
- String cluster = "standalone";
- String serviceUrl = "http://localhost:8080";
- brokerStatsService.collectStatsToDB(
- System.currentTimeMillis() / 1000,
- environment,
- cluster,
- serviceUrl
- );
+ String tenant = "public";
+ String namespace = "functions";
+ String topic = "metadata";
+ String policy = "persistent";
+
+ TopicStatsEntity topicStatsEntity = new TopicStatsEntity();
+ topicStatsEntity.setEnvironment(environment);
+ topicStatsEntity.setCluster("standalone");
+ topicStatsEntity.setBroker("localhost:8080");
+ topicStatsEntity.setPersistent(policy);
+ topicStatsEntity.setTenant(tenant);
+ topicStatsEntity.setNamespace(namespace);
+ topicStatsEntity.setTopic(topic);
+ topicStatsEntity.setBundle("0x40000000_0x80000000");
+ topicStatsEntity.setMsgRateIn(0.0);
+ topicStatsEntity.setSubscriptionCount(1);
+ topicStatsEntity.setProducerCount(1);
+ topicStatsEntity.setTime_stamp((System.currentTimeMillis() / 1000L));
+ topicsStatsRepository.save(topicStatsEntity);
List<Map<String, String>> topics = new ArrayList<>();
- Map<String, String> topic = Maps.newHashMap();
- topic.put("topic", "metadata");
- topic.put("partitions", "0");
- topics.add(topic);
+ Map<String, String> topicMap = Maps.newHashMap();
+ topicMap.put("topic", topic);
+ topicMap.put("partitions", "0");
+ topics.add(topicMap);
List<Map<String, Object>> topicsList = topicsService.getTopicsStatsList(
- environment, "public", "functions", "persistent", topics);
+ environment, tenant, namespace, policy, topics);
topicsList.forEach((t) -> {
- Assert.assertEquals(t.get("partitions"), 0);
- Assert.assertEquals(t.get("subscriptions"), 1);
- Assert.assertEquals(t.get("inMsg"), 0.0);
- Assert.assertEquals(t.get("producers"), 1);
- Assert.assertEquals(t.get("persistent"), "persistent");
- Assert.assertEquals(t.get("topic"), "metadata");
+ Assert.assertEquals(0, t.get("partitions"));
+ Assert.assertEquals(1, t.get("subscriptions"));
+ Assert.assertEquals(0.0, t.get("inMsg"));
+ Assert.assertEquals(1, t.get("producers"));
+ Assert.assertEquals(policy, t.get("persistent"));
+ Assert.assertEquals(topic, t.get("topic"));
+ });
+ }
+
+ @Test
+ public void peekMessagesTest() throws PulsarAdminException {
+ Mockito.when(pulsarAdminService.topics("http://localhost:8080")).thenReturn(topics);
+ List<Message<byte[]>> messages = new ArrayList<>();
+ messages.add(new MessageImpl<byte[]>("persistent://public/default/test", "1:1", Maps.newTreeMap(), "test".getBytes(), Schema.BYTES));
+ Mockito.when(topics.peekMessages("persistent://public/default/test", "sub-1", 1)).thenReturn(messages);
+
+ List<Map<String, Object>> result = topicsService.peekMessages(
+ "persistent", "public",
+ "default", "test",
+ "sub-1", 1,
+ "http://localhost:8080");
+ Assert.assertEquals(1, result.size());
+ result.forEach((message) -> {
+ Assert.assertEquals(1L, message.get("ledgerId"));
+ Assert.assertEquals(1L, message.get("entryId"));
+ Assert.assertEquals(false, message.get("batch"));
+ Assert.assertEquals(new String("test".getBytes()), new String((byte[]) message.get("data")));
});
}
}
diff --git a/src/test/java/org/apache/pulsar/manager/service/UsersServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/UsersServiceImplTest.java
index 353c79c..460c4ba 100644
--- a/src/test/java/org/apache/pulsar/manager/service/UsersServiceImplTest.java
+++ b/src/test/java/org/apache/pulsar/manager/service/UsersServiceImplTest.java
@@ -49,31 +49,30 @@ public class UsersServiceImplTest {
UserInfoEntity userInfoEntity = new UserInfoEntity();
userInfoEntity.setName(" ");
Map<String, String> validateNameEmpty = usersService.validateUserInfo(userInfoEntity);
- Assert.assertEquals(validateNameEmpty.get("error"), "User name cannot be empty");
+ Assert.assertEquals("User name cannot be empty", validateNameEmpty.get("error"));
userInfoEntity.setName("----====");
Map<String, String> validateNameIllegal = usersService.validateUserInfo(userInfoEntity);
- Assert.assertEquals(validateNameIllegal.get("error"), "User name illegal");
+ Assert.assertEquals("User name illegal", validateNameIllegal.get("error"));
userInfoEntity.setName("test");
userInfoEntity.setEmail(" ");
Map<String, String> validateEmailEmpty = usersService.validateUserInfo(userInfoEntity);
- Assert.assertEquals(validateEmailEmpty.get("error"), "User email cannot be empty");
+ Assert.assertEquals("User email cannot be empty", validateEmailEmpty.get("error"));
userInfoEntity.setEmail("xxxx@");
Map<String, String> validateEmailIllegal = usersService.validateUserInfo(userInfoEntity);
- Assert.assertEquals(validateEmailIllegal.get("error"), "Email address illegal");
+ Assert.assertEquals("Email address illegal", validateEmailIllegal.get("error"));
userInfoEntity.setEmail("test@apache.org");
userInfoEntity.setPassword(" ");
userInfoEntity.setAccessToken(" ");
Map<String, String> validatePasswordAndTokenBlank = usersService.validateUserInfo(userInfoEntity);
- Assert.assertEquals(validatePasswordAndTokenBlank.get("error"),
- "Fields password and access token cannot be empty at the same time.");
+ Assert.assertEquals("Fields password and access token cannot be empty at the same time.",
+ validatePasswordAndTokenBlank.get("error"));
userInfoEntity.setPassword("password");
userInfoEntity.setAccessToken("token");
Map<String, String> validateSuccess = usersService.validateUserInfo(userInfoEntity);
- Assert.assertEquals(validateSuccess.get("message"),
- "Validate user success");
+ Assert.assertEquals("Validate user success", validateSuccess.get("message"));
}
}