You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by st...@apache.org on 2021/09/08 16:13:49 UTC
[rocketmq-dashboard] branch master updated: [ISSUE #16]Use an object pool to manage DefaultMQAdminExt objects. (#17)
This is an automated email from the ASF dual-hosted git repository.
styletang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git
The following commit(s) were added to refs/heads/master by this push:
new 975449e [ISSUE #16]Use an object pool to manage DefaultMQAdminExt objects. (#17)
975449e is described below
commit 975449e26844ff27d827241acf4e0d87ce7162bb
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Thu Sep 9 00:13:44 2021 +0800
[ISSUE #16]Use an object pool to manage DefaultMQAdminExt objects. (#17)
* [ISSUE #16]Use an object pool to manage DefaultMQAdminExt objects.
* Modify the code
* Optimize the code
Co-authored-by: zhangjidi2016 <zh...@cmss.chinamobile.com>
---
pom.xml | 10 ++-
.../java/org/apache/rocketmq/dashboard/App.java | 3 +
.../rocketmq/dashboard/admin/MQAdminFactory.java | 57 +++++++++++++
.../admin/MQAdminPooledObjectFactory.java | 79 +++++++++++++++++
.../dashboard/admin/MqAdminExtObjectPool.java | 47 ++++++++++
.../dashboard/aspect/admin/MQAdminAspect.java | 38 +++------
.../admin/annotation/MultiMQAdminCmdMethod.java | 31 -------
.../rocketmq/dashboard/config/RMQConfigure.java | 10 +++
.../dashboard/controller/TopicController.java | 3 +-
.../dashboard/service/client/MQAdminInstance.java | 69 ++++++---------
.../service/impl/ConsumerServiceImpl.java | 9 --
.../dashboard/service/impl/OpsServiceImpl.java | 10 +++
.../dashboard/task/DashboardCollectTask.java | 10 +--
src/main/resources/application.properties | 2 +
.../dashboard/admin/MQAdminAspectTest.java | 26 +++---
.../dashboard/admin/MQAdminExtImplTest.java | 13 ---
.../rocketmq/dashboard/admin/MQAdminPoolTest.java | 99 ++++++++++++++++++++++
.../dashboard/config/RMQConfigureTest.java | 2 +
.../dashboard/controller/OpsControllerTest.java | 21 +++++
19 files changed, 393 insertions(+), 146 deletions(-)
diff --git a/pom.xml b/pom.xml
index 171169c..f547b7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
<spring.boot.version>2.2.2.RELEASE</spring.boot.version>
<mockito-inline.version>3.3.3</mockito-inline.version>
<jaxb-api.version>2.3.1</jaxb-api.version>
+ <commons-pool2.version>2.4.3</commons-pool2.version>
</properties>
<dependencies>
@@ -218,6 +219,11 @@
<version>${mockito-inline.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>${commons-pool2.version}</version>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -251,7 +257,7 @@
</execution>
</executions>
</plugin>
- <!-- Use dockerfile-maven instead, https://github.com/spotify/dockerfile-maven -->
+ <!-- Use dockerfile-maven instead, https://github.com/spotify/dockerfile-maven -->
<plugin>
<groupId>com.spotify</groupId>
<artifactId>docker-maven-plugin</artifactId>
@@ -331,7 +337,7 @@
</dependency>
</dependencies>
</plugin>
- <plugin>
+ <plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.12</version>
diff --git a/src/main/java/org/apache/rocketmq/dashboard/App.java b/src/main/java/org/apache/rocketmq/dashboard/App.java
index 7761ce3..9e88277 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/App.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/App.java
@@ -19,11 +19,14 @@ package org.apache.rocketmq.dashboard;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
+import org.springframework.context.annotation.EnableMBeanExport;
+import org.springframework.jmx.support.RegistrationPolicy;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
@ServletComponentScan
+@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
public class App {
public static void main(String[] args) {
diff --git a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java
new file mode 100644
index 0000000..8bbf19d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.admin;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.dashboard.config.RMQConfigure;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+
+@Slf4j
+public class MQAdminFactory {
+ private RMQConfigure rmqConfigure;
+
+ public MQAdminFactory(RMQConfigure rmqConfigure) {
+ this.rmqConfigure = rmqConfigure;
+ }
+
+ public MQAdminExt getInstance() throws Exception {
+ RPCHook rpcHook = null;
+ final String accessKey = rmqConfigure.getAccessKey();
+ final String secretKey = rmqConfigure.getSecretKey();
+ boolean isEnableAcl = StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey);
+ if (isEnableAcl) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+ }
+ DefaultMQAdminExt mqAdminExt = null;
+ if (rmqConfigure.getTimeoutMillis() == null) {
+ mqAdminExt = new DefaultMQAdminExt(rpcHook);
+ } else {
+ mqAdminExt = new DefaultMQAdminExt(rpcHook, rmqConfigure.getTimeoutMillis());
+ }
+ mqAdminExt.setVipChannelEnabled(Boolean.parseBoolean(rmqConfigure.getIsVIPChannel()));
+ mqAdminExt.setUseTLS(rmqConfigure.isUseTLS());
+ mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ mqAdminExt.start();
+ log.info("create MQAdmin instance {} success.", mqAdminExt);
+ return mqAdminExt;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java
new file mode 100644
index 0000000..b68f931
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminPooledObjectFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.admin;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.PooledObjectFactory;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+
+@Slf4j
+public class MQAdminPooledObjectFactory implements PooledObjectFactory<MQAdminExt> {
+
+ private MQAdminFactory mqAdminFactory;
+
+ @Override
+ public PooledObject<MQAdminExt> makeObject() throws Exception {
+ DefaultPooledObject<MQAdminExt> pooledObject = new DefaultPooledObject<>(
+ mqAdminFactory.getInstance());
+ return pooledObject;
+ }
+
+ @Override
+ public void destroyObject(PooledObject<MQAdminExt> p) {
+ MQAdminExt mqAdmin = p.getObject();
+ if (mqAdmin != null) {
+ try {
+ mqAdmin.shutdown();
+ } catch (Exception e) {
+ log.warn("MQAdminExt shutdown err", e);
+ }
+ }
+ log.info("destroy object {}", p.getObject());
+ }
+
+ @Override
+ public boolean validateObject(PooledObject<MQAdminExt> p) {
+ MQAdminExt mqAdmin = p.getObject();
+ ClusterInfo clusterInfo = null;
+ try {
+ clusterInfo = mqAdmin.examineBrokerClusterInfo();
+ } catch (Exception e) {
+ log.warn("validate object {} err", p.getObject(), e);
+ }
+ if (clusterInfo == null || MapUtils.isEmpty(clusterInfo.getBrokerAddrTable())) {
+ log.warn("validateObject failed, clusterInfo = {}", clusterInfo);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void activateObject(PooledObject<MQAdminExt> p) {
+ }
+
+ @Override
+ public void passivateObject(PooledObject<MQAdminExt> p) {
+ }
+
+ public void setMqAdminFactory(MQAdminFactory mqAdminFactory) {
+ this.mqAdminFactory = mqAdminFactory;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/admin/MqAdminExtObjectPool.java b/src/main/java/org/apache/rocketmq/dashboard/admin/MqAdminExtObjectPool.java
new file mode 100644
index 0000000..976c009
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/admin/MqAdminExtObjectPool.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.admin;
+
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.rocketmq.dashboard.config.RMQConfigure;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MqAdminExtObjectPool {
+
+ @Autowired
+ private RMQConfigure rmqConfigure;
+
+ @Bean
+ public GenericObjectPool<MQAdminExt> mqAdminExtPool() {
+ GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+ genericObjectPoolConfig.setTestWhileIdle(true);
+ genericObjectPoolConfig.setMaxWaitMillis(10000);
+ genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(20000);
+ MQAdminPooledObjectFactory mqAdminPooledObjectFactory = new MQAdminPooledObjectFactory();
+ MQAdminFactory mqAdminFactory = new MQAdminFactory(rmqConfigure);
+ mqAdminPooledObjectFactory.setMqAdminFactory(mqAdminFactory);
+ GenericObjectPool<MQAdminExt> genericObjectPool = new GenericObjectPool<MQAdminExt>(
+ mqAdminPooledObjectFactory,
+ genericObjectPoolConfig);
+ return genericObjectPool;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/MQAdminAspect.java b/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/MQAdminAspect.java
index f1d7b18..56f37f6 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/MQAdminAspect.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/MQAdminAspect.java
@@ -16,28 +16,24 @@
*/
package org.apache.rocketmq.dashboard.aspect.admin;
-import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod;
-import org.apache.rocketmq.dashboard.config.RMQConfigure;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.service.client.MQAdminInstance;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
-import org.aspectj.lang.reflect.MethodSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.lang.reflect.Method;
-
@Aspect
@Service
+@Slf4j
public class MQAdminAspect {
- private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class);
@Autowired
- private RMQConfigure rmqConfigure;
+ private GenericObjectPool<MQAdminExt> mqAdminExtPool;
public MQAdminAspect() {
}
@@ -47,30 +43,16 @@ public class MQAdminAspect {
}
- @Pointcut("@annotation(org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod)")
- public void multiMQAdminMethodPointCut() {
-
- }
-
- @Around(value = "mQAdminMethodPointCut() || multiMQAdminMethodPointCut()")
+ @Around(value = "mQAdminMethodPointCut()")
public Object aroundMQAdminMethod(ProceedingJoinPoint joinPoint) throws Throwable {
long start = System.currentTimeMillis();
Object obj = null;
try {
- MethodSignature signature = (MethodSignature)joinPoint.getSignature();
- Method method = signature.getMethod();
- MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class);
- if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) {
- MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis(),rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey(), rmqConfigure.isUseTLS());
- }
- else {
- MQAdminInstance.initMQAdminInstance(0,rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey(), rmqConfigure.isUseTLS());
- }
+ MQAdminInstance.createMQAdmin(mqAdminExtPool);
obj = joinPoint.proceed();
- }
- finally {
- MQAdminInstance.destroyMQAdminInstance();
- logger.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start);
+ } finally {
+ MQAdminInstance.returnMQAdmin(mqAdminExtPool);
+ log.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start);
}
return obj;
}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/annotation/MultiMQAdminCmdMethod.java b/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/annotation/MultiMQAdminCmdMethod.java
deleted file mode 100644
index d62f167..0000000
--- a/src/main/java/org/apache/rocketmq/dashboard/aspect/admin/annotation/MultiMQAdminCmdMethod.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.dashboard.aspect.admin.annotation;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Target({ElementType.METHOD})
-@Retention(RetentionPolicy.RUNTIME)
-@Documented
-public @interface MultiMQAdminCmdMethod {
- long timeoutMillis() default 0;
-}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
index 09e3c71..43a7a2b 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
@@ -58,6 +58,8 @@ public class RMQConfigure {
private boolean useTLS = false;
+ private Long timeoutMillis;
+
public String getAccessKey() {
return accessKey;
}
@@ -148,6 +150,14 @@ public class RMQConfigure {
this.useTLS = useTLS;
}
+ public Long getTimeoutMillis() {
+ return timeoutMillis;
+ }
+
+ public void setTimeoutMillis(Long timeoutMillis) {
+ this.timeoutMillis = timeoutMillis;
+ }
+
// Error Page process logic, move to a central configure later
@Bean
public ErrorPageRegistrar errorPageRegistrar() {
diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
index 373db5b..fbcb7c6 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
@@ -51,8 +51,7 @@ public class TopicController {
@RequestMapping(value = "/list.query", method = RequestMethod.GET)
@ResponseBody
- public Object list(@RequestParam(value = "skipSysProcess", required = false) String skipSysProcess)
- throws MQClientException, RemotingException, InterruptedException {
+ public Object list(@RequestParam(value = "skipSysProcess", required = false) String skipSysProcess) {
boolean flag = false;
if ("true".equals(skipSysProcess)) {
flag = true;
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminInstance.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminInstance.java
index 6f40db0..8d9ae4f 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminInstance.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminInstance.java
@@ -16,29 +16,27 @@
*/
package org.apache.rocketmq.dashboard.service.client;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.joor.Reflect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MQAdminInstance {
- private static final ThreadLocal<DefaultMQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>();
- private static final ThreadLocal<Integer> INIT_COUNTER = new ThreadLocal<Integer>();
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MQAdminInstance.class);
+ private static final ThreadLocal<MQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<>();
public static MQAdminExt threadLocalMQAdminExt() {
- DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
- if (defaultMQAdminExt == null) {
+ MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
+ if (mqAdminExt == null) {
throw new IllegalStateException("defaultMQAdminExt should be init before you get this");
}
- return defaultMQAdminExt;
+ return mqAdminExt;
}
public static RemotingClient threadLocalRemotingClient() {
@@ -51,44 +49,27 @@ public class MQAdminInstance {
DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
}
- public static void initMQAdminInstance(long timeoutMillis,String accessKey,String secretKey, boolean useTLS) throws MQClientException {
- Integer nowCount = INIT_COUNTER.get();
- if (nowCount == null) {
- RPCHook rpcHook = null;
- boolean isEnableAcl = !StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey);
- if (isEnableAcl) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
- }
- DefaultMQAdminExt defaultMQAdminExt;
- if (timeoutMillis > 0) {
- defaultMQAdminExt = new DefaultMQAdminExt(rpcHook,timeoutMillis);
- }
- else {
- defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
- }
- defaultMQAdminExt.setUseTLS(useTLS);
- defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
- defaultMQAdminExt.start();
- MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt);
- INIT_COUNTER.set(1);
- }
- else {
- INIT_COUNTER.set(nowCount + 1);
- }
+ public static void createMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
+ try {
+ // Get the mqAdmin instance from the object pool
+ MQAdminExt mqAdminExt = mqAdminExtPool.borrowObject();
+ MQ_ADMIN_EXT_THREAD_LOCAL.set(mqAdminExt);
+ } catch (Exception e) {
+ LOGGER.error("get mqAdmin from pool error", e);
+ }
}
- public static void destroyMQAdminInstance() {
- Integer nowCount = INIT_COUNTER.get() - 1;
- if (nowCount > 0) {
- INIT_COUNTER.set(nowCount);
- return;
- }
+ public static void returnMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
if (mqAdminExt != null) {
- mqAdminExt.shutdown();
- MQ_ADMIN_EXT_THREAD_LOCAL.remove();
- INIT_COUNTER.remove();
+ try {
+ // After execution, return the mqAdmin instance to the object pool
+ mqAdminExtPool.returnObject(mqAdminExt);
+ } catch (Exception e) {
+ LOGGER.error("return mqAdmin to pool error", e);
+ }
}
+ MQ_ADMIN_EXT_THREAD_LOCAL.remove();
}
}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index 4e69e83..37073d7 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -43,7 +43,6 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
@@ -64,7 +63,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
@Override
- @MultiMQAdminCmdMethod
public List<GroupConsumeInfo> queryGroupList() {
Set<String> consumerGroupSet = Sets.newHashSet();
try {
@@ -86,7 +84,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
- @MultiMQAdminCmdMethod
public GroupConsumeInfo queryGroup(String consumerGroup) {
GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
try {
@@ -133,7 +130,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
- @MultiMQAdminCmdMethod
public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) {
ConsumeStats consumeStats = null;
try {
@@ -184,7 +180,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
- @MultiMQAdminCmdMethod
public Map<String /*groupName*/, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic) {
Map<String, TopicConsumerInfo> group2ConsumerInfoMap = Maps.newHashMap();
try {
@@ -206,7 +201,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
- @MultiMQAdminCmdMethod
public Map<String, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest) {
Map<String, ConsumerGroupRollBackStat> groupRollbackStats = Maps.newHashMap();
for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) {
@@ -251,7 +245,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
- @MultiMQAdminCmdMethod
public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) {
List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
try {
@@ -272,7 +265,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
- @MultiMQAdminCmdMethod
public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
@@ -303,7 +295,6 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
}
@Override
- @MultiMQAdminCmdMethod
public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) {
Set<String> brokerNameSet = Sets.newHashSet();
try {
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java
index 7b56220..2d98e12 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java
@@ -21,11 +21,14 @@ import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
+import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.OpsService;
import org.apache.rocketmq.dashboard.service.checker.CheckerType;
import org.apache.rocketmq.dashboard.service.checker.RocketMqChecker;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@@ -34,6 +37,9 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
@Resource
private RMQConfigure configure;
+ @Autowired
+ private GenericObjectPool<MQAdminExt> mqAdminExtPool;
+
@Resource
private List<RocketMqChecker> rocketMqCheckerList;
@@ -49,6 +55,8 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
@Override
public void updateNameSvrAddrList(String nameSvrAddrList) {
configure.setNamesrvAddr(nameSvrAddrList);
+ // when update namesrvAddr, clean the mqAdminExt objects pool.
+ mqAdminExtPool.clear();
}
@Override
@@ -67,12 +75,14 @@ public class OpsServiceImpl extends AbstractCommonService implements OpsService
@Override public boolean updateIsVIPChannel(String useVIPChannel) {
configure.setIsVIPChannel(useVIPChannel);
+ mqAdminExtPool.clear();
return true;
}
@Override
public boolean updateUseTLS(boolean useTLS) {
configure.setUseTLS(useTLS);
+ mqAdminExtPool.clear();
return true;
}
}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
index 2e00223..c09568d 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
@@ -69,12 +68,11 @@ public class DashboardCollectTask {
private final static Logger log = LoggerFactory.getLogger(DashboardCollectTask.class);
@Scheduled(cron = "30 0/1 * * * ?")
- @MultiMQAdminCmdMethod(timeoutMillis = 5000)
public void collectTopic() {
if (!rmqConfigure.isEnableDashBoardCollect()) {
return;
}
-
+
Date date = new Date();
Stopwatch stopwatch = Stopwatch.createUnstarted();
try {
@@ -83,8 +81,8 @@ public class DashboardCollectTask {
this.addSystemTopic();
for (String topic : topicSet) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
- || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)
- || TopicValidator.isSystemTopic(topic)) {
+ || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)
+ || TopicValidator.isSystemTopic(topic)) {
continue;
}
TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
@@ -323,7 +321,7 @@ public class DashboardCollectTask {
}
return newTpsList;
}
-
+
private void addSystemTopic() throws Exception {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterTable = clusterInfo.getClusterAddrTable();
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 0ad1228..335ac69 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -35,6 +35,8 @@ logging.config=classpath:logback.xml
rocketmq.config.namesrvAddr=
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
+#timeout for mqadminExt, default 5000ms
+rocketmq.config.timeoutMillis=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
diff --git a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminAspectTest.java b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminAspectTest.java
index 9874a09..a76dc6c 100644
--- a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminAspectTest.java
+++ b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminAspectTest.java
@@ -19,13 +19,16 @@ package org.apache.rocketmq.dashboard.admin;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.aspect.admin.MQAdminAspect;
-import org.apache.rocketmq.dashboard.aspect.admin.annotation.MultiMQAdminCmdMethod;
-import org.apache.rocketmq.dashboard.config.RMQConfigure;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,19 +40,20 @@ public class MQAdminAspectTest {
ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
MethodSignature signature = mock(MethodSignature.class);
Method method = mock(Method.class);
- MultiMQAdminCmdMethod annotationValue = mock(MultiMQAdminCmdMethod.class);
- when(annotationValue.timeoutMillis()).thenReturn(0L).thenReturn(3000L);
- when(method.getAnnotation(MultiMQAdminCmdMethod.class)).thenReturn(annotationValue);
when(signature.getMethod()).thenReturn(method);
when(joinPoint.getSignature()).thenReturn(signature);
- RMQConfigure rmqConfigure = mock(RMQConfigure.class);
- when(rmqConfigure.getAccessKey()).thenReturn("rocketmq");
- when(rmqConfigure.getSecretKey()).thenReturn("12345678");
- Field field = mqAdminAspect.getClass().getDeclaredField("rmqConfigure");
+ GenericObjectPool<MQAdminExt> mqAdminExtPool = mock(GenericObjectPool.class);
+ when(mqAdminExtPool.borrowObject())
+ .thenThrow(new RuntimeException("borrowObject exception"))
+ .thenReturn(new DefaultMQAdminExt());
+ doNothing().doThrow(new RuntimeException("returnObject exception"))
+ .when(mqAdminExtPool).returnObject(any());
+ Field field = mqAdminAspect.getClass().getDeclaredField("mqAdminExtPool");
field.setAccessible(true);
- field.set(mqAdminAspect, rmqConfigure);
-
+ field.set(mqAdminAspect, mqAdminExtPool);
+ // exception
+ mqAdminAspect.aroundMQAdminMethod(joinPoint);
mqAdminAspect.aroundMQAdminMethod(joinPoint);
}
}
diff --git a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java
index 32a82bd..28a898d 100644
--- a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java
+++ b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java
@@ -65,7 +65,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -121,24 +120,12 @@ public class MQAdminExtImplTest {
defaultMQAdminExt = mock(DefaultMQAdminExt.class);
threadLocal.set(defaultMQAdminExt);
- field = MQAdminInstance.class.getDeclaredField("INIT_COUNTER");
- field.setAccessible(true);
- object = field.get(mqAdminExtImpl);
- assertNotNull(object);
- ThreadLocal<Integer> threadLocal1 = (ThreadLocal<Integer>) object;
- threadLocal1.set(1);
-
ReflectionTestUtils.setField(defaultMQAdminExt, "defaultMQAdminExtImpl", defaultMQAdminExtImpl);
ReflectionTestUtils.setField(defaultMQAdminExtImpl, "mqClientInstance", mqClientInstance);
ReflectionTestUtils.setField(mqClientInstance, "mQClientAPIImpl", mQClientAPIImpl);
ReflectionTestUtils.setField(mQClientAPIImpl, "remotingClient", remotingClient);
}
- @After
- public void destroy() {
- MQAdminInstance.destroyMQAdminInstance();
- }
-
@Test
public void testUpdateBrokerConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
diff --git a/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java
new file mode 100644
index 0000000..6859927
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminPoolTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.admin;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.dashboard.config.RMQConfigure;
+import org.apache.rocketmq.dashboard.util.MockObjectUtil;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MQAdminPoolTest {
+
+ private MqAdminExtObjectPool mqAdminExtObjectPool;
+
+ private GenericObjectPool<MQAdminExt> pool;
+
+ private MQAdminPooledObjectFactory mqAdminPooledObjectFactory;
+
+ @Before
+ public void init() {
+ mqAdminExtObjectPool = new MqAdminExtObjectPool();
+ RMQConfigure rmqConfigure = mock(RMQConfigure.class);
+ when(rmqConfigure.getNamesrvAddr()).thenReturn("127.0.0.1:9876");
+ when(rmqConfigure.getAccessKey()).thenReturn("rocketmq");
+ when(rmqConfigure.getSecretKey()).thenReturn("12345678");
+ ReflectionTestUtils.setField(mqAdminExtObjectPool, "rmqConfigure", rmqConfigure);
+ pool = mqAdminExtObjectPool.mqAdminExtPool();
+ mqAdminPooledObjectFactory = (MQAdminPooledObjectFactory) pool.getFactory();
+ }
+
+ @Test
+ public void testMakeObject() throws Exception {
+ PooledObject<MQAdminExt> mqAdmin = mqAdminPooledObjectFactory.makeObject();
+ Assert.assertNotNull(mqAdmin);
+ }
+
+ @Test
+ public void testDestroyObject() {
+ PooledObject<MQAdminExt> mqAdmin = mock(PooledObject.class);
+ Assert.assertNotNull(mqAdmin);
+ MQAdminExt mqAdminExt = mock(MQAdminExt.class);
+ doNothing().doThrow(new RuntimeException("shutdown exception")).when(mqAdminExt).shutdown();
+ when(mqAdmin.getObject()).thenReturn(mqAdminExt);
+ // shutdown
+ mqAdminPooledObjectFactory.destroyObject(mqAdmin);
+ // exception
+ mqAdminPooledObjectFactory.destroyObject(mqAdmin);
+ }
+
+ @Test
+ public void testValidateObject() throws Exception {
+ PooledObject<MQAdminExt> mqAdmin = mock(PooledObject.class);
+ Assert.assertNotNull(mqAdmin);
+ MQAdminExt mqAdminExt = mock(MQAdminExt.class);
+ ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
+ clusterInfo.getBrokerAddrTable().clear();
+ when(mqAdminExt.examineBrokerClusterInfo())
+ .thenThrow(new RuntimeException("examineBrokerClusterInfo exception"))
+ .thenReturn(null)
+ .thenReturn(new ClusterInfo())
+ .thenReturn(clusterInfo)
+ .thenReturn(MockObjectUtil.createClusterInfo());
+ when(mqAdmin.getObject()).thenReturn(mqAdminExt);
+ // exception
+ Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin));
+ // clusterInfo == null
+ Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin));
+ // clusterInfo.getBrokerAddrTable() == null
+ Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin));
+ // clusterInfo.getBrokerAddrTable().size() <= 0
+ Assert.assertFalse(mqAdminPooledObjectFactory.validateObject(mqAdmin));
+ // pass validate
+ Assert.assertTrue(mqAdminPooledObjectFactory.validateObject(mqAdmin));
+
+ }
+}
diff --git a/src/test/java/org/apache/rocketmq/dashboard/config/RMQConfigureTest.java b/src/test/java/org/apache/rocketmq/dashboard/config/RMQConfigureTest.java
index 462e648..56c48e3 100644
--- a/src/test/java/org/apache/rocketmq/dashboard/config/RMQConfigureTest.java
+++ b/src/test/java/org/apache/rocketmq/dashboard/config/RMQConfigureTest.java
@@ -40,6 +40,7 @@ public class RMQConfigureTest {
rmqConfigure.setLoginRequired(true);
rmqConfigure.setMsgTrackTopicName(null);
rmqConfigure.setNamesrvAddr("127.0.0.1:9876");
+ rmqConfigure.setTimeoutMillis(3000L);
}
@Test
@@ -56,6 +57,7 @@ public class RMQConfigureTest {
Assert.assertTrue(rmqConfigure.isLoginRequired());
Assert.assertEquals(rmqConfigure.getMsgTrackTopicNameOrDefault(), TopicValidator.RMQ_SYS_TRACE_TOPIC);
Assert.assertEquals(rmqConfigure.getNamesrvAddr(), "127.0.0.1:9876");
+ Assert.assertEquals(rmqConfigure.getTimeoutMillis().longValue(), 3000L);
ErrorPageRegistrar registrar = rmqConfigure.errorPageRegistrar();
registrar.registerErrorPages(new ErrorPageRegistry() {
@Override
diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/OpsControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/OpsControllerTest.java
index 37edbfb..f864755 100644
--- a/src/test/java/org/apache/rocketmq/dashboard/controller/OpsControllerTest.java
+++ b/src/test/java/org/apache/rocketmq/dashboard/controller/OpsControllerTest.java
@@ -19,14 +19,17 @@ package org.apache.rocketmq.dashboard.controller;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.rocketmq.dashboard.service.checker.RocketMqChecker;
import org.apache.rocketmq.dashboard.service.checker.impl.ClusterHealthCheckerImpl;
import org.apache.rocketmq.dashboard.service.checker.impl.TopicOnlyOneBrokerCheckerImpl;
import org.apache.rocketmq.dashboard.service.impl.OpsServiceImpl;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
+import org.mockito.Mock;
import org.mockito.Spy;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
@@ -44,6 +47,9 @@ public class OpsControllerTest extends BaseControllerTest {
@Spy
private OpsServiceImpl opsService;
+ @Mock
+ private GenericObjectPool<MQAdminExt> mqAdminExtPool;
+
@Before
public void init() {
super.mockRmqConfigure();
@@ -67,6 +73,7 @@ public class OpsControllerTest extends BaseControllerTest {
final String url = "/ops/updateNameSvrAddr.do";
{
doNothing().when(configure).setNamesrvAddr(anyString());
+ doNothing().when(mqAdminExtPool).clear();
}
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.param("nameSvrAddrList", "127.0.0.1:9876");
@@ -89,6 +96,20 @@ public class OpsControllerTest extends BaseControllerTest {
.andExpect(jsonPath("$.data").value(true));
}
+
+ @Test
+ public void testUpdateUseTLS() throws Exception {
+ final String url = "/ops/updateUseTLS.do";
+ {
+ doNothing().when(configure).setUseTLS(true);
+ }
+ requestBuilder = MockMvcRequestBuilders.post(url);
+ requestBuilder.param("useTLS", "true");
+ perform = mockMvc.perform(requestBuilder);
+ perform.andExpect(status().isOk())
+ .andExpect(jsonPath("$.data").value(true));
+ }
+
@Test
public void testClusterStatus() throws Exception {
final String url = "/ops/rocketMqStatus.query";