You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/19 08:26:59 UTC
[rocketmq] branch snode updated: Fix the push subscription not update issue
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 28a8d64 Fix the push subscription not update issue
new 8e12495 Merge branch 'snode' of github.com:apache/rocketmq into snode
28a8d64 is described below
commit 28a8d649fa7a734831d28e01407fc37c535f08df
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Feb 19 16:26:02 2019 +0800
Fix the push subscription not update issue
---
.../common/protocol/header/PushMessageHeader.java | 21 +++++++++++++++++++++
.../snode/client/impl/SubscriptionManagerImpl.java | 7 ++-----
.../snode/service/impl/PushServiceImpl.java | 7 ++++---
3 files changed, 27 insertions(+), 8 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PushMessageHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PushMessageHeader.java
index 40277eb..92285e1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PushMessageHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PushMessageHeader.java
@@ -36,6 +36,8 @@ public class PushMessageHeader implements CommandCustomHeader {
@CFNotNull
private String consumerGroup;
+ private String enodeName;
+
@Override
public void checkFields() throws RemotingCommandException {
@@ -80,4 +82,23 @@ public class PushMessageHeader implements CommandCustomHeader {
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
+
+ public String getEnodeName() {
+ return enodeName;
+ }
+
+ public void setEnodeName(String enodeName) {
+ this.enodeName = enodeName;
+ }
+
+ @Override public String toString() {
+ return "PushMessageHeader{" +
+ "queueOffset=" + queueOffset +
+ ", messageId='" + messageId + '\'' +
+ ", queueId=" + queueId +
+ ", topic='" + topic + '\'' +
+ ", consumerGroup='" + consumerGroup + '\'' +
+ ", enodeName='" + enodeName + '\'' +
+ '}';
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index 22e245a..c75c224 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -44,7 +44,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Override
public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId) {
- log.info("Register push session subscriptionDataSet: {}", subscriptionDataSet);
Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<MessageQueue> keySet = new HashSet<>();
for (SubscriptionData subscriptionData : subscriptionDataSet) {
@@ -57,15 +56,14 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet;
}
- log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
+ log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel);
}
}
}
if (keySet.size() > 0) {
- this.clientSubscriptionTable.putIfAbsent(remotingChannel, keySet);
+ this.clientSubscriptionTable.put(remotingChannel, keySet);
}
- log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable);
if (prevSubSet != null) {
for (MessageQueue messageQueue : prevSubSet) {
if (!keySet.contains(messageQueue)) {
@@ -77,7 +75,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
}
}
}
- log.info("Register push session clientSubscriptionTable: {}", clientSubscriptionTable);
}
@Override
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index 8a81c4c..72f658e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -95,7 +95,7 @@ public class PushServiceImpl implements PushService {
messageExt.setFlag(sendMessageRequestHeader.getFlag());
messageExt.setBody(message);
messageExt.setBodyCRC(UtilAll.crc32(message));
- log.info("MessageExt:{}", messageExt);
+ log.debug("MessageExt:{}", messageExt);
return messageExt;
}
@@ -103,9 +103,9 @@ public class PushServiceImpl implements PushService {
public void run() {
if (!canceled.get()) {
try {
- log.info("sendMessageResponse:{}", sendMessageResponse);
+ log.debug("sendMessageResponse: {}", sendMessageResponse);
SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class);
- log.info("sendMessageResponseHeader:{}", sendMessageResponseHeader);
+ log.debug("sendMessageResponseHeader: {}", sendMessageResponseHeader);
MessageQueue messageQueue = new MessageQueue(sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId());
Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(messageQueue);
if (consumerTable != null) {
@@ -113,6 +113,7 @@ public class PushServiceImpl implements PushService {
pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
pushMessageHeader.setTopic(sendMessageRequestHeader.getTopic());
pushMessageHeader.setQueueId(sendMessageResponseHeader.getQueueId());
+ pushMessageHeader.setEnodeName(sendMessageRequestHeader.getEnodeName());
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
MessageExt messageExt = buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader);
pushMessage.setBody(MessageDecoder.encode(messageExt, false));