You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/19 06:25:41 UTC

[rocketmq] branch snode updated: Push MessageExt directly to consumer

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 3a4e866  Push MessageExt directly to consumer
3a4e866 is described below

commit 3a4e8669aaea6f35dd1c922f17feff57dd38ecb9
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Feb 19 14:25:19 2019 +0800

    Push MessageExt directly to consumer
---
 .../broker/processor/SendMessageProcessor.java     |  14 ++-
 .../org/apache/rocketmq/common/SnodeConfig.java    |  10 ++
 .../apache/rocketmq/common/message/Message.java    |   2 +-
 .../rocketmq/common/message/MessageDecoder.java    |   7 +-
 .../apache/rocketmq/common/message/MessageExt.java |  24 ++--
 .../protocol/header/SendMessageRequestHeader.java  |  23 ++++
 .../header/SendMessageRequestHeaderV2.java         |  27 +++++
 .../protocol/header/SendMessageResponseHeader.java |  54 +++++++++
 .../rocketmq/example/quickstart/Consumer.java      |   2 +-
 .../rocketmq/remoting/common/RemotingUtil.java     |  14 ++-
 .../org/apache/rocketmq/snode/SnodeController.java |   8 --
 .../snode/client/impl/SubscriptionManagerImpl.java |   2 +-
 .../snode/offset/ConsumerOffsetManager.java        | 127 ++++++++++++---------
 .../snode/processor/HeartbeatProcessor.java        |   1 +
 .../snode/processor/SendMessageProcessor.java      |  10 +-
 .../apache/rocketmq/snode/service/PushService.java |  15 +--
 .../snode/service/impl/PushServiceImpl.java        |  78 +++++++++----
 17 files changed, 302 insertions(+), 116 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 6bb378e..3557a17 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,6 +17,9 @@
 package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.ChannelHandlerContext;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -42,8 +45,8 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
