You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/02 08:51:38 UTC
[rocketmq] branch snode updated: [ISSUE #747]Optimize and adjust codes on Snode.
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new c47978d [ISSUE #747]Optimize and adjust codes on Snode.
new 3b62168 Merge pull request #748 from zongtanghu/snode
c47978d is described below
commit c47978d62f07c0d7864d4567cf4a8528466ba5df
Author: huzongtang <hu...@cmss.chinamobile.com>
AuthorDate: Sat Feb 2 15:22:47 2019 +0800
[ISSUE #747]Optimize and adjust codes on Snode.
---
.../org/apache/rocketmq/common}/SnodeConfig.java | 8 +++----
.../org/apache/rocketmq/snode/SnodeController.java | 2 +-
.../org/apache/rocketmq/snode/SnodeStartup.java | 26 ++++++++++++++++++++--
.../snode/client/ClientHousekeepingService.java | 2 +-
.../snode/processor/ConsumerManageProcessor.java | 2 +-
.../processor/DefaultMqttMessageProcessor.java | 4 ++++
.../rocketmq/snode/service/NnodeService.java | 2 +-
.../snode/service/impl/NnodeServiceImpl.java | 2 +-
.../snode/service/impl/ScheduledServiceImpl.java | 2 +-
.../apache/rocketmq/snode/SnodeControllerTest.java | 2 +-
.../processor/DefaultMqttMessageProcessorTest.java | 2 +-
.../processor/MqttConnectMessageHandlerTest.java | 2 +-
.../snode/processor/SendMessageProcessorTest.java | 2 +-
.../snode/service/EnodeServiceImplTest.java | 2 +-
.../snode/service/NnodeServiceImplTest.java | 2 +-
.../snode/service/SlowConsumerServiceImplTest.java | 2 +-
16 files changed, 45 insertions(+), 19 deletions(-)
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
similarity index 98%
rename from snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
rename to common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
index 6c51fad..fd3b299 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
@@ -14,21 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.snode.config;
+package org.apache.rocketmq.common;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
-import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
public class SnodeConfig {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+ public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 43b0b5f..b47e3b5 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -55,7 +56,6 @@ import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
index d6060a9..232def9 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -26,11 +26,13 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -39,7 +41,6 @@ import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.LoggerFactory;
@@ -123,19 +124,31 @@ public class SnodeStartup {
nettyClientConfig,
snodeConfig);
+ boolean initResult = snodeController.initialize();
+ if (!initResult) {
+ snodeController.shutdown();
+ System.exit(-3);
+ }
+
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
+ private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
+ log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
+
if (!this.hasShutdown) {
this.hasShutdown = true;
+ long beginTime = System.currentTimeMillis();
snodeController.shutdown();
+ long consumingTimeTotal = System.currentTimeMillis() - beginTime;
+ log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
- }));
+ },"ShutdownHook"));
return snodeController;
}
@@ -143,6 +156,15 @@ public class SnodeStartup {
Option opt = new Option("c", "configFile", true, "SNode config properties file");
opt.setRequired(false);
options.addOption(opt);
+
+ opt = new Option("p", "printConfigItem", false, "Print all config item");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("m", "printImportantConfig", false, "Print important config item");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
index 1db3f9c..910d5be 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
@@ -29,7 +29,7 @@ import org.apache.rocketmq.snode.client.impl.ClientRole;
import org.apache.rocketmq.snode.constant.SnodeConstant;
public class ClientHousekeepingService implements ChannelEventListener {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final ClientManager producerManager;
private final ClientManager consumerManager;
private final ClientManager iotClientManager;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index 4cd54d6..0bb0647 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -43,7 +43,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
public class ConsumerManageProcessor implements RequestProcessor {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
index 1a3ced1..1ca672f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
@@ -28,6 +28,9 @@ import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingUtil;
@@ -38,6 +41,7 @@ import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
public class DefaultMqttMessageProcessor implements RequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>();
private static final int MIN_AVAILABLE_VERSION = 3;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
index 84f224b..f5a6600 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
@@ -18,12 +18,12 @@ package org.apache.rocketmq.snode.service;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.snode.config.SnodeConfig;
public interface NnodeService {
/**
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
index 8dcdf0c..5758555 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -37,7 +38,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.NnodeService;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
index 685af3f..0e3479a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
@@ -28,7 +29,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.ScheduledService;
public class ScheduledServiceImpl implements ScheduledService {
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java
index ff5235b..414f06b 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/SnodeControllerTest.java
@@ -16,9 +16,9 @@
*/
package org.apache.rocketmq.snode;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
index 2a2a445..7186306 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
@@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
@@ -33,7 +34,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Before;
import org.junit.Test;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java
index b610057..8846ef1 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java
@@ -22,11 +22,11 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
index a7a2667..2573f02 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
@@ -28,7 +29,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.EnodeService;
import org.apache.rocketmq.snode.service.NnodeService;
import org.junit.Before;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
index c228a83..4255c77 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -30,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
index 45a19dd..51ff8ff 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service;
import java.util.ArrayList;
import java.util.List;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -27,7 +28,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.junit.Before;
import org.junit.Test;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java
index 6a25009..5338a23 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/SlowConsumerServiceImplTest.java
@@ -16,12 +16,12 @@
*/
package org.apache.rocketmq.snode.service;
+import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
-import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.junit.Before;
import org.junit.Test;