You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/24 02:32:51 UTC
[rocketmq] branch snode updated: Add slow consumer process service
for push model
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 5440b1c Add slow consumer process service for push model
5440b1c is described below
commit 5440b1c4862e59c02bcf569888c7f224138bc3c9
Author: duhenglucky <du...@gmail.com>
AuthorDate: Thu Jan 24 10:32:19 2019 +0800
Add slow consumer process service for push model
---
.../org/apache/rocketmq/snode/SnodeController.java | 48 +++++++------
.../org/apache/rocketmq/snode/client/Client.java | 21 ++++--
.../snode/client/ClientHousekeepingService.java | 6 +-
.../rocketmq/snode/client/SlowConsumerService.java | 3 +-
.../snode/client/SubscriptionGroupManager.java | 83 +---------------------
.../rocketmq/snode/client/SubscriptionManager.java | 4 +-
.../snode/client/impl/ClientManagerImpl.java | 1 +
.../snode/client/impl/ConsumerManagerImpl.java | 4 +-
.../snode/client/impl/SlowConsumerServiceImpl.java | 35 ++++++++-
.../snode/client/impl/SubscriptionManagerImpl.java | 4 +-
.../apache/rocketmq/snode/config/SnodeConfig.java | 7 ++
.../rocketmq/snode/constant/SnodeConstant.java | 8 +--
.../snode/offset/ConsumerOffsetManager.java | 22 ++----
.../snode/processor/ConsumerManageProcessor.java | 2 +-
.../snode/processor/HeartbeatProcessor.java | 32 ++++-----
.../snode/processor/SendMessageProcessor.java | 2 +-
.../snode/service/ConsumerOffsetService.java | 20 ------
.../apache/rocketmq/snode/service/PushService.java | 4 +-
.../snode/service/impl/ClientServiceImpl.java | 2 +-
.../snode/service/impl/PushServiceImpl.java | 21 +++---
20 files changed, 133 insertions(+), 196 deletions(-)
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 18631ea..0707db5 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -39,10 +39,12 @@ import org.apache.rocketmq.remoting.interceptor.InterceptorFactory;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.snode.client.ClientHousekeepingService;
import org.apache.rocketmq.snode.client.ClientManager;
+import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.client.SubscriptionManager;
import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl;
import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl;
+import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
@@ -77,10 +79,8 @@ public class SnodeController {
private NnodeService nnodeService;
private ExecutorService consumerManagerExecutor;
private ScheduledService scheduledService;
-// private ProducerManager producerManager;
-// private ConsumerManager consumerManager;
- private ClientManager producerManagerImpl;
- private ClientManager consumerManagerImpl;
+ private ClientManager producerManager;
+ private ClientManager consumerManager;
private SubscriptionManager subscriptionManager;
private ClientHousekeepingService clientHousekeepingService;
private SubscriptionGroupManager subscriptionGroupManager;
@@ -94,6 +94,7 @@ public class SnodeController {
private InterceptorGroup sendMessageInterceptorGroup;
private PushService pushService;
private ClientService clientService;
+ private SlowConsumerService slowConsumerService;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
@@ -172,9 +173,10 @@ public class SnodeController {
this.pushService = new PushServiceImpl(this);
this.clientService = new ClientServiceImpl(this);
this.subscriptionManager = new SubscriptionManagerImpl();
- this.producerManagerImpl = new ProducerManagerImpl();
- this.consumerManagerImpl = new ConsumerManagerImpl(this);
- this.clientHousekeepingService = new ClientHousekeepingService(this.producerManagerImpl, this.consumerManagerImpl);
+ this.producerManager = new ProducerManagerImpl();
+ this.consumerManager = new ConsumerManagerImpl(this);
+ this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager);
+ this.slowConsumerService = new SlowConsumerServiceImpl(this);
}
public SnodeConfig getSnodeConfig() {
@@ -257,18 +259,10 @@ public class SnodeController {
this.pushService.shutdown();
}
-// public ProducerManager getProducerManager() {
-// return producerManager;
-// }
-
public RemotingServer getSnodeServer() {
return snodeServer;
}
-// public ConsumerManager getConsumerManager() {
-// return consumerManager;
-// }
-
public SubscriptionGroupManager getSubscriptionGroupManager() {
return subscriptionGroupManager;
}
@@ -326,20 +320,20 @@ public class SnodeController {
this.remotingServerInterceptorGroup = remotingServerInterceptorGroup;
}
- public ClientManager getProducerManagerImpl() {
- return producerManagerImpl;
+ public ClientManager getProducerManager() {
+ return producerManager;
}
- public void setProducerManagerImpl(ClientManager producerManagerImpl) {
- this.producerManagerImpl = producerManagerImpl;
+ public void setProducerManager(ClientManager producerManager) {
+ this.producerManager = producerManager;
}
- public ClientManager getConsumerManagerImpl() {
- return consumerManagerImpl;
+ public ClientManager getConsumerManager() {
+ return consumerManager;
}
- public void setConsumerManagerImpl(ClientManager consumerManagerImpl) {
- this.consumerManagerImpl = consumerManagerImpl;
+ public void setConsumerManager(ClientManager consumerManager) {
+ this.consumerManager = consumerManager;
}
public SubscriptionManager getSubscriptionManager() {
@@ -357,4 +351,12 @@ public class SnodeController {
public void setClientService(ClientService clientService) {
this.clientService = clientService;
}
+
+ public SlowConsumerService getSlowConsumerService() {
+ return slowConsumerService;
+ }
+
+ public void setSlowConsumerService(SlowConsumerService slowConsumerService) {
+ this.slowConsumerService = slowConsumerService;
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
index 2a3d52a..2376b70 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
@@ -53,9 +53,7 @@ public class Client {
if (o == null || getClass() != o.getClass())
return false;
Client client = (Client) o;
- return heartbeatInterval == client.heartbeatInterval &&
- lastUpdateTimestamp == client.lastUpdateTimestamp &&
- version == client.version &&
+ return version == client.version &&
clientRole == client.clientRole &&
Objects.equals(groupId, client.groupId) &&
Objects.equals(clientId, client.clientId) &&
@@ -65,7 +63,7 @@ public class Client {
@Override
public int hashCode() {
- return Objects.hash(clientRole, groupId, clientId, remotingChannel, heartbeatInterval, lastUpdateTimestamp, version, language);
+ return Objects.hash(clientRole, groupId, clientId, remotingChannel, version, language);
}
public String getGroupId() {
@@ -123,4 +121,19 @@ public class Client {
public void setLanguage(LanguageCode language) {
this.language = language;
}
+
+ @Override public String toString() {
+ return "Client{" +
+ "clientRole=" + clientRole +
+ ", groupId='" + groupId + '\'' +
+ ", clientId='" + clientId + '\'' +
+ ", remotingChannel=" + remotingChannel +
+ ", heartbeatInterval=" + heartbeatInterval +
+ ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+ ", version=" + version +
+ ", language=" + language +
+ '}';
+ }
}
+
+
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
index a86d14f..42ceacf 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
@@ -52,9 +52,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
private ClientRole clientRole(RemotingChannel remotingChannel) {
if (remotingChannel instanceof NettyChannelImpl) {
Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
- Attribute<ClientRole> clientRoleAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ROLE_ATTRIBUTE_KEY);
- if (clientRoleAttribute != null) {
- return clientRoleAttribute.get();
+ Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
+ if (clientAttribute != null) {
+ return clientAttribute.get().getClientRole();
}
}
log.warn("RemotingChannel type error: {}", remotingChannel.getClass());
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java
index b341e69..1fdea74 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SlowConsumerService.java
@@ -20,5 +20,6 @@ import org.apache.rocketmq.remoting.RemotingChannel;
public interface SlowConsumerService {
- boolean isSlowConsumer(long latestLogicOffset, String topic, String queueId, RemotingChannel remotingChannel);
+ boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId, RemotingChannel remotingChannel,
+ String enodeName);
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
index e153ef1..ab013b8 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.snode.client;
-import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
@@ -26,20 +24,15 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.snode.SnodeController;
public class SubscriptionGroupManager {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
- private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
- new ConcurrentHashMap<>(1024);
+ private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>(1024);
private final DataVersion dataVersion = new DataVersion();
- private transient SnodeController snodeController;
- public SubscriptionGroupManager() {
- this.init();
- }
+ private transient SnodeController snodeController;
public SubscriptionGroupManager(SnodeController snodeController) {
this.snodeController = snodeController;
@@ -47,51 +40,6 @@ public class SubscriptionGroupManager {
}
private void init() {
- {
- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
- subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
- this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
- }
-
- {
- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
- subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
- this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
- }
-
- {
- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
- subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
- this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
- }
-
- {
- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
- subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
- subscriptionGroupConfig.setConsumeBroadcastEnable(true);
- this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
- }
-
- {
- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
- subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
- subscriptionGroupConfig.setConsumeBroadcastEnable(true);
- this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
- }
-
- {
- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
- subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
- subscriptionGroupConfig.setConsumeBroadcastEnable(true);
- this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
- }
-
- {
- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
- subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
- subscriptionGroupConfig.setConsumeBroadcastEnable(true);
- this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
- }
}
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
@@ -133,32 +81,7 @@ public class SubscriptionGroupManager {
return subscriptionGroupConfig;
}
- public String encode() {
- return this.encode(false);
- }
- public void decode(String jsonString) {
- if (jsonString != null) {
- SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
- if (obj != null) {
- this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
- this.dataVersion.assignNewOne(obj.dataVersion);
- this.printLoadDataWhenFirstBoot(obj);
- }
- }
- }
-
- public String encode(final boolean prettyFormat) {
- return RemotingSerializable.toJson(this, prettyFormat);
- }
-
- private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
- Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, SubscriptionGroupConfig> next = it.next();
- log.info("load exist subscription group, {}", next.getValue().toString());
- }
- }
public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
return subscriptionGroupTable;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
index 099dc12..a40bbb6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
@@ -35,9 +35,9 @@ public interface SubscriptionManager {
Subscription getSubscription(String groupId);
- void registerPush(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, String groupId);
+ void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, String groupId);
- void removePush(RemotingChannel remotingChannel);
+ void removePushSession(RemotingChannel remotingChannel);
Set<RemotingChannel> getPushableChannel(String topic, Integer queueId);
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
index 9aae903..34d7948 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
@@ -122,6 +122,7 @@ public abstract class ClientManagerImpl implements ClientManager {
}
oldClient.setLastUpdateTimestamp(System.currentTimeMillis());
}
+ log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), client.getGroupId(), client.getLastUpdateTimestamp());
onRegister(client.getGroupId(), client.getRemotingChannel());
return updated;
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java
index cf9492c..fb6693c 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java
@@ -39,12 +39,12 @@ public class ConsumerManagerImpl extends ClientManagerImpl {
@Override
public void onClosed(String groupId, RemotingChannel remotingChannel) {
this.snodeController.getClientService().notifyConsumer(groupId);
- this.snodeController.getSubscriptionManager().removePush(remotingChannel);
+ this.snodeController.getSubscriptionManager().removePushSession(remotingChannel);
}
@Override
public void onUnregister(String groupId, RemotingChannel remotingChannel) {
this.snodeController.getClientService().notifyConsumer(groupId);
- this.snodeController.getSubscriptionManager().removePush(remotingChannel);
+ this.snodeController.getSubscriptionManager().removePushSession(remotingChannel);
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
index 9b95301..acf6d50 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
@@ -16,14 +16,45 @@
*/
package org.apache.rocketmq.snode.client.impl;
+import io.netty.channel.Channel;
+import io.netty.util.Attribute;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.SlowConsumerService;
+import org.apache.rocketmq.snode.constant.SnodeConstant;
public class SlowConsumerServiceImpl implements SlowConsumerService {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+ private final SnodeController snodeController;
+
+ public SlowConsumerServiceImpl(SnodeController snodeController) {
+ this.snodeController = snodeController;
+ }
@Override
- public boolean isSlowConsumer(long latestLogicOffset, String topic, String queueId,
- RemotingChannel remotingChannel) {
+ public boolean isSlowConsumer(long latestLogicOffset, String topic, int queueId,
+ RemotingChannel remotingChannel, String enodeName) {
+ Client client = null;
+ if (remotingChannel instanceof NettyChannelImpl) {
+ Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
+ Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
+ if (clientAttribute != null) {
+ client = clientAttribute.get();
+ }
+ }
+ if (client != null) {
+ long ackedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(enodeName, client.getGroupId(), topic, queueId);
+ if (latestLogicOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) {
+ log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", client.getGroupId(), ackedOffset, latestLogicOffset);
+ return true;
+ }
+ }
return false;
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index e44f9a9..9011c5f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -52,7 +52,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
}
@Override
- public void registerPush(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
+ public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId) {
Set<String> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<String> keySet = new HashSet<>();
@@ -89,7 +89,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
}
@Override
- public void removePush(RemotingChannel remotingChannel) {
+ public void removePushSession(RemotingChannel remotingChannel) {
Set<String> subSet = this.clientSubscriptionTable.get(remotingChannel);
if (subSet != null) {
for (String key : subSet) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index fa16819..3143d7d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -284,4 +284,11 @@ public class SnodeConfig {
return remotingServerInterceptorPath;
}
+ public int getSlowConsumerThreshold() {
+ return slowConsumerThreshold;
+ }
+
+ public void setSlowConsumerThreshold(int slowConsumerThreshold) {
+ this.slowConsumerThreshold = slowConsumerThreshold;
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
index 9fe315e..16deb6d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.snode.constant;
import io.netty.util.AttributeKey;
+import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.impl.ClientRole;
public class SnodeConstant {
@@ -30,10 +31,5 @@ public class SnodeConstant {
public static final AttributeKey<ClientRole> NETTY_CLIENT_ROLE_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client.role");
- public static final String NETTY_PRODUCER_ROLE_ATTRIBUTE_VALUE = "Producer";
-
- public static final String NETTY_CONSUMER_ROLE_ATTRIBUTE_VALUE = "Consumer";
-
- public static final String NETTY_IOT_ROLE_ATTRIBUTE_VALUE = "IOTGroup";
-
+ public static final AttributeKey<Client> NETTY_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client");
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index 7cd485c..7c41736 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -39,7 +39,7 @@ public class ConsumerOffsetManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
- private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
+ private ConcurrentMap<String/* Enode@Topic@Group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<>(512);
private transient SnodeController snodeController;
@@ -88,11 +88,10 @@ public class ConsumerOffsetManager {
return result;
}
-
public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
final int queueId,
final long offset) {
- // topic@group
+ // Topic@group
String key = buildKey(enodeName, topic, group);
this.commitOffset(clientHost, key, queueId, offset);
}
@@ -101,12 +100,13 @@ public class ConsumerOffsetManager {
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<>(32);
+ ConcurrentMap<Integer, Long> prev = this.offsetTable.putIfAbsent(key, map);
+ map = prev != null ? prev : map;
map.put(queueId, offset);
- this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
- log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
+ log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}", clientHost, key, queueId, offset, storeOffset);
}
}
}
@@ -123,18 +123,6 @@ public class ConsumerOffsetManager {
return -1;
}
- public String encode() {
- return this.encode(false);
- }
-
- public void decode(String jsonString) {
- if (jsonString != null) {
- ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
- if (obj != null) {
- this.offsetTable = obj.offsetTable;
- }
- }
- }
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index 9f044ec..4cd54d6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -126,7 +126,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
- List<String> clientIds = this.snodeController.getConsumerManagerImpl().getAllClientId(requestHeader.getConsumerGroup());
+ List<String> clientIds = this.snodeController.getConsumerManager().getAllClientId(requestHeader.getConsumerGroup());
if (!clientIds.isEmpty()) {
GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
body.setConsumerIdList(clientIds);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index 55bbe82..3d2c03a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -63,47 +63,41 @@ public class HeartbeatProcessor implements RequestProcessor {
private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
- Channel channel;
- ClientRole role = null;
- Attribute<ClientRole> clientRoleAttribute = null;
+ Channel channel = null;
+ Attribute<Client> clientAttribute = null;
if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
channel = ((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel();
- clientRoleAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ROLE_ATTRIBUTE_KEY);
+ clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
}
Client client = new Client();
client.setClientId(heartbeatData.getClientID());
client.setRemotingChannel(remotingChannel);
for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
- role = ClientRole.Producer;
client.setGroupId(producerData.getGroupName());
- client.setClientRole(role);
- this.snodeController.getProducerManagerImpl().register(client);
+ client.setClientRole(ClientRole.Producer);
+ this.snodeController.getProducerManager().register(client);
}
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
client.setGroupId(data.getGroupName());
- role = ClientRole.Consumer;
- client.setClientRole(role);
- boolean channelChanged = this.snodeController.getConsumerManagerImpl().register(client);
+ client.setClientRole(ClientRole.Consumer);
+ boolean channelChanged = this.snodeController.getConsumerManager().register(client);
boolean subscriptionChanged = this.snodeController.getSubscriptionManager().subscribe(data.getGroupName(),
data.getSubscriptionDataSet(),
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere());
if (data.getConsumeType() == ConsumeType.CONSUME_PUSH) {
- NettyChannelImpl nettyChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl)remotingChannel).getChannelHandlerContext().channel());
- this.snodeController.getSubscriptionManager().registerPush(data.getSubscriptionDataSet(), nettyChannel, data.getGroupName());
+ NettyChannelImpl nettyChannel = new NettyChannelImpl(channel);
+ this.snodeController.getSubscriptionManager().registerPushSession(data.getSubscriptionDataSet(), nettyChannel, data.getGroupName());
}
if (subscriptionChanged || channelChanged) {
this.snodeController.getClientService().notifyConsumer(data.getGroupName());
}
}
- if (role != null) {
- log.debug("Set channel attribute value: {}", role);
- clientRoleAttribute.setIfAbsent(role);
- }
+ clientAttribute.setIfAbsent(client);
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
@@ -118,13 +112,13 @@ public class HeartbeatProcessor implements RequestProcessor {
final String producerGroup = requestHeader.getProducerGroup();
if (producerGroup != null) {
- this.snodeController.getProducerManagerImpl().unRegister(producerGroup, remotingChannel);
+ this.snodeController.getProducerManager().unRegister(producerGroup, remotingChannel);
}
final String consumerGroup = requestHeader.getConsumerGroup();
if (consumerGroup != null) {
- this.snodeController.getConsumerManagerImpl().unRegister(consumerGroup, remotingChannel);
- this.snodeController.getSubscriptionManager().removePush(remotingChannel);
+ this.snodeController.getConsumerManager().unRegister(consumerGroup, remotingChannel);
+ this.snodeController.getSubscriptionManager().removePushSession(remotingChannel);
this.snodeController.getClientService().notifyConsumer(consumerGroup);
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index f9f6be2..b498fa5 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -75,7 +75,7 @@ public class SendMessageProcessor implements RequestProcessor {
}
remotingChannel.reply(data);
if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
- this.snodeController.getPushService().pushMessage(topic, queueId, message, data);
+ this.snodeController.getPushService().pushMessage(enodeName, topic, queueId, message, data);
}
} else {
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/ConsumerOffsetService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/ConsumerOffsetService.java
deleted file mode 100644
index 3699b08..0000000
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/ConsumerOffsetService.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.snode.service;
-public class ConsumerOffsetService {
-
-}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
index fd61303..db24055 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
@@ -20,10 +20,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface PushService {
- void pushMessage(final String topic, final Integer queueId, final byte[] message,
+ void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response);
- void start();
-
void shutdown();
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
index 5ce42bd..6e49c6a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
@@ -42,7 +42,7 @@ public class ClientServiceImpl implements ClientService {
SubscriptionGroupConfig subscriptionGroupConfig = snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
boolean notifyConsumer = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
if (notifyConsumer) {
- List<RemotingChannel> remotingChannels = snodeController.getConsumerManagerImpl().getChannels(group);
+ List<RemotingChannel> remotingChannels = snodeController.getConsumerManager().getChannels(group);
if (remotingChannels != null && snodeController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group).isNotifyConsumerIdsChangedEnable()) {
for (RemotingChannel remotingChannel : remotingChannels) {
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index e1b2562..f6a3ac6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -58,13 +58,15 @@ public class PushServiceImpl implements PushService {
private final Integer queueId;
private final String topic;
private final RemotingCommand response;
+ private final String enodeName;
public PushTask(final String topic, final Integer queueId, final byte[] message,
- final RemotingCommand response) {
+ final RemotingCommand response, final String enodeName) {
this.message = message;
this.queueId = queueId;
this.topic = topic;
this.response = response;
+ this.enodeName = enodeName;
}
@Override
@@ -79,11 +81,16 @@ public class PushServiceImpl implements PushService {
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
pushMessage.setBody(message);
Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(topic, queueId);
- log.info("Push message to consumerTable: {}", consumerTable);
if (consumerTable != null) {
for (RemotingChannel remotingChannel : consumerTable) {
if (remotingChannel.isWritable()) {
- log.info("Push message to remotingChannel: {}", remotingChannel.remoteAddress());
+ boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, remotingChannel, enodeName);
+ if (slowConsumer) {
+ log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);//TODO metrics
+ remotingChannel.close();
+ continue;
+ }
+ log.debug("Push message to remotingChannel: {}", remotingChannel.remoteAddress());
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} else {
log.warn("Remoting channel is not writable: {}", remotingChannel.remoteAddress());
@@ -107,11 +114,11 @@ public class PushServiceImpl implements PushService {
}
@Override
- public void pushMessage(final String topic, final Integer queueId, final byte[] message,
+ public void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response) {
Set<RemotingChannel> pushableChannels = this.snodeController.getSubscriptionManager().getPushableChannel(topic, queueId);
if (pushableChannels != null) {
- PushTask pushTask = new PushTask(topic, queueId, message, response);
+ PushTask pushTask = new PushTask(topic, queueId, message, response, enodeName);
pushMessageExecutorService.submit(pushTask);
} else {
log.info("Topic: {} QueueId: {} no need to push", topic, queueId);
@@ -119,10 +126,6 @@ public class PushServiceImpl implements PushService {
}
@Override
- public void start() {
- }
-
- @Override
public void shutdown() {
this.pushMessageExecutorService.shutdown();
}