You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/24 02:32:51 UTC

[rocketmq] branch snode updated: Add slow consumer process service for push model

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 5440b1c  Add slow consumer process service for push model
5440b1c is described below

commit 5440b1c4862e59c02bcf569888c7f224138bc3c9
Author: duhenglucky <du...@gmail.com>
AuthorDate: Thu Jan 24 10:32:19 2019 +0800

    Add slow consumer process service for push model
---
 .../org/apache/rocketmq/snode/SnodeController.java | 48 +++++++------
 .../org/apache/rocketmq/snode/client/Client.java   | 21 ++++--
 .../snode/client/ClientHousekeepingService.java    |  6 +-
 .../rocketmq/snode/client/SlowConsumerService.java |  3 +-
 .../snode/client/SubscriptionGroupManager.java     | 83 +---------------------
 .../rocketmq/snode/client/SubscriptionManager.java |  4 +-
 .../snode/client/impl/ClientManagerImpl.java       |  1 +
 .../snode/client/impl/ConsumerManagerImpl.java     |  4 +-
 .../snode/client/impl/SlowConsumerServiceImpl.java | 35 ++++++++-
 .../snode/client/impl/SubscriptionManagerImpl.java |  4 +-
 .../apache/rocketmq/snode/config/SnodeConfig.java  |  7 ++
 .../rocketmq/snode/constant/SnodeConstant.java     |  8 +--
 .../snode/offset/ConsumerOffsetManager.java        | 22 ++----
 .../snode/processor/ConsumerManageProcessor.java   |  2 +-
 .../snode/processor/HeartbeatProcessor.java        | 32 ++++-----
 .../snode/processor/SendMessageProcessor.java      |  2 +-
 .../snode/service/ConsumerOffsetService.java       | 20 ------
 .../apache/rocketmq/snode/service/PushService.java |  4 +-
 .../snode/service/impl/ClientServiceImpl.java      |  2 +-
 .../snode/service/impl/PushServiceImpl.java        | 21 +++---
 20 files changed, 133 insertions(+), 196 deletions(-)

diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 18631ea..0707db5 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -39,10 +39,12 @@ import org.apache.rocketmq.remoting.interceptor.InterceptorFactory;
 import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
 import org.apache.rocketmq.snode.client.ClientHousekeepingService;
 import org.apache.rocketmq.snode.client.ClientManager;
+import org.apache.rocketmq.snode.client.SlowConsumerService;
 import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
 import org.apache.rocketmq.snode.client.SubscriptionManager;
 import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl;
 import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl;
+import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
 import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl;
 import org.apache.rocketmq.snode.config.SnodeConfig;
 import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
@@ -77,10 +79,8 @@ public class SnodeController {
     private NnodeService nnodeService;
     private ExecutorService consumerManagerExecutor;
     private ScheduledService scheduledService;
-//    private ProducerManager producerManager;
-//    private ConsumerManager consumerManager;
-    private ClientManager producerManagerImpl;
-    private ClientManager consumerManagerImpl;
+    private ClientManager producerManager;
+    private ClientManager consumerManager;
     private SubscriptionManager subscriptionManager;
     private ClientHousekeepingService clientHousekeepingService;
     private SubscriptionGroupManager subscriptionGroupManager;
@@ -94,6 +94,7 @@ public class SnodeController {
     private InterceptorGroup sendMessageInterceptorGroup;
     private PushService pushService;
     private ClientService clientService;
+    private SlowConsumerService slowConsumerService;
 
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
         "SnodeControllerScheduledThread"));
