You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/19 06:25:41 UTC
[rocketmq] branch snode updated: Push MessageExt directly to consumer
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 3a4e866 Push MessageExt directly to consumer
3a4e866 is described below
commit 3a4e8669aaea6f35dd1c922f17feff57dd38ecb9
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Feb 19 14:25:19 2019 +0800
Push MessageExt directly to consumer
---
.../broker/processor/SendMessageProcessor.java | 14 ++-
.../org/apache/rocketmq/common/SnodeConfig.java | 10 ++
.../apache/rocketmq/common/message/Message.java | 2 +-
.../rocketmq/common/message/MessageDecoder.java | 7 +-
.../apache/rocketmq/common/message/MessageExt.java | 24 ++--
.../protocol/header/SendMessageRequestHeader.java | 23 ++++
.../header/SendMessageRequestHeaderV2.java | 27 +++++
.../protocol/header/SendMessageResponseHeader.java | 54 +++++++++
.../rocketmq/example/quickstart/Consumer.java | 2 +-
.../rocketmq/remoting/common/RemotingUtil.java | 14 ++-
.../org/apache/rocketmq/snode/SnodeController.java | 8 --
.../snode/client/impl/SubscriptionManagerImpl.java | 2 +-
.../snode/offset/ConsumerOffsetManager.java | 127 ++++++++++++---------
.../snode/processor/HeartbeatProcessor.java | 1 +
.../snode/processor/SendMessageProcessor.java | 10 +-
.../apache/rocketmq/snode/service/PushService.java | 15 +--
.../snode/service/impl/PushServiceImpl.java | 78 +++++++++----
17 files changed, 302 insertions(+), 116 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 6bb378e..3557a17 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -42,8 +45,8 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
@@ -51,10 +54,6 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-
public class SendMessageProcessor extends AbstractSendMessageProcessor implements RequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
@@ -443,7 +442,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
-
+ responseHeader.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
+ responseHeader.setStoreTimestamp(putMessageResult.getAppendMessageResult().getStoreTimestamp());
+ responseHeader.setStoreSize(putMessageResult.getAppendMessageResult().getWroteBytes());
+ responseHeader.setStoreHost(ctx.channel().localAddress().toString());
doResponse(ctx, request, response);
if (hasSendMessageHook()) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
index e866d9c..abe1a57 100644
--- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
@@ -99,6 +99,7 @@ public class SnodeConfig {
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean enablePropertyFilter = true;
+ private int loadOffsetInterval = 3000;
/**
* Acl feature switch
*/
@@ -400,4 +401,13 @@ public class SnodeConfig {
public void setMetricsEnable(boolean metricsEnable) {
this.metricsEnable = metricsEnable;
}
+
+ public int getLoadOffsetInterval() {
+ return loadOffsetInterval;
+ }
+
+ public void setLoadOffsetInterval(int loadOffsetInterval) {
+ this.loadOffsetInterval = loadOffsetInterval;
+ }
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index 287be13..82e65c2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -181,7 +181,7 @@ public class Message implements Serializable {
return properties;
}
- void setProperties(Map<String, String> properties) {
+ public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index af0b638..5b6ddca 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -16,9 +16,6 @@
*/
package org.apache.rocketmq.common.message;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
-
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -29,6 +26,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
public class MessageDecoder {
public final static int MSG_ID_LENGTH = 8 + 8;
@@ -41,7 +40,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
- public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
+ public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
index 3f77767..69b0a25 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
@@ -215,13 +215,21 @@ public class MessageExt extends Message {
this.preparedTransactionOffset = preparedTransactionOffset;
}
- @Override
- public String toString() {
- return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
- + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
- + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
- + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
- + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
- + ", toString()=" + super.toString() + "]";
+ @Override public String toString() {
+ return "MessageExt{" +
+ "queueId=" + queueId +
+ ", storeSize=" + storeSize +
+ ", queueOffset=" + queueOffset +
+ ", sysFlag=" + sysFlag +
+ ", bornTimestamp=" + bornTimestamp +
+ ", bornHost=" + bornHost +
+ ", storeTimestamp=" + storeTimestamp +
+ ", storeHost=" + storeHost +
+ ", msgId='" + msgId + '\'' +
+ ", commitLogOffset=" + commitLogOffset +
+ ", bodyCRC=" + bodyCRC +
+ ", reconsumeTimes=" + reconsumeTimes +
+ ", preparedTransactionOffset=" + preparedTransactionOffset +
+ '}';
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index bab833b..84ab10d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,6 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
+import java.net.SocketAddress;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
@@ -55,6 +56,10 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private String enodeName;
+ private SocketAddress bornHost;
+
+ private SocketAddress snodeHost;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -171,6 +176,22 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
this.enodeName = enodeName;
}
+ public SocketAddress getBornHost() {
+ return bornHost;
+ }
+
+ public void setBornHost(SocketAddress bornHost) {
+ this.bornHost = bornHost;
+ }
+
+ public SocketAddress getSnodeHost() {
+ return snodeHost;
+ }
+
+ public void setSnodeHost(SocketAddress snodeHost) {
+ this.snodeHost = snodeHost;
+ }
+
@Override public String toString() {
return "SendMessageRequestHeader{" +
"producerGroup='" + producerGroup + '\'' +
@@ -187,6 +208,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
", batch=" + batch +
", maxReconsumeTimes=" + maxReconsumeTimes +
", enodeName='" + enodeName + '\'' +
+ ", bornHost=" + bornHost +
+ ", snodeHost=" + snodeHost +
'}';
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 10dce94..3260c25 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.common.protocol.header;
+import java.net.SocketAddress;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
@@ -56,6 +57,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
private String n; //enode name
+ private SocketAddress o; //born host
+
+ private SocketAddress p; //snode host
+
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
v1.setProducerGroup(v2.a);
@@ -72,6 +77,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m);
v1.setEnodeName(v2.n);
+ v1.setBornHost(v2.getO());
+ v1.setSnodeHost(v2.getP());
return v1;
}
@@ -91,6 +98,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch();
v2.n = v1.getEnodeName();
+ v2.o = v1.getBornHost();
+ v2.p = v1.getSnodeHost();
return v2;
}
@@ -210,6 +219,22 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
this.n = n;
}
+ public SocketAddress getO() {
+ return o;
+ }
+
+ public void setO(SocketAddress o) {
+ this.o = o;
+ }
+
+ public SocketAddress getP() {
+ return p;
+ }
+
+ public void setP(SocketAddress p) {
+ this.p = p;
+ }
+
@Override public String toString() {
return "SendMessageRequestHeaderV2{" +
"a='" + a + '\'' +
@@ -226,6 +251,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
", l=" + l +
", m=" + m +
", n='" + n + '\'' +
+ ", o=" + o +
+ ", p=" + p +
'}';
}
}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
index 6834881..a287e3f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
@@ -31,8 +31,17 @@ public class SendMessageResponseHeader implements CommandCustomHeader {
private Integer queueId;
@CFNotNull
private Long queueOffset;
+
private String transactionId;
+ private long storeTimestamp;
+
+ private String storeHost;
+
+ private long commitLogOffset;
+
+ private int storeSize;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -68,4 +77,49 @@ public class SendMessageResponseHeader implements CommandCustomHeader {
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
+
+ public long getStoreTimestamp() {
+ return storeTimestamp;
+ }
+
+ public void setStoreTimestamp(long storeTimestamp) {
+ this.storeTimestamp = storeTimestamp;
+ }
+
+ public long getCommitLogOffset() {
+ return commitLogOffset;
+ }
+
+ public void setCommitLogOffset(long commitLogOffset) {
+ this.commitLogOffset = commitLogOffset;
+ }
+
+ public int getStoreSize() {
+ return storeSize;
+ }
+
+ public void setStoreSize(int storeSize) {
+ this.storeSize = storeSize;
+ }
+
+ public String getStoreHost() {
+ return storeHost;
+ }
+
+ public void setStoreHost(String storeHost) {
+ this.storeHost = storeHost;
+ }
+
+ @Override public String toString() {
+ return "SendMessageResponseHeader{" +
+ "msgId='" + msgId + '\'' +
+ ", queueId=" + queueId +
+ ", queueOffset=" + queueOffset +
+ ", transactionId='" + transactionId + '\'' +
+ ", storeTimestamp=" + storeTimestamp +
+ ", storeHost=" + storeHost +
+ ", commitLogOffset=" + commitLogOffset +
+ ", storeSize=" + storeSize +
+ '}';
+ }
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
index c85ed9c..df3b3b0 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -61,7 +61,7 @@ public class Consumer {
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
- consumer.setNamesrvAddr("47.102.149.193:9876");
+// consumer.setNamesrvAddr("47.102.149.193:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index c053d8f..a118c40 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -31,7 +31,6 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Enumeration;
-
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -154,6 +153,19 @@ public class RemotingUtil {
return isa;
}
+ public static SocketAddress string2SocketAddressWithIp(final String address) {
+ String[] s = address.split(":");
+ try {
+ String ip = s[0].substring(1);
+ InetAddress inetAddress = InetAddress.getByName(ip);
+ InetSocketAddress isa = new InetSocketAddress(inetAddress, Integer.parseInt(s[1]));
+ return isa;
+ } catch (Exception e) {
+ log.error("Failed to obtain address", e);
+ }
+ return null;
+ }
+
public static String socketAddress2String(final SocketAddress addr) {
StringBuilder sb = new StringBuilder();
InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index e3ed0e7..d133b74 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -171,14 +171,6 @@ public class SnodeController {
"SnodeHeartbeatThread",
true);
-// this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
-// snodeConfig.getSnodeSendMessageMinPoolSize(),
-// snodeConfig.getSnodeSendMessageMaxPoolSize(),
-// 3000,
-// TimeUnit.MILLISECONDS,
-// new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
-// "SnodePullMessageThread",
-// false);
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index 72cfd6a..be337da 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -56,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet;
}
- log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
+ log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel);
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index e4ed77e..d9b23b4 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -16,16 +16,13 @@
*/
package org.apache.rocketmq.snode.offset;
-import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -37,10 +34,10 @@ public class ConsumerOffsetManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
- private ConcurrentMap<String/* Enode@Topic@Group */, ConcurrentMap<Integer, Long>> offsetTable =
+ private ConcurrentMap<String/* Enode@Topic@Group */, ConcurrentMap<Integer, CacheOffset>> offsetTable =
new ConcurrentHashMap<>(512);
- private transient SnodeController snodeController;
+ private SnodeController snodeController;
public ConsumerOffsetManager(SnodeController brokerController) {
this.snodeController = brokerController;
@@ -57,74 +54,63 @@ public class ConsumerOffsetManager {
return sb.toString();
}
- private boolean offsetBehindMuchThanData(final String enodeName, final String topic,
- ConcurrentMap<Integer, Long> table) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
- Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
- boolean result = !table.isEmpty();
-
- while (it.hasNext() && result) {
- Entry<Integer, Long> next = it.next();
- RemotingCommand remotingCommand = this.snodeController.getEnodeService().getMinOffsetInQueue(enodeName, topic, next.getKey());
- long minOffsetInStore = 0;
- if (remotingCommand != null) {
- switch (remotingCommand.getCode()) {
- case ResponseCode.SUCCESS: {
- GetMinOffsetResponseHeader responseHeader =
- (GetMinOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
- minOffsetInStore = responseHeader.getOffset();
- }
- default:
- break;
- }
- } else {
- throw new SnodeException(ResponseCode.QUERY_OFFSET_ERROR, "Query min offset error!");
- }
- long offsetInPersist = next.getValue();
- result = offsetInPersist <= minOffsetInStore;
- }
- return result;
- }
-
public void cacheOffset(final String enodeName, final String clientHost, final String group, final String topic,
final int queueId,
final long offset) {
- // Topic@group
+ // EnodeName@Topic@group
String key = buildKey(enodeName, topic, group);
this.commitOffset(clientHost, key, queueId, offset);
}
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
- ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
+ ConcurrentMap<Integer, CacheOffset> map = this.offsetTable.get(key);
+ CacheOffset cacheOffset = new CacheOffset(key, offset, System.currentTimeMillis());
if (null == map) {
map = new ConcurrentHashMap<>(32);
- ConcurrentMap<Integer, Long> prev = this.offsetTable.putIfAbsent(key, map);
+ ConcurrentMap<Integer, CacheOffset> prev = this.offsetTable.putIfAbsent(key, map);
map = prev != null ? prev : map;
- map.put(queueId, offset);
+ map.put(queueId, cacheOffset);
} else {
- Long storeOffset = map.put(queueId, offset);
- if (storeOffset != null && offset < storeOffset) {
- log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}", clientHost, key, queueId, offset, storeOffset);
+ CacheOffset storeOffset = map.put(queueId, cacheOffset);
+ if (storeOffset != null && offset < storeOffset.getOffset()) {
+ log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}",
+ clientHost, key, queueId, offset, storeOffset);
}
}
}
- public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) {
- String key = buildKey(enodeName, topic, group);
- ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
- if (null != map) {
- Long offset = map.get(queueId);
- if (offset != null)
- return offset;
+ private long parserOffset(final String enodeName, final String group, final String topic, final int queueId) {
+ try {
+ RemotingCommand remotingCommand = queryOffset(enodeName, group, topic, queueId);
+ QueryConsumerOffsetResponseHeader responseHeader =
+ (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+ return responseHeader.getOffset();
+ } catch (Exception ex) {
+ log.error("Load offset from broker error", ex);
}
-
return -1;
}
- public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
- return offsetTable;
- }
+ public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) {
+ String key = buildKey(enodeName, topic, group);
+ ConcurrentMap<Integer, CacheOffset> map = this.offsetTable.get(key);
+ if (map == null) {
+ map = new ConcurrentHashMap<>();
+ map = this.offsetTable.putIfAbsent(key, map);
+ }
+ CacheOffset cacheOffset = map.get(queueId);
+ if (cacheOffset != null) {
+ if (System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() > snodeController.getSnodeConfig().getLoadOffsetInterval()) {
+ cacheOffset.setOffset(parserOffset(enodeName, group, topic, queueId));
+ cacheOffset.setUpdateTimestamp(System.currentTimeMillis());
+ }
+ } else {
+ cacheOffset = new CacheOffset(key, parserOffset(enodeName, group, topic, queueId), System.currentTimeMillis());
+ map.put(queueId, cacheOffset);
+ }
+ return cacheOffset.getOffset();
+ }
public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
final int queueId,
@@ -139,4 +125,39 @@ public class ConsumerOffsetManager {
return this.snodeController.getEnodeService().loadOffset(enodeName, group, topic, queueId);
}
+ public class CacheOffset {
+ private String key;
+ private long offset;
+ private long updateTimestamp;
+
+ public CacheOffset(final String key, final long offset, final long updateTimestamp) {
+ this.key = key;
+ this.offset = offset;
+ this.updateTimestamp = updateTimestamp;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public long getUpdateTimestamp() {
+ return updateTimestamp;
+ }
+
+ public void setUpdateTimestamp(long updateTimestamp) {
+ this.updateTimestamp = updateTimestamp;
+ }
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index a36704c..08e342e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -71,6 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor {
private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
+ log.info("heartbeatData: {}", heartbeatData);
Channel channel = null;
Attribute<Client> clientAttribute = null;
if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index ff13aea..cadc4d7 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -73,6 +74,7 @@ public class SendMessageProcessor implements RequestProcessor {
request.getCode() == RequestCode.SEND_BATCH_MESSAGE) {
sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
enodeName = sendMessageRequestHeaderV2.getN();
+ sendMessageRequestHeaderV2.setP(remotingChannel.localAddress());
stringBuffer.append(sendMessageRequestHeaderV2.getB());
} else {
isSendBack = true;
@@ -82,9 +84,12 @@ public class SendMessageProcessor implements RequestProcessor {
}
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request);
- final Integer queueId = sendMessageRequestHeaderV2.getE();
+
+ sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress());
final byte[] message = request.getBody();
final boolean isNeedPush = !isSendBack;
+ final SendMessageRequestHeader sendMessageRequestHeader =
+ SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(sendMessageRequestHeaderV2);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
@@ -94,7 +99,8 @@ public class SendMessageProcessor implements RequestProcessor {
remotingChannel.reply(data);
this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length);
if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
- this.snodeController.getPushService().pushMessage(enodeName, stringBuffer.toString(), queueId, message, data);
+ log.info("Send message response: {}", data);
+ this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data);
}
} else {
this.snodeController.getMetricsService().incRequestCount(request.getCode(), false);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
index ad1cab3..597afee 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
@@ -16,19 +16,20 @@
*/
package org.apache.rocketmq.snode.service;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface PushService {
/**
- * TODO how to resolve the slow consumer: close or ignore?
+ * Push message to consumer which subscribed target {@link MessageQueue}
+ * <p>
*
- * @param enodeName
- * @param topic
- * @param queueId
- * @param message
- * @param response
+ * @param requestHeader Send message request header
+ * @param message Message body
+ * @param response Send message response
*/
- void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
+ void pushMessage(final SendMessageRequestHeader requestHeader, final byte[] message,
final RemotingCommand response);
void shutdown();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index da6e9e5..37bb9f6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -23,15 +23,20 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
@@ -61,34 +66,58 @@ public class PushServiceImpl implements PushService {
public class PushTask implements Runnable {
private AtomicBoolean canceled = new AtomicBoolean(false);
private final byte[] message;
- private final Integer queueId;
- private final String topic;
- private final RemotingCommand response;
- private final String enodeName;
+ private final RemotingCommand sendMessageResponse;
+ private final SendMessageRequestHeader sendMessageRequestHeader;
- public PushTask(final String topic, final Integer queueId, final byte[] message,
- final RemotingCommand response, final String enodeName) {
+ public PushTask(final SendMessageRequestHeader sendMessageRequestHeader, final byte[] message,
+ final RemotingCommand response) {
this.message = message;
- this.queueId = queueId;
- this.topic = topic;
- this.response = response;
- this.enodeName = enodeName;
+ this.sendMessageRequestHeader = sendMessageRequestHeader; // run() reads topic, queueId and enodeName from this header; they were separate constructor arguments before
+ this.sendMessageResponse = response; // its SendMessageResponseHeader is decoded later, in run()
+ }
+
+ /** Builds the MessageExt that is encoded into the push body, combining the send request header, the send response header and the raw payload. */
+ private MessageExt buildMessageExt(final SendMessageResponseHeader sendMessageResponseHeader, // supplies msgId, queueId, offsets and store host/timestamp
+ final byte[] message, final SendMessageRequestHeader sendMessageRequestHeader) { // payload bytes; header supplies topic, properties, born and flag fields
+ MessageExt messageExt = new MessageExt();
+ messageExt.setProperties(MessageDecoder.string2messageProperties(sendMessageRequestHeader.getProperties()));
+ messageExt.setTopic(sendMessageRequestHeader.getTopic());
+ messageExt.setMsgId(sendMessageResponseHeader.getMsgId());
+ messageExt.setQueueId(sendMessageResponseHeader.getQueueId()); // taken from the response header, not from the request header
+ messageExt.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
+ messageExt.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes());
+ messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset());
+ messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp());
+ messageExt.setBornHost(sendMessageRequestHeader.getBornHost());
+ messageExt.setStoreHost(RemotingUtil.string2SocketAddressWithIp(sendMessageResponseHeader.getStoreHost())); // NOTE(review): storeSize is never populated here - confirm consumers do not need it
+ messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp());
+ messageExt.setWaitStoreMsgOK(false);
+ messageExt.setSnodeAddress(sendMessageRequestHeader.getSnodeHost());
+ messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag());
+ messageExt.setFlag(sendMessageRequestHeader.getFlag());
+ messageExt.setBody(message);
+ messageExt.setBodyCRC(UtilAll.crc32(message)); // CRC is computed over the payload bytes being pushed
+ log.debug("MessageExt:{}", messageExt); // debug, not info: this method runs once per pushed message
+ return messageExt;
}
@Override
public void run() {
if (!canceled.get()) {
try {
- SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
- PushMessageHeader pushMessageHeader = new PushMessageHeader();
- pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
- pushMessageHeader.setTopic(topic);
- pushMessageHeader.setQueueId(queueId);
- RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
- pushMessage.setBody(message);
- MessageQueue messageQueue = new MessageQueue(topic, enodeName, queueId);
+ log.info("sendMessageResponse:{}", sendMessageResponse);
+ SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+ log.info("sendMessageResponseHeader:{}", sendMessageResponseHeader);
+ MessageQueue messageQueue = new MessageQueue(sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId());
Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(messageQueue);
if (consumerTable != null) {
+ PushMessageHeader pushMessageHeader = new PushMessageHeader();
+ pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
+ pushMessageHeader.setTopic(sendMessageRequestHeader.getTopic());
+ pushMessageHeader.setQueueId(sendMessageResponseHeader.getQueueId());
+ RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
+ MessageExt messageExt = buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader);
+ pushMessage.setBody(MessageDecoder.encode(messageExt, false));
for (RemotingChannel remotingChannel : consumerTable) {
Client client = null;
if (remotingChannel instanceof NettyChannelImpl) {
@@ -101,13 +130,14 @@ public class PushServiceImpl implements PushService {
if (client != null) {
for (String consumerGroup : client.getGroups()) {
Subscription subscription = snodeController.getSubscriptionManager().getSubscription(consumerGroup);
- if (subscription.getSubscriptionData(topic) != null) {
- boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, consumerGroup, enodeName);
+ if (subscription.getSubscriptionData(sendMessageRequestHeader.getTopic()) != null) {
+ boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getQueueId(), consumerGroup, sendMessageRequestHeader.getEnodeName());
if (slowConsumer) {
log.warn("[SlowConsumer]: {} is slow consumer", remotingChannel);
snodeController.getSlowConsumerService().slowConsumerResolve(pushMessage, remotingChannel);
continue;
}
+ pushMessageHeader.setConsumerGroup(consumerGroup);
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
}
@@ -119,10 +149,10 @@ public class PushServiceImpl implements PushService {
log.info("No online registered as push consumer and online for messageQueue: {} ", messageQueue);
}
} catch (Exception ex) {
- log.warn("Push message to topic: {} queueId: {}", topic, queueId, ex);
+ log.warn("Push message to topic: {} queueId: {}", sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getQueueId(), ex);
}
} else {
- log.info("Push message to topic: {} queueId: {} canceled!", topic, queueId);
+ log.info("Push message to topic: {} queueId: {} canceled!", sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getQueueId());
}
}
@@ -133,9 +163,9 @@ public class PushServiceImpl implements PushService {
}
@Override
- public void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
+ public void pushMessage(final SendMessageRequestHeader requestHeader, final byte[] message,
final RemotingCommand response) {
- PushTask pushTask = new PushTask(topic, queueId, message, response, enodeName);
+ PushTask pushTask = new PushTask(requestHeader, message, response); // submitted to pushMessageExecutorService on the next line; the value returned by submit is discarded
pushMessageExecutorService.submit(pushTask);
}