You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/19 09:30:47 UTC
[rocketmq] branch snode updated: Polish push message process in client(add broker name)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 3c264c8 Polish push message process in client(add broker name)
3c264c8 is described below
commit 3c264c8f2cf9c9672a26e5b9967dcd10f2f23fba
Author: ShannonDing <li...@163.com>
AuthorDate: Tue Feb 19 17:30:24 2019 +0800
Polish push message process in client(add broker name)
---
.../client/impl/ClientRemotingProcessor.java | 2 +-
.../impl/consumer/DefaultMQPushConsumerImpl.java | 36 +++++++++++++---------
.../client/impl/factory/MQClientInstance.java | 6 +++-
3 files changed, 28 insertions(+), 16 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index c8c3919..f3cfff7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -218,12 +218,12 @@ public class ClientRemotingProcessor implements RequestProcessor {
final PushMessageHeader requestHeader =
(PushMessageHeader) request
.decodeCommandCustomHeader(PushMessageHeader.class);
-
final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
boolean result =
this.mqClientFactory.processSnodePushMessage(msg,
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
+ requestHeader.getEnodeName(),
requestHeader.getQueueId(),
requestHeader.getQueueOffset());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 000d569..f5251f5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -308,12 +308,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//Update local offset according remote offset
- String localOffsetKey = pullRequest.getConsumerGroup()
- + "@" + pullRequest.getMessageQueue().getTopic()
- + "@" + pullRequest.getMessageQueue().getQueueId();
+ String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
+ pullRequest.getMessageQueue().getTopic(),
+ pullRequest.getMessageQueue().getBrokerName(),
+ pullRequest.getMessageQueue().getQueueId());
AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
- localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
+ localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1));
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
@@ -474,9 +475,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
- String localOffsetKey = pullRequest.getConsumerGroup()
- + "@" + pullRequest.getMessageQueue().getTopic()
- + "@" + pullRequest.getMessageQueue().getQueueId();
+ String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
+ pullRequest.getMessageQueue().getTopic(),
+ pullRequest.getMessageQueue().getBrokerName(),
+ pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
@@ -498,9 +500,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
- String localOffsetKey = pullRequest.getConsumerGroup()
- + "@" + pullRequest.getMessageQueue().getTopic()
- + "@" + pullRequest.getMessageQueue().getQueueId();
+ String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
+ pullRequest.getMessageQueue().getTopic(),
+ pullRequest.getMessageQueue().getBrokerName(),
+ pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
@@ -1197,21 +1200,23 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public boolean processPushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
+ final String brokerName,
final int queueID,
final long offset) {
- String localOffsetKey = consumerGroup + "@" + topic + "@" + queueID;
+ String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
log.info("Current Local offset have not set, initiallized to -1.");
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
- if (localOffset.get() < offset) {
+ if (localOffset.get() + 1 < offset) {
//should start pull message process
log.debug("Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
+ log.debug("Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
@@ -1233,10 +1238,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList);
- MessageQueue messageQueue = new MessageQueue(topic, "", queueID);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
- log.info(".......submitConsumeRequest:{},Offset:{}...", localOffsetKey, offset);
}
return true;
}
+
+ private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
+ return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 4a81c3b..0953fdc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1376,15 +1376,19 @@ public class MQClientInstance {
public ClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
+
public boolean processSnodePushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
+ final String brokerName,
final int queueID,
final long offset) {
+ log.debug("Recieve:processSnodePushMessage :{}-{}-{}-{}-{}",
+ consumerGroup, topic, brokerName, queueID, offset);
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
- consumer.processPushMessage(msg,consumerGroup,topic,queueID,offset);
+ consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset);
return true;
}