@@ -172,9 +173,10 @@ public class SnodeController {
         this.pushService = new PushServiceImpl(this);
         this.clientService = new ClientServiceImpl(this);
         this.subscriptionManager = new SubscriptionManagerImpl();
-        this.producerManagerImpl = new ProducerManagerImpl();
-        this.consumerManagerImpl = new ConsumerManagerImpl(this);
-        this.clientHousekeepingService = new ClientHousekeepingService(this.producerManagerImpl, this.consumerManagerImpl);
+        this.producerManager = new ProducerManagerImpl();
+        this.consumerManager = new ConsumerManagerImpl(this);
+        this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager);
+        this.slowConsumerService = new SlowConsumerServiceImpl(this);
     }
 
     public SnodeConfig getSnodeConfig() {
@@ -257,18 +259,10 @@ public class SnodeController {
         this.pushService.shutdown();
     }
 
-//    public ProducerManager getProducerManager() {
-//        return producerManager;
-//    }
-
     public RemotingServer getSnodeServer() {
         return snodeServer;
     }
 
-//    public ConsumerManager getConsumerManager() {
-//        return consumerManager;
-//    }
-
     public SubscriptionGroupManager getSubscriptionGroupManager() {
         return subscriptionGroupManager;
     }
@@ -326,20 +320,20 @@ public class SnodeController {
         this.remotingServerInterceptorGroup = remotingServerInterceptorGroup;
     }
 
-    public ClientManager getProducerManagerImpl() {
-        return producerManagerImpl;
+    public ClientManager getProducerManager() {
+        return producerManager;
     }
 
-    public void setProducerManagerImpl(ClientManager producerManagerImpl) {
-        this.producerManagerImpl = producerManagerImpl;
+    public void setProducerManager(ClientManager producerManager) {
+        this.producerManager = producerManager;
     }
 
-    public ClientManager getConsumerManagerImpl() {
-        return consumerManagerImpl;
+    public ClientManager getConsumerManager() {
+        return consumerManager;
     }
 
-    public void setConsumerManagerImpl(ClientManager consumerManagerImpl) {
-        this.consumerManagerImpl = consumerManagerImpl;
+    public void setConsumerManager(ClientManager consumerManager) {
+        this.consumerManager = consumerManager;
     }
 
     public SubscriptionManager getSubscriptionManager() {
@@ -357,4 +351,12 @@ public class SnodeController {
     public void setClientService(ClientService clientService) {
         this.clientService = clientService;
     }
+
+    public SlowConsumerService getSlowConsumerService() {
+        return slowConsumerService;
+    }
+
+    public void setSlowConsumerService(SlowConsumerService slowConsumerService) {
+        this.slowConsumerService = slowConsumerService;
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
index 2a3d52a..2376b70 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
@@ -53,9 +53,7 @@ public class Client {
         if (o == null || getClass() != o.getClass())
             return false;
         Client client = (Client) o;
-        return heartbeatInterval == client.heartbeatInterval &&
-            lastUpdateTimestamp == client.lastUpdateTimestamp &&
-            version == client.version &&
+        return version == client.version &&
             clientRole == client.clientRole &&
             Objects.equals(groupId, client.groupId) &&
             Objects.equals(clientId, client.clientId) &&
@@ -65,7 +63,7 @@ public class Client {
 
     @Override
     public int hashCode() {
-        return Objects.hash(clientRole, groupId, clientId, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language);
+        return Objects.hash(clientRole, groupId, clientId, remotingChannel, version, language);
     }
 
     public String getGroupId() {
@@ -123,4 +121,19 @@ public class Client {
     public void setLanguage(LanguageCode language) {
         this.language = language;
     }
+
+    @Override public String toString() {
+        return "Client{" +
+            "clientRole=" + clientRole +
+            ", groupId='" + groupId + '\'' +
+            ", clientId='" + clientId + '\'' +
+            ", remotingChannel=" + remotingChannel +
+            ", heartbeatInterval=" + heartbeatInterval +
+            ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+            ", version=" + version +
+            ", language=" + language +
+            '}';
+    }
 }
+
+
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
index a86d14f..42ceacf 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
@@ -52,9 +52,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
     private ClientRole clientRole(RemotingChannel remotingChannel) {
         if (remotingChannel instanceof NettyChannelImpl) {
             Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
-            Attribute<ClientRole> clientRoleAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ROLE_ATTRIBUTE_KEY);
-            if (clientRoleAttribute != null) {
-                return clientRoleAttribute.get();
+            Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
+            if (clientAttribute != null) {
+                return clientAttribute.get().getClientRole();
             }
         }
         log.warn("RemotingChannel type error: {}", remotingChannel.getClass());
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java
index b341e69..1fdea74 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java
@@ -20,5 +20,6 @@ import org.apache.rocketmq.remoting.RemotingChannel;
 
 public interface SlowConsumerService {
 
-    boolean isSlowConsumer(long latestLogicOffset, String topic, String queueId, RemotingChannel remotingChannel);
+    boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, RemotingChannel remotingChannel,
+        String enodeName);
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
index e153ef1..ab013b8 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.snode.client;
 
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.DataVersion;
@@ -26,20 +24,15 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 import org.apache.rocketmq.snode.SnodeController;
 
 public class SubscriptionGroupManager {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
 
-    private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
-        new ConcurrentHashMap<>(1024);
+    private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>(1024);
     private final DataVersion dataVersion = new DataVersion();
-    private transient SnodeController snodeController;
 
-    public SubscriptionGroupManager() {
-        this.init();
-    }
+    private transient SnodeController snodeController;
 
     public SubscriptionGroupManager(SnodeController snodeController) {
         this.snodeController = snodeController;
@@ -47,51 +40,6 @@ public class SubscriptionGroupManager {
     }
 
     private void init() {
-        {
-            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
-            subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
-            this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
-        }
-
-        {
-            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
-            subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
-            this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
-        }
-
-        {
-            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
-            subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
-            this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
-        }
-
-        {
-            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
-            subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
-            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-            this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
-        }
-
-        {
-            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
-            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
-            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
-        }
-
-        {
-            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
-            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
-            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
-        }
-
-        {
-            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
-            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
-            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
-        }
     }
 
     public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
@@ -133,32 +81,7 @@ public class SubscriptionGroupManager {
         return subscriptionGroupConfig;
     }
 
-    public String encode() {
-        return this.encode(false);
-    }
 
-    public void decode(String jsonString) {
-        if (jsonString != null) {
-            SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
-            if (obj != null) {
-                this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
-                this.dataVersion.assignNewOne(obj.dataVersion);
-                this.printLoadDataWhenFirstBoot(obj);
-            }
-        }
-    }
-
-    public String encode(final boolean prettyFormat) {
-        return RemotingSerializable.toJson(this, prettyFormat);
-    }
-
-    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
-        Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, SubscriptionGroupConfig> next = it.next();
-            log.info("load exist subscription group, {}", next.getValue().toString());
-        }
-    }
 
     public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
         return subscriptionGroupTable;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
index 099dc12..a40bbb6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
@@ -35,9 +35,9 @@ public interface SubscriptionManager {
 
     Subscription getSubscription(String groupId);
 
-    void registerPush(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, String groupId);
+    void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, String groupId);
 
-    void removePush(RemotingChannel remotingChannel);
+    void removePushSession(RemotingChannel remotingChannel);
 
     Set<RemotingChannel> getPushableChannel(String topic, Integer queueId);
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
index 9aae903..34d7948 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
@@ -122,6 +122,7 @@ public abstract class ClientManagerImpl implements ClientManager {
             }
             oldClient.setLastUpdateTimestamp(System.currentTimeMillis());
         }
+        log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), client.getGroupId(), client.getLastUpdateTimestamp());
         onRegister(client.getGroupId(), client.getRemotingChannel());
         return updated;
     }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java
index cf9492c..fb6693c 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java
@@ -39,12 +39,12 @@ public class ConsumerManagerImpl extends ClientManagerImpl {
     @Override
     public void onClosed(String groupId, RemotingChannel remotingChannel) {
         this.snodeController.getClientService().notifyConsumer(groupId);
-        this.snodeController.getSubscriptionManager().removePush(remotingChannel);
+        this.snodeController.getSubscriptionManager().removePushSession(remotingChannel);
     }
 
     @Override
     public void onUnregister(String groupId, RemotingChannel remotingChannel) {
         this.snodeController.getClientService().notifyConsumer(groupId);
-        this.snodeController.getSubscriptionManager().removePush(remotingChannel);
+        this.snodeController.getSubscriptionManager().removePushSession(remotingChannel);
     }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
index 9b95301..acf6d50 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
@@ -16,14 +16,45 @@
  */
 package org.apache.rocketmq.snode.client.impl;
 
+import io.netty.channel.Channel;
+import io.netty.util.Attribute;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.Client;
 import org.apache.rocketmq.snode.client.SlowConsumerService;
+import org.apache.rocketmq.snode.constant.SnodeConstant;
 
 public class SlowConsumerServiceImpl implements SlowConsumerService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+    private final SnodeController snodeController;
+
+    public SlowConsumerServiceImpl(SnodeController snodeController) {
+        this.snodeController = snodeController;
+    }
 
     @Override
-    public boolean isSlowConsumer(long latestLogicOffset, String topic, String queueId,
-        RemotingChannel remotingChannel) {
+    public boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId,
+        RemotingChannel remotingChannel, String enodeName) {
+        Client client = null;
+        if (remotingChannel instanceof NettyChannelImpl) {
+            Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
+            Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
+            if (clientAttribute != null) {
+                client = clientAttribute.get();
+            }
+        }
+        if (client != null) {
+            long ackedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(enodeName, client.getGroupId(), topic, queueId);
+            if (latestLogicOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) {
+                log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", client.getGroupId(), ackedOffset, latestLogicOffset);
+                return true;
+            }
+        }
         return false;
     }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index e44f9a9..9011c5f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -52,7 +52,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
     }
 
     @Override
-    public void registerPush(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
+    public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
         String groupId) {
         Set<String> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
         Set<String> keySet = new HashSet<>();
@@ -89,7 +89,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
     }
 
     @Override
-    public void removePush(RemotingChannel remotingChannel) {
+    public void removePushSession(RemotingChannel remotingChannel) {
         Set<String> subSet = this.clientSubscriptionTable.get(remotingChannel);
         if (subSet != null) {
             for (String key : subSet) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index fa16819..3143d7d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -284,4 +284,11 @@ public class SnodeConfig {
         return remotingServerInterceptorPath;
     }
 
+    public int getSlowConsumerThreshold() {
+        return slowConsumerThreshold;
+    }
+
+    public void setSlowConsumerThreshold(int slowConsumerThreshold) {
+        this.slowConsumerThreshold = slowConsumerThreshold;
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
index 9fe315e..16deb6d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.snode.constant;
 
 import io.netty.util.AttributeKey;
+import org.apache.rocketmq.snode.client.Client;
 import org.apache.rocketmq.snode.client.impl.ClientRole;
 
 public class SnodeConstant {
@@ -30,10 +31,5 @@ public class SnodeConstant {
 
     public static final AttributeKey<ClientRole> NETTY_CLIENT_ROLE_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client.role");
 
-    public static final String NETTY_PRODUCER_ROLE_ATTRIBUTE_VALUE = "Producer";
-
-    public static final String NETTY_CONSUMER_ROLE_ATTRIBUTE_VALUE = "Consumer";
-
-    public static final String NETTY_IOT_ROLE_ATTRIBUTE_VALUE = "IOTGroup";
-
+    public static final AttributeKey<Client> NETTY_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client");
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index 7cd485c..7c41736 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -39,7 +39,7 @@ public class ConsumerOffsetManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
     private static final String TOPIC_GROUP_SEPARATOR = "@";
 
-    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
+    private ConcurrentMap<String/* Enode@Topic@Group */, ConcurrentMap<Integer, Long>> offsetTable =
         new ConcurrentHashMap<>(512);
 
     private transient SnodeController snodeController;
@@ -88,11 +88,10 @@ public class ConsumerOffsetManager {
         return result;
     }
 
-
     public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
         final int queueId,
         final long offset) {
-        // topic@group
+        // Topic@group
         String key = buildKey(enodeName, topic, group);
         this.commitOffset(clientHost, key, queueId, offset);
     }
@@ -101,12 +100,13 @@ public class ConsumerOffsetManager {
         ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
         if (null == map) {
             map = new ConcurrentHashMap<>(32);
+            ConcurrentMap<Integer, Long> prev = this.offsetTable.putIfAbsent(key, map);
+            map = prev != null ? prev : map;
             map.put(queueId, offset);
-            this.offsetTable.put(key, map);
         } else {
             Long storeOffset = map.put(queueId, offset);
             if (storeOffset != null && offset < storeOffset) {
-                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
+                log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}", clientHost, key, queueId, offset, storeOffset);
             }
         }
     }
@@ -123,18 +123,6 @@ public class ConsumerOffsetManager {
         return -1;
     }
 
-    public String encode() {
-        return this.encode(false);
-    }
-
-    public void decode(String jsonString) {
-        if (jsonString != null) {
-            ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
-            if (obj != null) {
-                this.offsetTable = obj.offsetTable;
-            }
-        }
-    }
 
     public String encode(final boolean prettyFormat) {
         return RemotingSerializable.toJson(this, prettyFormat);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index 9f044ec..4cd54d6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -126,7 +126,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
             (GetConsumerListByGroupRequestHeader) request
                 .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
 
-        List<String> clientIds = this.snodeController.getConsumerManagerImpl().getAllClientId(requestHeader.getConsumerGroup());
+        List<String> clientIds = this.snodeController.getConsumerManager().getAllClientId(requestHeader.getConsumerGroup());
         if (!clientIds.isEmpty()) {
             GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
             body.setConsumerIdList(clientIds);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index 55bbe82..3d2c03a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -63,47 +63,41 @@ public class HeartbeatProcessor implements RequestProcessor {
 
     private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) {
         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
-        Channel channel;
-        ClientRole role = null;
-        Attribute<ClientRole> clientRoleAttribute = null;
+        Channel channel = null;
+        Attribute<Client> clientAttribute = null;
         if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
             channel = ((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel();
-            clientRoleAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ROLE_ATTRIBUTE_KEY);
+            clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
         }
         Client client = new Client();
         client.setClientId(heartbeatData.getClientID());
         client.setRemotingChannel(remotingChannel);
 
         for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
-            role = ClientRole.Producer;
             client.setGroupId(producerData.getGroupName());
-            client.setClientRole(role);
-            this.snodeController.getProducerManagerImpl().register(client);
+            client.setClientRole(ClientRole.Producer);
+            this.snodeController.getProducerManager().register(client);
         }
 
         for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
             client.setGroupId(data.getGroupName());
-            role = ClientRole.Consumer;
-            client.setClientRole(role);
-            boolean channelChanged = this.snodeController.getConsumerManagerImpl().register(client);
+            client.setClientRole(ClientRole.Consumer);
+            boolean channelChanged = this.snodeController.getConsumerManager().register(client);
             boolean subscriptionChanged = this.snodeController.getSubscriptionManager().subscribe(data.getGroupName(),
                 data.getSubscriptionDataSet(),
                 data.getConsumeType(),
                 data.getMessageModel(),
                 data.getConsumeFromWhere());
             if (data.getConsumeType() == ConsumeType.CONSUME_PUSH) {
-                NettyChannelImpl nettyChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl)remotingChannel).getChannelHandlerContext().channel());
-                this.snodeController.getSubscriptionManager().registerPush(data.getSubscriptionDataSet(), nettyChannel, data.getGroupName());
+                NettyChannelImpl nettyChannel = new NettyChannelImpl(channel);
+                this.snodeController.getSubscriptionManager().registerPushSession(data.getSubscriptionDataSet(), nettyChannel, data.getGroupName());
             }
             if (subscriptionChanged || channelChanged) {
                 this.snodeController.getClientService().notifyConsumer(data.getGroupName());
             }
         }
-        if (role != null) {
-            log.debug("Set channel attribute value: {}", role);
-            clientRoleAttribute.setIfAbsent(role);
-        }
 
+        clientAttribute.setIfAbsent(client);
         RemotingCommand response = RemotingCommand.createResponseCommand(null);
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
@@ -118,13 +112,13 @@ public class HeartbeatProcessor implements RequestProcessor {
 
         final String producerGroup = requestHeader.getProducerGroup();
         if (producerGroup != null) {
-            this.snodeController.getProducerManagerImpl().unRegister(producerGroup, remotingChannel);
+            this.snodeController.getProducerManager().unRegister(producerGroup, remotingChannel);
         }
 
         final String consumerGroup = requestHeader.getConsumerGroup();
         if (consumerGroup != null) {
-            this.snodeController.getConsumerManagerImpl().unRegister(consumerGroup, remotingChannel);
-            this.snodeController.getSubscriptionManager().removePush(remotingChannel);
+            this.snodeController.getConsumerManager().unRegister(consumerGroup, remotingChannel);
+            this.snodeController.getSubscriptionManager().removePushSession(remotingChannel);
             this.snodeController.getClientService().notifyConsumer(consumerGroup);
         }
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index f9f6be2..b498fa5 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -75,7 +75,7 @@ public class SendMessageProcessor implements RequestProcessor {
                 }
                 remotingChannel.reply(data);
                 if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
-                    this.snodeController.getPushService().pushMessage(topic, queueId, message, data);
+                    this.snodeController.getPushService().pushMessage(enodeName, topic, queueId, message, data);
                 }
             } else {
                 if (this.snodeController.getSendMessageInterceptorGroup() != null) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/ConsumerOffsetService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/ConsumerOffsetService.java
deleted file mode 100644
index 3699b08..0000000
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/ConsumerOffsetService.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.snode.service;
-public class ConsumerOffsetService {
-
-}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
index fd61303..db24055 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
@@ -20,10 +20,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public interface PushService {
 
-    void pushMessage(final String topic, final Integer queueId, final byte[] message,
+    void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
         final RemotingCommand response);
 
-    void start();
-
     void shutdown();
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
index 5ce42bd..6e49c6a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
@@ -42,7 +42,7 @@ public class ClientServiceImpl implements ClientService {
             SubscriptionGroupConfig subscriptionGroupConfig = snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
             boolean notifyConsumer = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
             if (notifyConsumer) {
-                List<RemotingChannel> remotingChannels = snodeController.getConsumerManagerImpl().getChannels(group);
+                List<RemotingChannel> remotingChannels = snodeController.getConsumerManager().getChannels(group);
                 if (remotingChannels != null && snodeController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group).isNotifyConsumerIdsChangedEnable()) {
                     for (RemotingChannel remotingChannel : remotingChannels) {
                         NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index e1b2562..f6a3ac6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -58,13 +58,15 @@ public class PushServiceImpl implements PushService {
         private final Integer queueId;
         private final String topic;
         private final RemotingCommand response;
+        private final String enodeName;
 
         public PushTask(final String topic, final Integer queueId, final byte[] message,
-            final RemotingCommand response) {
+            final RemotingCommand response, final String enodeName) {
             this.message = message;
             this.queueId = queueId;
             this.topic = topic;
             this.response = response;
+            this.enodeName = enodeName;
         }
 
         @Override
@@ -79,11 +81,16 @@ public class PushServiceImpl implements PushService {
                     RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
                     pushMessage.setBody(message);
                     Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(topic, queueId);
-                    log.info("Push message to consumerTable: {}", consumerTable);
                     if (consumerTable != null) {
                         for (RemotingChannel remotingChannel : consumerTable) {
                             if (remotingChannel.isWritable()) {
-                                log.info("Push message to remotingChannel: {}", remotingChannel.remoteAddress());
+                                boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, remotingChannel, enodeName);
+                                if (slowConsumer) {
+                                    log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);//TODO metrics
+                                    remotingChannel.close();
+                                    continue;
+                                }
+                                log.debug("Push message to remotingChannel: {}", remotingChannel.remoteAddress());
                                 snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
                             } else {
                                 log.warn("Remoting channel is not writable: {}", remotingChannel.remoteAddress());
@@ -107,11 +114,11 @@ public class PushServiceImpl implements PushService {
     }
 
     @Override
-    public void pushMessage(final String topic, final Integer queueId, final byte[] message,
+    public void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
         final RemotingCommand response) {
         Set<RemotingChannel> pushableChannels = this.snodeController.getSubscriptionManager().getPushableChannel(topic, queueId);
         if (pushableChannels != null) {
-            PushTask pushTask = new PushTask(topic, queueId, message, response);
+            PushTask pushTask = new PushTask(topic, queueId, message, response, enodeName);
             pushMessageExecutorService.submit(pushTask);
         } else {
             log.info("Topic: {} QueueId: {} no need to push", topic, queueId);
@@ -119,10 +126,6 @@ public class PushServiceImpl implements PushService {
     }
 
     @Override
-    public void start() {
-    }
-
-    @Override
     public void shutdown() {
         this.pushMessageExecutorService.shutdown();
     }