@@ -51,10 +54,6 @@ import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-
 public class SendMessageProcessor extends AbstractSendMessageProcessor implements RequestProcessor {
 
     private List<ConsumeMessageHook> consumeMessageHookList;
@@ -443,7 +442,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
             responseHeader.setQueueId(queueIdInt);
             responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
-
+            responseHeader.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
+            responseHeader.setStoreTimestamp(putMessageResult.getAppendMessageResult().getStoreTimestamp());
+            responseHeader.setStoreSize(putMessageResult.getAppendMessageResult().getWroteBytes());
+            responseHeader.setStoreHost(ctx.channel().localAddress().toString());
             doResponse(ctx, request, response);
 
             if (hasSendMessageHook()) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
index e866d9c..abe1a57 100644
--- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
@@ -99,6 +99,7 @@ public class SnodeConfig {
     private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
     private boolean enablePropertyFilter = true;
 
+    private int loadOffsetInterval = 3000;
     /**
      * Acl feature switch
      */
@@ -400,4 +401,13 @@ public class SnodeConfig {
     public void setMetricsEnable(boolean metricsEnable) {
         this.metricsEnable = metricsEnable;
     }
+
+    public int getLoadOffsetInterval() {
+        return loadOffsetInterval;
+    }
+
+    public void setLoadOffsetInterval(int loadOffsetInterval) {
+        this.loadOffsetInterval = loadOffsetInterval;
+    }
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index 287be13..82e65c2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -181,7 +181,7 @@ public class Message implements Serializable {
         return properties;
     }
 
-    void setProperties(Map<String, String> properties) {
+    public void setProperties(Map<String, String> properties) {
         this.properties = properties;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index af0b638..5b6ddca 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -16,9 +16,6 @@
  */
 package org.apache.rocketmq.common.message;
 
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
-
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -29,6 +26,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 
 public class MessageDecoder {
     public final static int MSG_ID_LENGTH = 8 + 8;
@@ -41,7 +40,7 @@ public class MessageDecoder {
     public final static int MESSAGE_MAGIC_CODE = -626843481;
     public static final char NAME_VALUE_SEPARATOR = 1;
     public static final char PROPERTY_SEPARATOR = 2;
-    public static final int PHY_POS_POSITION =  4 + 4 + 4 + 4 + 4 + 8;
+    public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
     public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
         + 4 // 2 MAGICCODE
         + 4 // 3 BODYCRC
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
index 3f77767..69b0a25 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
@@ -215,13 +215,21 @@ public class MessageExt extends Message {
         this.preparedTransactionOffset = preparedTransactionOffset;
     }
 
-    @Override
-    public String toString() {
-        return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
-            + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
-            + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
-            + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
-            + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
-            + ", toString()=" + super.toString() + "]";
+    @Override public String toString() {
+        return "MessageExt{" +
+            "queueId=" + queueId +
+            ", storeSize=" + storeSize +
+            ", queueOffset=" + queueOffset +
+            ", sysFlag=" + sysFlag +
+            ", bornTimestamp=" + bornTimestamp +
+            ", bornHost=" + bornHost +
+            ", storeTimestamp=" + storeTimestamp +
+            ", storeHost=" + storeHost +
+            ", msgId='" + msgId + '\'' +
+            ", commitLogOffset=" + commitLogOffset +
+            ", bodyCRC=" + bodyCRC +
+            ", reconsumeTimes=" + reconsumeTimes +
+            ", preparedTransactionOffset=" + preparedTransactionOffset +
+            '}';
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index bab833b..84ab10d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,6 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import java.net.SocketAddress;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
@@ -55,6 +56,10 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
 
     private String enodeName;
 
+    private SocketAddress bornHost;
+
+    private SocketAddress snodeHost;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -171,6 +176,22 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
         this.enodeName = enodeName;
     }
 
+    public SocketAddress getBornHost() {
+        return bornHost;
+    }
+
+    public void setBornHost(SocketAddress bornHost) {
+        this.bornHost = bornHost;
+    }
+
+    public SocketAddress getSnodeHost() {
+        return snodeHost;
+    }
+
+    public void setSnodeHost(SocketAddress snodeHost) {
+        this.snodeHost = snodeHost;
+    }
+
     @Override public String toString() {
         return "SendMessageRequestHeader{" +
             "producerGroup='" + producerGroup + '\'' +
@@ -187,6 +208,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
             ", batch=" + batch +
             ", maxReconsumeTimes=" + maxReconsumeTimes +
             ", enodeName='" + enodeName + '\'' +
+            ", bornHost=" + bornHost +
+            ", snodeHost=" + snodeHost +
             '}';
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 10dce94..3260c25 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
+import java.net.SocketAddress;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
@@ -56,6 +57,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
 
     private String n; //enode name
 
+    private SocketAddress o; //born host
+
+    private SocketAddress p; //snode host
+
     public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
         SendMessageRequestHeader v1 = new SendMessageRequestHeader();
         v1.setProducerGroup(v2.a);
@@ -72,6 +77,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
         v1.setMaxReconsumeTimes(v2.l);
         v1.setBatch(v2.m);
         v1.setEnodeName(v2.n);
+        v1.setBornHost(v2.getO());
+        v1.setSnodeHost(v2.getP());
         return v1;
     }
 
@@ -91,6 +98,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
         v2.l = v1.getMaxReconsumeTimes();
         v2.m = v1.isBatch();
         v2.n = v1.getEnodeName();
+        v2.o = v1.getBornHost();
+        v2.p = v1.getSnodeHost();
         return v2;
     }
 
@@ -210,6 +219,22 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
         this.n = n;
     }
 
+    public SocketAddress getO() {
+        return o;
+    }
+
+    public void setO(SocketAddress o) {
+        this.o = o;
+    }
+
+    public SocketAddress getP() {
+        return p;
+    }
+
+    public void setP(SocketAddress p) {
+        this.p = p;
+    }
+
     @Override public String toString() {
         return "SendMessageRequestHeaderV2{" +
             "a='" + a + '\'' +
@@ -226,6 +251,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
             ", l=" + l +
             ", m=" + m +
             ", n='" + n + '\'' +
+            ", o=" + o +
+            ", p=" + p +
             '}';
     }
 }
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
index 6834881..a287e3f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
@@ -31,8 +31,17 @@ public class SendMessageResponseHeader implements CommandCustomHeader {
     private Integer queueId;
     @CFNotNull
     private Long queueOffset;
+
     private String transactionId;
 
+    private long storeTimestamp;
+
+    private String storeHost;
+
+    private long commitLogOffset;
+
+    private int storeSize;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -68,4 +77,49 @@ public class SendMessageResponseHeader implements CommandCustomHeader {
     public void setTransactionId(String transactionId) {
         this.transactionId = transactionId;
     }
+
+    public long getStoreTimestamp() {
+        return storeTimestamp;
+    }
+
+    public void setStoreTimestamp(long storeTimestamp) {
+        this.storeTimestamp = storeTimestamp;
+    }
+
+    public long getCommitLogOffset() {
+        return commitLogOffset;
+    }
+
+    public void setCommitLogOffset(long commitLogOffset) {
+        this.commitLogOffset = commitLogOffset;
+    }
+
+    public int getStoreSize() {
+        return storeSize;
+    }
+
+    public void setStoreSize(int storeSize) {
+        this.storeSize = storeSize;
+    }
+
+    public String getStoreHost() {
+        return storeHost;
+    }
+
+    public void setStoreHost(String storeHost) {
+        this.storeHost = storeHost;
+    }
+
+    @Override public String toString() {
+        return "SendMessageResponseHeader{" +
+            "msgId='" + msgId + '\'' +
+            ", queueId=" + queueId +
+            ", queueOffset=" + queueOffset +
+            ", transactionId='" + transactionId + '\'' +
+            ", storeTimestamp=" + storeTimestamp +
+            ", storeHost=" + storeHost +
+            ", commitLogOffset=" + commitLogOffset +
+            ", storeSize=" + storeSize +
+            '}';
+    }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
index c85ed9c..df3b3b0 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -61,7 +61,7 @@ public class Consumer {
         /*
          *  Register callback to execute on arrival of messages fetched from brokers.
          */
-        consumer.setNamesrvAddr("47.102.149.193:9876");
+//        consumer.setNamesrvAddr("47.102.149.193:9876");
 
         consumer.registerMessageListener(new MessageListenerConcurrently() {
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index c053d8f..a118c40 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -31,7 +31,6 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayList;
 import java.util.Enumeration;
-
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
@@ -154,6 +153,19 @@ public class RemotingUtil {
         return isa;
     }
 
+    public static SocketAddress string2SocketAddressWithIp(final String address) {
+        String[] s = address.split(":");
+        try {
+            String ip = s[0].substring(1);
+            InetAddress inetAddress = InetAddress.getByName(ip);
+            InetSocketAddress isa = new InetSocketAddress(inetAddress, Integer.parseInt(s[1]));
+            return isa;
+        } catch (Exception e) {
+            log.error("Failed to obtain address", e);
+        }
+        return null;
+    }
+
     public static String socketAddress2String(final SocketAddress addr) {
         StringBuilder sb = new StringBuilder();
         InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index e3ed0e7..d133b74 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -171,14 +171,6 @@ public class SnodeController {
             "SnodeHeartbeatThread",
             true);
 
-//        this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
-//            snodeConfig.getSnodeSendMessageMinPoolSize(),
-//            snodeConfig.getSnodeSendMessageMaxPoolSize(),
-//            3000,
-//            TimeUnit.MILLISECONDS,
-//            new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
-//            "SnodePullMessageThread",
-//            false);
 
         this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
             snodeConfig.getSnodeSendMessageMinPoolSize(),
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index 72cfd6a..be337da 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -56,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
                         Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
                         clientSet = prev != null ? prev : clientSet;
                     }
-                    log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
+                    log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
                     clientSet.add(remotingChannel);
                 }
             }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index e4ed77e..d9b23b4 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -16,16 +16,13 @@
  */
 package org.apache.rocketmq.snode.offset;
 
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -37,10 +34,10 @@ public class ConsumerOffsetManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
     private static final String TOPIC_GROUP_SEPARATOR = "@";
 
-    private ConcurrentMap<String/* Enode@Topic@Group */, ConcurrentMap<Integer, Long>> offsetTable =
+    private ConcurrentMap<String/* Enode@Topic@Group */, ConcurrentMap<Integer, CacheOffset>> offsetTable =
         new ConcurrentHashMap<>(512);
 
-    private transient SnodeController snodeController;
+    private SnodeController snodeController;
 
     public ConsumerOffsetManager(SnodeController brokerController) {
         this.snodeController = brokerController;
@@ -57,74 +54,63 @@ public class ConsumerOffsetManager {
         return sb.toString();
     }
 
-    private boolean offsetBehindMuchThanData(final String enodeName, final String topic,
-        ConcurrentMap<Integer, Long> table) throws InterruptedException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
-        Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
-        boolean result = !table.isEmpty();
-
-        while (it.hasNext() && result) {
-            Entry<Integer, Long> next = it.next();
-            RemotingCommand remotingCommand = this.snodeController.getEnodeService().getMinOffsetInQueue(enodeName, topic, next.getKey());
-            long minOffsetInStore = 0;
-            if (remotingCommand != null) {
-                switch (remotingCommand.getCode()) {
-                    case ResponseCode.SUCCESS: {
-                        GetMinOffsetResponseHeader responseHeader =
-                            (GetMinOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
-                        minOffsetInStore = responseHeader.getOffset();
-                    }
-                    default:
-                        break;
-                }
-            } else {
-                throw new SnodeException(ResponseCode.QUERY_OFFSET_ERROR, "Query min offset error!");
-            }
-            long offsetInPersist = next.getValue();
-            result = offsetInPersist <= minOffsetInStore;
-        }
-        return result;
-    }
-
     public void cacheOffset(final String enodeName, final String clientHost, final String group, final String topic,
         final int queueId,
         final long offset) {
-        // Topic@group
+        // EnodeName@Topic@group
         String key = buildKey(enodeName, topic, group);
         this.commitOffset(clientHost, key, queueId, offset);
     }
 
     private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
-        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
+        ConcurrentMap<Integer, CacheOffset> map = this.offsetTable.get(key);
+        CacheOffset cacheOffset = new CacheOffset(key, offset, System.currentTimeMillis());
         if (null == map) {
             map = new ConcurrentHashMap<>(32);
-            ConcurrentMap<Integer, Long> prev = this.offsetTable.putIfAbsent(key, map);
+            ConcurrentMap<Integer, CacheOffset> prev = this.offsetTable.putIfAbsent(key, map);
             map = prev != null ? prev : map;
-            map.put(queueId, offset);
+            map.put(queueId, cacheOffset);
         } else {
-            Long storeOffset = map.put(queueId, offset);
-            if (storeOffset != null && offset < storeOffset) {
-                log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}", clientHost, key, queueId, offset, storeOffset);
+            CacheOffset storeOffset = map.put(queueId, cacheOffset);
+            if (storeOffset != null && offset < storeOffset.getOffset()) {
+                log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}",
+                    clientHost, key, queueId, offset, storeOffset);
             }
         }
     }
 
-    public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) {
-        String key = buildKey(enodeName, topic, group);
-        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
-        if (null != map) {
-            Long offset = map.get(queueId);
-            if (offset != null)
-                return offset;
+    private long parserOffset(final String enodeName, final String group, final String topic, final int queueId) {
+        try {
+            RemotingCommand remotingCommand = queryOffset(enodeName, group, topic, queueId);
+            QueryConsumerOffsetResponseHeader responseHeader =
+                (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+            return responseHeader.getOffset();
+        } catch (Exception ex) {
+            log.error("Load offset from broker error", ex);
         }
-
         return -1;
     }
 
-    public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
-        return offsetTable;
-    }
+    public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) {
+        String key = buildKey(enodeName, topic, group);
+        ConcurrentMap<Integer, CacheOffset> map = this.offsetTable.get(key);
+        if (map == null) {
+            map = new ConcurrentHashMap<>();
+            map = this.offsetTable.putIfAbsent(key, map);
+        }
+        CacheOffset cacheOffset = map.get(queueId);
+        if (cacheOffset != null) {
+            if (System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() > snodeController.getSnodeConfig().getLoadOffsetInterval()) {
+                cacheOffset.setOffset(parserOffset(enodeName, group, topic, queueId));
+                cacheOffset.setUpdateTimestamp(System.currentTimeMillis());
+            }
+        } else {
+            cacheOffset = new CacheOffset(key, parserOffset(enodeName, group, topic, queueId), System.currentTimeMillis());
+            map.put(queueId, cacheOffset);
+        }
+        return cacheOffset.getOffset();
 
+    }
 
     public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
         final int queueId,
@@ -139,4 +125,39 @@ public class ConsumerOffsetManager {
         return this.snodeController.getEnodeService().loadOffset(enodeName, group, topic, queueId);
     }
 
+    public class CacheOffset {
+        private String key;
+        private long offset;
+        private long updateTimestamp;
+
+        public CacheOffset(final String key, final long offset, final long updateTimestamp) {
+            this.key = key;
+            this.offset = offset;
+            this.updateTimestamp = updateTimestamp;
+        }
+
+        public String getKey() {
+            return key;
+        }
+
+        public void setKey(String key) {
+            this.key = key;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
+
+        public void setOffset(long offset) {
+            this.offset = offset;
+        }
+
+        public long getUpdateTimestamp() {
+            return updateTimestamp;
+        }
+
+        public void setUpdateTimestamp(long updateTimestamp) {
+            this.updateTimestamp = updateTimestamp;
+        }
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index a36704c..08e342e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -71,6 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor {
 
     private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) {
         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
+        log.info("heartbeatData: {}", heartbeatData);
         Channel channel = null;
         Attribute<Client> clientAttribute = null;
         if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index ff13aea..cadc4d7 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -73,6 +74,7 @@ public class SendMessageProcessor implements RequestProcessor {
             request.getCode() == RequestCode.SEND_BATCH_MESSAGE) {
             sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
             enodeName = sendMessageRequestHeaderV2.getN();
+            sendMessageRequestHeaderV2.setP(remotingChannel.localAddress());
             stringBuffer.append(sendMessageRequestHeaderV2.getB());
         } else {
             isSendBack = true;
@@ -82,9 +84,12 @@ public class SendMessageProcessor implements RequestProcessor {
         }
 
         CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request);
-        final Integer queueId = sendMessageRequestHeaderV2.getE();
+
+        sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress());
         final byte[] message = request.getBody();
         final boolean isNeedPush = !isSendBack;
+        final SendMessageRequestHeader sendMessageRequestHeader =
+            SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(sendMessageRequestHeaderV2);
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
                 if (this.snodeController.getSendMessageInterceptorGroup() != null) {
@@ -94,7 +99,8 @@ public class SendMessageProcessor implements RequestProcessor {
                 remotingChannel.reply(data);
                 this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length);
                 if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
-                    this.snodeController.getPushService().pushMessage(enodeName, stringBuffer.toString(), queueId, message, data);
+                    log.info("Send message response: {}", data);
+                    this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data);
                 }
             } else {
                 this.snodeController.getMetricsService().incRequestCount(request.getCode(), false);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
index ad1cab3..597afee 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
@@ -16,19 +16,20 @@
  */
 package org.apache.rocketmq.snode.service;
 
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public interface PushService {
     /**
-     * TODO how to resolve the slow consumer: close or ignore?
+     * Push message to consumer which subscribed target {@link MessageQueue}
+     * <p>
      *
-     * @param enodeName
-     * @param topic
-     * @param queueId
-     * @param message
-     * @param response
+     * @param requestHeader Send message request header
+     * @param message Message body
+     * @param response Send message response
      */
-    void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
+    void pushMessage(final SendMessageRequestHeader requestHeader, final byte[] message,
         final RemotingCommand response);
 
     void shutdown();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index da6e9e5..37bb9f6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -23,15 +23,20 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
@@ -61,34 +66,58 @@ public class PushServiceImpl implements PushService {
     public class PushTask implements Runnable {
         private AtomicBoolean canceled = new AtomicBoolean(false);
         private final byte[] message;
-        private final Integer queueId;
-        private final String topic;
-        private final RemotingCommand response;
-        private final String enodeName;
+        private final RemotingCommand sendMessageResponse;
+        private final SendMessageRequestHeader sendMessageRequestHeader;
 
-        public PushTask(final String topic, final Integer queueId, final byte[] message,
-            final RemotingCommand response, final String enodeName) {
+        public PushTask(final SendMessageRequestHeader sendMessageRequestHeader, final byte[] message,
+            final RemotingCommand response) {
             this.message = message;
-            this.queueId = queueId;
-            this.topic = topic;
-            this.response = response;
-            this.enodeName = enodeName;
+            this.sendMessageRequestHeader = sendMessageRequestHeader;
+            this.sendMessageResponse = response;
+        }
+
+        private MessageExt buildMessageExt(final SendMessageResponseHeader sendMessageResponseHeader,
+            final byte[] message, final SendMessageRequestHeader sendMessageRequestHeader) {
+            MessageExt messageExt = new MessageExt();
+            messageExt.setProperties(MessageDecoder.string2messageProperties(sendMessageRequestHeader.getProperties()));
+            messageExt.setTopic(sendMessageRequestHeader.getTopic());
+            messageExt.setMsgId(sendMessageResponseHeader.getMsgId());
+            messageExt.setQueueId(sendMessageResponseHeader.getQueueId());
+            messageExt.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
+            messageExt.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes());
+            messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset());
+            messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp());
+            messageExt.setBornHost(sendMessageRequestHeader.getBornHost());
+//            messageExt.setStoreSize(sendMessageResponseHeader.getStoreSize());
+            messageExt.setStoreHost(RemotingUtil.string2SocketAddressWithIp(sendMessageResponseHeader.getStoreHost()));
+            messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp());
+            messageExt.setWaitStoreMsgOK(false);
+            messageExt.setSnodeAddress(sendMessageRequestHeader.getSnodeHost());
+            messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag());
+            messageExt.setFlag(sendMessageRequestHeader.getFlag());
+            messageExt.setBody(message);
+            messageExt.setBodyCRC(UtilAll.crc32(message));
+            log.info("MessageExt:{}", messageExt);
+            return messageExt;
         }
 
         @Override
         public void run() {
             if (!canceled.get()) {
                 try {
-                    SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
-                    PushMessageHeader pushMessageHeader = new PushMessageHeader();
-                    pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
-                    pushMessageHeader.setTopic(topic);
-                    pushMessageHeader.setQueueId(queueId);
-                    RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
-                    pushMessage.setBody(message);
-                    MessageQueue messageQueue = new MessageQueue(topic, enodeName, queueId);
+                    log.info("sendMessageResponse:{}", sendMessageResponse);
+                    SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+                    log.info("sendMessageResponseHeader:{}", sendMessageResponseHeader);
+                    MessageQueue messageQueue = new MessageQueue(sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId());
                     Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(messageQueue);
                     if (consumerTable != null) {
+                        PushMessageHeader pushMessageHeader = new PushMessageHeader();
+                        pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
+                        pushMessageHeader.setTopic(sendMessageRequestHeader.getTopic());
+                        pushMessageHeader.setQueueId(sendMessageResponseHeader.getQueueId());
+                        RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
+                        MessageExt messageExt = buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader);
+                        pushMessage.setBody(MessageDecoder.encode(messageExt, false));
                         for (RemotingChannel remotingChannel : consumerTable) {
                             Client client = null;
                             if (remotingChannel instanceof NettyChannelImpl) {
@@ -101,13 +130,14 @@ public class PushServiceImpl implements PushService {
                             if (client != null) {
                                 for (String consumerGroup : client.getGroups()) {
                                     Subscription subscription = snodeController.getSubscriptionManager().getSubscription(consumerGroup);
-                                    if (subscription.getSubscriptionData(topic) != null) {
-                                        boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, consumerGroup, enodeName);
+                                    if (subscription.getSubscriptionData(sendMessageRequestHeader.getTopic()) != null) {
+                                        boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getQueueId(), consumerGroup, sendMessageRequestHeader.getEnodeName());
                                         if (slowConsumer) {
                                             log.warn("[SlowConsumer]: {} is slow consumer", remotingChannel);
                                             snodeController.getSlowConsumerService().slowConsumerResolve(pushMessage, remotingChannel);
                                             continue;
                                         }
+                                        pushMessageHeader.setConsumerGroup(consumerGroup);
                                         snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
                                     }
                                 }
@@ -119,10 +149,10 @@ public class PushServiceImpl implements PushService {
                         log.info("No online registered as push consumer and online for messageQueue: {} ", messageQueue);
                     }
                 } catch (Exception ex) {
-                    log.warn("Push message to topic: {} queueId: {}", topic, queueId, ex);
+                    log.warn("Push message to topic: {} queueId: {}", sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getQueueId(), ex);
                 }
             } else {
-                log.info("Push message to topic: {} queueId: {} canceled!", topic, queueId);
+                log.info("Push message to topic: {} queueId: {} canceled!", sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getQueueId());
             }
         }
 
@@ -133,9 +163,9 @@ public class PushServiceImpl implements PushService {
     }
 
     @Override
-    public void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
+    public void pushMessage(final SendMessageRequestHeader requestHeader, final byte[] message,
         final RemotingCommand response) {
-        PushTask pushTask = new PushTask(topic, queueId, message, response, enodeName);
+        PushTask pushTask = new PushTask(requestHeader, message, response);
         pushMessageExecutorService.submit(pushTask);
     }