Posted to commits@rocketmq.apache.org by "zk-drizzle (via GitHub)" <gi...@apache.org> on 2023/05/09 09:50:42 UTC

[GitHub] [rocketmq] zk-drizzle opened a new pull request, #6724: [RIP-64] Heartbeat Optimization

zk-drizzle opened a new pull request, #6724:
URL: https://github.com/apache/rocketmq/pull/6724

   <!-- Please make sure the target branch is right. In most case, the target branch should be `develop`. -->
   
   ### Which Issue(s) This PR Fixes
   
   <!-- Please ensure that the related issue has already been created, and [link this pull request to that issue using keywords](<https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword>) to ensure automatic closure. -->
   
   Fixes #6720 
   
   ### Brief Description
   
   <!-- Write a brief description for your pull request to help the maintainer understand the reasons behind your changes. -->
   
   ### How Did You Test This Change?
   
   <!-- In order to ensure the code quality of Apache RocketMQ, we expect every pull request to have undergone thorough testing. -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] lizhimins commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "lizhimins (via GitHub)" <gi...@apache.org>.
lizhimins commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1227820946


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java:
##########
@@ -138,6 +144,63 @@ public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand requ
         }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
+        response.addExtField(MixAll.IS_SUPPORT_HEART_BEAT_V2, Boolean.TRUE.toString());
+        response.addExtField(MixAll.IS_SUB_CHANGE, Boolean.TRUE.toString());
+        return response;
+    }
+
+    private RemotingCommand heartBeatV2(ChannelHandlerContext ctx, HeartbeatData heartbeatData, ClientChannelInfo clientChannelInfo, RemotingCommand response) {
+        boolean isSubChange = false;
+        for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
+            //Reject the PullConsumer
+            if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) {
+                if (ConsumeType.CONSUME_ACTIVELY == consumerData.getConsumeType()) {
+                    continue;
+                }
+            }
+            if (null != consumerGroupHeartbeatTable.get(consumerData.getGroupName()) && consumerGroupHeartbeatTable.get(consumerData.getGroupName()) != heartbeatData.getHeartbeatFingerprint()) {

Review Comment:
   Could use StringUtils.equals()
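The condition under review reads the same map entry twice and compares it with an `int` fingerprint using `!=`. A minimal sketch of a single-lookup, null-safe comparison follows. It assumes the table stores `Integer` values (the diff does not show its declaration) and uses `java.util.Objects.equals`, since `StringUtils.equals` in Apache Commons Lang operates on character sequences. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration; not part of the PR.
final class FingerprintComparison {
    // Assumed shape of the broker-side table: consumer group -> last seen fingerprint.
    private final Map<String, Integer> groupFingerprints = new ConcurrentHashMap<>();

    void record(String group, int fingerprint) {
        groupFingerprints.put(group, fingerprint);
    }

    // True only when a fingerprint is already recorded and differs from the incoming one.
    boolean hasChanged(String group, int incoming) {
        Integer known = groupFingerprints.get(group); // one lookup instead of two
        return known != null && !Objects.equals(known, incoming);
    }
}
```

Reading the entry once also avoids the case where another thread changes the entry between the null check and the comparison.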





[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1226155065


##########
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java:
##########
@@ -563,6 +589,83 @@ private void sendHeartbeatToAllBroker() {
         }
     }
 
+    private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
+        final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false);
+        final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty();
+        final boolean consumerEmpty = heartbeatDataWithSub.getConsumerDataSet().isEmpty();
+        if (producerEmpty && consumerEmpty) {
+            log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
+            return;
+        }
+        if (this.brokerAddrTable.isEmpty()) {
+            return;
+        }
+        if (isRebalance) {
+            resetBrokerAddrHeartbeatFingerprintMap();
+        }
+        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+        int currentHeartbeatFingerprint = heartbeatDataWithSub.computeHeartbeatFingerprint();
+        heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+        HeartbeatData heartbeatDataWithoutSub = this.prepareHeartbeatData(true);
+        heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+
+        for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
+            String brokerName = brokerClusterInfo.getKey();
+            HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
+            if (oneTable == null) {
+                continue;
+            }
+            for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
+                Long id = singleBrokerInstance.getKey();
+                String addr = singleBrokerInstance.getValue();
+                if (addr == null) {
+                    continue;
+                }
+                if (consumerEmpty && MixAll.MASTER_ID != id) {
+                    continue;
+                }
+                try {
+                    int version = 0;
+                    boolean isBrokerSupportV2 = brokerSupportV2HeartbeatSet.contains(addr);
+                    HeartbeatV2Result heartbeatV2Result = null;
+                    if (isBrokerSupportV2 && null != brokerAddrHeartbeatFingerprintTable.get(addr) && brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
+                        heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, 3000);
+                        if (heartbeatV2Result.isSubChange()) {
+                            brokerAddrHeartbeatFingerprintTable.remove(addr);
+                        }
+                        log.info("sendHeartbeatToAllBrokerV2 simple brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
+                    } else {
+                        heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub, 3000);

Review Comment:
   ![Uploading image.png…]()
   





[GitHub] [rocketmq] codecov-commenter commented on pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#issuecomment-1584598667

   ## [Codecov](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#6724](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (493e8df) into [develop](https://app.codecov.io/gh/apache/rocketmq/commit/eef581b464d0144a3ec400a20087196f7eefd764?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (eef581b) will **decrease** coverage by `0.50%`.
   > The diff coverage is `28.94%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #6724      +/-   ##
   =============================================
   - Coverage      42.93%   42.44%   -0.50%     
   - Complexity      8995     9134     +139     
   =============================================
     Files           1104     1126      +22     
     Lines          78380    80442    +2062     
     Branches       10207    10493     +286     
   =============================================
   + Hits           33654    34140     +486     
   - Misses         40512    42006    +1494     
   - Partials        4214     4296      +82     
   ```
   
   
   | [Impacted Files](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...g/apache/rocketmq/client/impl/MQClientAPIImpl.java](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudEFQSUltcGwuamF2YQ==) | `22.56% <0.00%> (-0.82%)` | :arrow_down: |
   | [...c/main/java/org/apache/rocketmq/common/MixAll.java](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vTWl4QWxsLmphdmE=) | `38.98% <ø> (-2.46%)` | :arrow_down: |
   | [...he/rocketmq/remoting/common/HeartbeatV2Result.java](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL2NvbW1vbi9IZWFydGJlYXRWMlJlc3VsdC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...tmq/remoting/protocol/heartbeat/HeartbeatData.java](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL3Byb3RvY29sL2hlYXJ0YmVhdC9IZWFydGJlYXREYXRhLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...rocketmq/client/impl/factory/MQClientInstance.java](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9mYWN0b3J5L01RQ2xpZW50SW5zdGFuY2UuamF2YQ==) | `44.72% <20.25%> (-3.23%)` | :arrow_down: |
   | [...apache/rocketmq/broker/client/ConsumerManager.java](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvY2xpZW50L0NvbnN1bWVyTWFuYWdlci5qYXZh) | `79.21% <41.66%> (-1.51%)` | :arrow_down: |
   | [...cketmq/broker/processor/ClientManageProcessor.java](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL0NsaWVudE1hbmFnZVByb2Nlc3Nvci5qYXZh) | `44.21% <61.70%> (+17.21%)` | :arrow_up: |
   | [.../java/org/apache/rocketmq/client/ClientConfig.java](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvQ2xpZW50Q29uZmlnLmphdmE=) | `63.48% <66.66%> (+0.11%)` | :arrow_up: |
   | [...cketmq/client/impl/consumer/RebalancePushImpl.java](https://app.codecov.io/gh/apache/rocketmq/pull/6724?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9SZWJhbGFuY2VQdXNoSW1wbC5qYXZh) | `49.18% <100.00%> (ø)` | |
   
   ... and [106 files with indirect coverage changes](https://app.codecov.io/gh/apache/rocketmq/pull/6724/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1224879597


##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/HeartbeatData.java:
##########
@@ -53,9 +56,38 @@ public void setConsumerDataSet(Set<ConsumerData> consumerDataSet) {
         this.consumerDataSet = consumerDataSet;
     }
 
+    public int getHeartbeatFingerprint() {
+        return heartbeatFingerprint;
+    }
+
+    public void setHeartbeatFingerprint(int heartbeatFingerprint) {
+        this.heartbeatFingerprint = heartbeatFingerprint;
+    }
+
+    public boolean isWithoutSub() {
+        return isWithoutSub;
+    }
+
+    public void setWithoutSub(boolean withoutSub) {
+        isWithoutSub = withoutSub;
+    }
+
     @Override
     public String toString() {
         return "HeartbeatData [clientID=" + clientID + ", producerDataSet=" + producerDataSet
             + ", consumerDataSet=" + consumerDataSet + "]";
     }
+
+    public int computeHeartbeatFingerprint() {
+        HeartbeatData heartbeatDataCopy = JSON.parseObject(JSON.toJSONString(this), HeartbeatData.class);

Review Comment:
   This code may have performance issues, especially with a large MQ cluster and many subscribed topics.
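One way to avoid the JSON serialize-and-parse round trip flagged above is to hash the relevant fields directly. The sketch below is an illustration only, not the PR's implementation: `SubscriptionFingerprint`, `Sub`, and the choice of fields (client ID, topic, filter expression) are assumptions, and a real fingerprint would need to cover every field the broker treats as a subscription change.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical alternative for illustration; not the method used in the PR.
final class SubscriptionFingerprint {
    // Simplified stand-in for one subscription entry.
    static final class Sub {
        final String topic;
        final String expression;

        Sub(String topic, String expression) {
            this.topic = topic;
            this.expression = expression;
        }
    }

    // Combines the client ID and all subscriptions into one int hash.
    static int compute(String clientId, List<Sub> subs) {
        // Sort by topic so the result does not depend on iteration order.
        Map<String, String> sorted = new TreeMap<>();
        for (Sub s : subs) {
            sorted.put(s.topic, s.expression);
        }
        int hash = Objects.hashCode(clientId);
        for (Map.Entry<String, String> e : sorted.entrySet()) {
            hash = 31 * hash + e.getKey().hashCode();
            hash = 31 * hash + Objects.hashCode(e.getValue());
        }
        return hash;
    }
}
```

Like any 32-bit hash, this can collide, so an equal fingerprint suggests rather than proves that the subscriptions are unchanged.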





[GitHub] [rocketmq] fuyou001 commented on pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#issuecomment-1567826790

   Could you please add unit tests?




[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1209671177


##########
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java:
##########
@@ -64,6 +65,7 @@ public class ClientConfig {
     private boolean decodeReadBody = Boolean.parseBoolean(System.getProperty(DECODE_READ_BODY, "true"));
     private boolean decodeDecompressBody = Boolean.parseBoolean(System.getProperty(DECODE_DECOMPRESS_BODY, "true"));
     private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
+    private boolean useHeartbeatV2 = Boolean.parseBoolean(System.getProperty(HEART_BEAT_V2, "true"));

Review Comment:
   The default value of HEART_BEAT_V2 should be false.
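A small sketch of an opt-in flag, where the feature stays off unless the system property is explicitly set to `true`. The property key used here is a placeholder, because the diff does not show the value of the `HEART_BEAT_V2` constant, and the class name is hypothetical.

```java
// Hypothetical illustration of an opt-in system-property flag.
final class HeartbeatV2Flag {
    // Placeholder key; the real value of HEART_BEAT_V2 is not shown in the diff.
    static final String HEART_BEAT_V2 = "example.heartbeat.v2";

    // Returns false when the property is unset, so the new behavior is opt-in.
    static boolean useHeartbeatV2() {
        return Boolean.parseBoolean(System.getProperty(HEART_BEAT_V2, "false"));
    }
}
```

With this default, a user would enable the feature by passing `-Dexample.heartbeat.v2=true` (again using the placeholder key) on the JVM command line.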





[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1226160674


##########
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java:
##########
@@ -563,6 +589,83 @@ private void sendHeartbeatToAllBroker() {
         }
     }
 
+    private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
+        final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false);
+        final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty();
+        final boolean consumerEmpty = heartbeatDataWithSub.getConsumerDataSet().isEmpty();
+        if (producerEmpty && consumerEmpty) {
+            log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
+            return;
+        }
+        if (this.brokerAddrTable.isEmpty()) {
+            return;
+        }
+        if (isRebalance) {
+            resetBrokerAddrHeartbeatFingerprintMap();
+        }
+        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+        int currentHeartbeatFingerprint = heartbeatDataWithSub.computeHeartbeatFingerprint();
+        heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+        HeartbeatData heartbeatDataWithoutSub = this.prepareHeartbeatData(true);
+        heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+
+        for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
+            String brokerName = brokerClusterInfo.getKey();
+            HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
+            if (oneTable == null) {
+                continue;
+            }
+            for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
+                Long id = singleBrokerInstance.getKey();
+                String addr = singleBrokerInstance.getValue();
+                if (addr == null) {
+                    continue;
+                }
+                if (consumerEmpty && MixAll.MASTER_ID != id) {
+                    continue;
+                }
+                try {
+                    int version = 0;
+                    boolean isBrokerSupportV2 = brokerSupportV2HeartbeatSet.contains(addr);
+                    HeartbeatV2Result heartbeatV2Result = null;
+                    if (isBrokerSupportV2 && null != brokerAddrHeartbeatFingerprintTable.get(addr) && brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
+                        heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, 3000);

Review Comment:
   Thank you very much for your review. I will adopt this suggestion.



##########
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java:
##########
@@ -563,6 +589,83 @@ private void sendHeartbeatToAllBroker() {
         }
     }
 
+    private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
+        final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false);
+        final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty();
+        final boolean consumerEmpty = heartbeatDataWithSub.getConsumerDataSet().isEmpty();
+        if (producerEmpty && consumerEmpty) {
+            log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
+            return;
+        }
+        if (this.brokerAddrTable.isEmpty()) {
+            return;
+        }
+        if (isRebalance) {
+            resetBrokerAddrHeartbeatFingerprintMap();
+        }
+        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+        int currentHeartbeatFingerprint = heartbeatDataWithSub.computeHeartbeatFingerprint();
+        heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+        HeartbeatData heartbeatDataWithoutSub = this.prepareHeartbeatData(true);
+        heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+
+        for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
+            String brokerName = brokerClusterInfo.getKey();
+            HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
+            if (oneTable == null) {
+                continue;
+            }
+            for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
+                Long id = singleBrokerInstance.getKey();
+                String addr = singleBrokerInstance.getValue();
+                if (addr == null) {
+                    continue;
+                }
+                if (consumerEmpty && MixAll.MASTER_ID != id) {
+                    continue;
+                }
+                try {
+                    int version = 0;
+                    boolean isBrokerSupportV2 = brokerSupportV2HeartbeatSet.contains(addr);
+                    HeartbeatV2Result heartbeatV2Result = null;
+                    if (isBrokerSupportV2 && null != brokerAddrHeartbeatFingerprintTable.get(addr) && brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
+                        heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, 3000);
+                        if (heartbeatV2Result.isSubChange()) {
+                            brokerAddrHeartbeatFingerprintTable.remove(addr);
+                        }
+                        log.info("sendHeartbeatToAllBrokerV2 simple brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
+                    } else {
+                        heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub, 3000);

Review Comment:
   Thank you very much for your review. I will adopt this suggestion.





[GitHub] [rocketmq] zk-drizzle commented on pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#issuecomment-1568382353

   > Could you add unit and compatibility test
   
   Thanks very much for your review. No problem, I will add this part.




[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1228029295


##########
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java:
##########
@@ -120,6 +122,8 @@ public class MQClientInstance {
     private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<>();
 
     private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = new ConcurrentHashMap<>();
+    private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet = new HashSet();

Review Comment:
   Not thread-safe. Use ConcurrentHashMap.
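A minimal sketch of a thread-safe replacement for the `HashSet` field, using the set view that `ConcurrentHashMap.newKeySet()` provides. The wrapper class and its method names are hypothetical; only the field name is taken from the diff.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical wrapper for illustration; only the field name comes from the diff.
final class BrokerV2Support {
    // Concurrent set backed by a ConcurrentHashMap; safe for concurrent add/remove/contains.
    private final Set<String> brokerSupportV2HeartbeatSet = ConcurrentHashMap.newKeySet();

    void markSupported(String brokerAddr) {
        brokerSupportV2HeartbeatSet.add(brokerAddr);
    }

    void markUnsupported(String brokerAddr) {
        brokerSupportV2HeartbeatSet.remove(brokerAddr);
    }

    boolean supportsV2(String brokerAddr) {
        return brokerSupportV2HeartbeatSet.contains(brokerAddr);
    }
}
```

`ConcurrentHashMap.newKeySet()` is available since Java 8 and keeps the field typed as `Set<String>`, so call sites that use `contains` would not need to change.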





[GitHub] [rocketmq] RongtongJin merged pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin merged PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724




[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1226159355


##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/HeartbeatData.java:
##########
@@ -53,9 +56,38 @@ public void setConsumerDataSet(Set<ConsumerData> consumerDataSet) {
         this.consumerDataSet = consumerDataSet;
     }
 
+    public int getHeartbeatFingerprint() {
+        return heartbeatFingerprint;
+    }
+
+    public void setHeartbeatFingerprint(int heartbeatFingerprint) {
+        this.heartbeatFingerprint = heartbeatFingerprint;
+    }
+
+    public boolean isWithoutSub() {
+        return isWithoutSub;
+    }
+
+    public void setWithoutSub(boolean withoutSub) {
+        isWithoutSub = withoutSub;
+    }
+
     @Override
     public String toString() {
         return "HeartbeatData [clientID=" + clientID + ", producerDataSet=" + producerDataSet
             + ", consumerDataSet=" + consumerDataSet + "]";
     }
+
+    public int computeHeartbeatFingerprint() {
+        HeartbeatData heartbeatDataCopy = JSON.parseObject(JSON.toJSONString(this), HeartbeatData.class);

Review Comment:
   ![image](https://github.com/apache/rocketmq/assets/13689954/adeca391-b6e9-4699-8203-dc4ec70cc4ec)
   Thanks for your review. When a client ID (cid) subscribes to 500 topics, computeHeartbeatFingerprint costs 70 ms. Within a 30 s heartbeat cycle that is not much, and in return the broker is freed from a lot of computation and network bandwidth pressure is reduced.
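The saving described here comes from the client sending a heartbeat without subscription data when nothing has changed. The sketch below mirrors the condition visible in the `sendHeartbeatToAllBrokerV2` diff in a simplified form. `HeartbeatPlanner` and its method names are hypothetical, and the two collections stand in for `brokerSupportV2HeartbeatSet` and `brokerAddrHeartbeatFingerprintTable`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the client-side decision; not the PR's code.
final class HeartbeatPlanner {
    private final Set<String> v2Brokers = ConcurrentHashMap.newKeySet();
    private final Map<String, Integer> ackedFingerprints = new ConcurrentHashMap<>();

    void markV2(String addr) {
        v2Brokers.add(addr);
    }

    // Remember the fingerprint the broker last received together with full subscription data.
    void recordAck(String addr, int fingerprint) {
        ackedFingerprints.put(addr, fingerprint);
    }

    // Called when the broker reports a subscription change: force a full heartbeat next time.
    void forget(String addr) {
        ackedFingerprints.remove(addr);
    }

    // A heartbeat without subscription data is enough only if the broker supports V2
    // and already holds the current fingerprint.
    boolean canSkipSubscriptionData(String addr, int currentFingerprint) {
        Integer acked = ackedFingerprints.get(addr);
        return v2Brokers.contains(addr) && acked != null && acked == currentFingerprint;
    }
}
```

In every other case (an older broker, no recorded fingerprint, or a changed fingerprint) the client falls back to the full heartbeat with subscription data.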





[GitHub] [rocketmq] zk-drizzle commented on pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#issuecomment-1584235068

   The test report on heartbeat optimization is available at the following link:
   https://docs.google.com/document/d/1aA159aIitPZLqaCoZ-FCCbacN1BiTN1c4q5C0gmMHbk/edit?usp=sharing




[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1224871924


##########
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java:
##########
@@ -64,6 +65,7 @@ public class ClientConfig {
     private boolean decodeReadBody = Boolean.parseBoolean(System.getProperty(DECODE_READ_BODY, "true"));
     private boolean decodeDecompressBody = Boolean.parseBoolean(System.getProperty(DECODE_DECOMPRESS_BODY, "true"));
     private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
+    private boolean useHeartbeatV2 = Boolean.parseBoolean(System.getProperty(HEART_BEAT_V2, "true"));

Review Comment:
   Got it.





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1225681656


##########
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java:
##########
@@ -563,6 +589,83 @@ private void sendHeartbeatToAllBroker() {
         }
     }
 
+    private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
+        final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false);
+        final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty();
+        final boolean consumerEmpty = heartbeatDataWithSub.getConsumerDataSet().isEmpty();
+        if (producerEmpty && consumerEmpty) {
+            log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
+            return;
+        }
+        if (this.brokerAddrTable.isEmpty()) {
+            return;
+        }
+        if (isRebalance) {
+            resetBrokerAddrHeartbeatFingerprintMap();
+        }
+        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+        int currentHeartbeatFingerprint = heartbeatDataWithSub.computeHeartbeatFingerprint();
+        heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+        HeartbeatData heartbeatDataWithoutSub = this.prepareHeartbeatData(true);
+        heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+
+        for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
+            String brokerName = brokerClusterInfo.getKey();
+            HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
+            if (oneTable == null) {
+                continue;
+            }
+            for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
+                Long id = singleBrokerInstance.getKey();
+                String addr = singleBrokerInstance.getValue();
+                if (addr == null) {
+                    continue;
+                }
+                if (consumerEmpty && MixAll.MASTER_ID != id) {
+                    continue;
+                }
+                try {
+                    int version = 0;
+                    boolean isBrokerSupportV2 = brokerSupportV2HeartbeatSet.contains(addr);
+                    HeartbeatV2Result heartbeatV2Result = null;
+                    if (isBrokerSupportV2 && null != brokerAddrHeartbeatFingerprintTable.get(addr) && brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
+                        heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, 3000);

Review Comment:
   How about replacing the hard-coded 3000 with clientConfig.getMqClientApiTimeout()?
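
A minimal sketch of what this suggestion could look like, not the change that was actually merged. `ClientConfigStub` and `HeartbeatApiStub` are hypothetical stand-ins written for this example; only the getter name `getMqClientApiTimeout()` is taken from the review comment, and the 3000 ms default is assumed from the literal in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not RocketMQ classes.
class HeartbeatTimeoutSketch {

    /** Models only the getter named in the review comment. */
    static class ClientConfigStub {
        // Assumed default, mirroring the literal used in the diff.
        private long mqClientApiTimeout = 3000L;

        long getMqClientApiTimeout() {
            return mqClientApiTimeout;
        }

        void setMqClientApiTimeout(long timeoutMillis) {
            this.mqClientApiTimeout = timeoutMillis;
        }
    }

    /** Records the timeout of each call instead of contacting a broker. */
    static class HeartbeatApiStub {
        final List<Long> timeoutsSeen = new ArrayList<>();

        void sendHeartbeatV2(String addr, long timeoutMillis) {
            timeoutsSeen.add(timeoutMillis);
        }
    }

    /** Passes the configured timeout through rather than a hard-coded 3000. */
    static void sendHeartbeat(HeartbeatApiStub api, ClientConfigStub config, String addr) {
        api.sendHeartbeatV2(addr, config.getMqClientApiTimeout());
    }

    public static void main(String[] args) {
        ClientConfigStub config = new ClientConfigStub();
        config.setMqClientApiTimeout(5000L);
        HeartbeatApiStub api = new HeartbeatApiStub();
        sendHeartbeat(api, config, "127.0.0.1:10911");
        System.out.println(api.timeoutsSeen); // prints [5000]
    }
}
```

Reading the value from configuration at the call site means a changed timeout applies to both heartbeat branches without touching the loop.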



##########
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java:
##########
@@ -563,6 +589,83 @@ private void sendHeartbeatToAllBroker() {
         }
     }
 
+    private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
+        final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false);
+        final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty();
+        final boolean consumerEmpty = heartbeatDataWithSub.getConsumerDataSet().isEmpty();
+        if (producerEmpty && consumerEmpty) {
+            log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
+            return;
+        }
+        if (this.brokerAddrTable.isEmpty()) {
+            return;
+        }
+        if (isRebalance) {
+            resetBrokerAddrHeartbeatFingerprintMap();
+        }
+        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+        int currentHeartbeatFingerprint = heartbeatDataWithSub.computeHeartbeatFingerprint();
+        heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+        HeartbeatData heartbeatDataWithoutSub = this.prepareHeartbeatData(true);
+        heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
+
+        for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
+            String brokerName = brokerClusterInfo.getKey();
+            HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
+            if (oneTable == null) {
+                continue;
+            }
+            for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
+                Long id = singleBrokerInstance.getKey();
+                String addr = singleBrokerInstance.getValue();
+                if (addr == null) {
+                    continue;
+                }
+                if (consumerEmpty && MixAll.MASTER_ID != id) {
+                    continue;
+                }
+                try {
+                    int version = 0;
+                    boolean isBrokerSupportV2 = brokerSupportV2HeartbeatSet.contains(addr);
+                    HeartbeatV2Result heartbeatV2Result = null;
+                    if (isBrokerSupportV2 && null != brokerAddrHeartbeatFingerprintTable.get(addr) && brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
+                        heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, 3000);
+                        if (heartbeatV2Result.isSubChange()) {
+                            brokerAddrHeartbeatFingerprintTable.remove(addr);
+                        }
+                        log.info("sendHeartbeatToAllBrokerV2 simple brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
+                    } else {
+                        heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub, 3000);

Review Comment:
   Same as above: replace the hard-coded 3000 with clientConfig.getMqClientApiTimeout().





[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1227571619


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java:
##########
@@ -138,6 +144,63 @@ public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand requ
         }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
+        response.addExtField(MixAll.IS_SUPPORT_HEART_BEAT_V2, Boolean.TRUE.toString());
+        response.addExtField(MixAll.IS_SUB_CHANGE, Boolean.TRUE.toString());
+        return response;
+    }
+
+    private RemotingCommand heartBeatV2(ChannelHandlerContext ctx, HeartbeatData heartbeatData, ClientChannelInfo clientChannelInfo, RemotingCommand response) {
+        boolean isSubChange = false;
+        for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
+            //Reject the PullConsumer
+            if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) {
+                if (ConsumeType.CONSUME_ACTIVELY == consumerData.getConsumeType()) {
+                    continue;
+                }
+            }
+            if (null != consumerGroupHeartbeatTable.get(consumerData.getGroupName()) && consumerGroupHeartbeatTable.get(consumerData.getGroupName()) != heartbeatData.getHeartbeatFingerprint()) {

Review Comment:
   Thank you for your review. Keeping it on one line has its own kind of beauty~





[GitHub] [rocketmq] RongtongJin commented on pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#issuecomment-1590328978

   LGTM
   




[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1228029295


##########
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java:
##########
@@ -120,6 +122,8 @@ public class MQClientInstance {
     private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<>();
 
     private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = new ConcurrentHashMap<>();
+    private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet = new HashSet();

Review Comment:
   HashSet is not thread-safe. Use ConcurrentHashMap instead.
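
One way to act on this comment, sketched under assumptions: the set is backed by `ConcurrentHashMap` through the JDK's `ConcurrentHashMap.newKeySet()`. The class name, the helper methods, and the made-up address strings are illustrative and do not come from the pull request.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only; not part of MQClientInstance.
class ConcurrentBrokerSetSketch {

    /** A Set view backed by ConcurrentHashMap, safe for concurrent add/contains/remove. */
    static Set<String> newBrokerAddressSet() {
        return ConcurrentHashMap.newKeySet();
    }

    /** Adds threads * perThread distinct entries from several threads at once. */
    static Set<String> registerConcurrently(int threads, int perThread) {
        Set<String> addresses = newBrokerAddressSet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    addresses.add("broker-" + (base + i)); // made-up addresses
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return addresses;
    }

    public static void main(String[] args) {
        System.out.println(registerConcurrently(4, 1000).size()); // prints 4000
    }
}
```

With a plain `HashSet`, the same concurrent writes could lose entries or corrupt the internal table; the concurrent set keeps every distinct address.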





[GitHub] [rocketmq] lizhimins commented on a diff in pull request #6724: [RIP-64] Heartbeat Optimization

Posted by "lizhimins (via GitHub)" <gi...@apache.org>.
lizhimins commented on code in PR #6724:
URL: https://github.com/apache/rocketmq/pull/6724#discussion_r1226383863


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java:
##########
@@ -138,6 +144,63 @@ public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand requ
         }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
+        response.addExtField(MixAll.IS_SUPPORT_HEART_BEAT_V2, Boolean.TRUE.toString());
+        response.addExtField(MixAll.IS_SUB_CHANGE, Boolean.TRUE.toString());
+        return response;
+    }
+
+    private RemotingCommand heartBeatV2(ChannelHandlerContext ctx, HeartbeatData heartbeatData, ClientChannelInfo clientChannelInfo, RemotingCommand response) {
+        boolean isSubChange = false;
+        for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
+            //Reject the PullConsumer
+            if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) {
+                if (ConsumeType.CONSUME_ACTIVELY == consumerData.getConsumeType()) {
+                    continue;
+                }
+            }
+            if (null != consumerGroupHeartbeatTable.get(consumerData.getGroupName()) && consumerGroupHeartbeatTable.get(consumerData.getGroupName()) != heartbeatData.getHeartbeatFingerprint()) {
+                isSubChange = true;
+            }
+            consumerGroupHeartbeatTable.put(consumerData.getGroupName(), heartbeatData.getHeartbeatFingerprint());
+            boolean hasOrderTopicSub = false;
+
+            for (final SubscriptionData subscriptionData : consumerData.getSubscriptionDataSet()) {
+                if (this.brokerController.getTopicConfigManager().isOrderTopic(subscriptionData.getTopic())) {
+                    hasOrderTopicSub = true;
+                    break;
+                }
+            }
+            SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumerData.getGroupName());
+            boolean isNotifyConsumerIdsChangedEnable = true;
+            if (null == subscriptionGroupConfig) {
+                continue;
+            }
+            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+            int topicSysFlag = 0;
+            if (consumerData.isUnitMode()) {
+                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
+            }
+            String newTopic = MixAll.getRetryTopic(consumerData.getGroupName());
+            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, hasOrderTopicSub, topicSysFlag);
+            boolean changed = false;
+            if (heartbeatData.isWithoutSub()) {
+                changed = this.brokerController.getConsumerManager().registerConsumerWithoutSub(consumerData.getGroupName(), clientChannelInfo, consumerData.getConsumeType(), consumerData.getMessageModel(), consumerData.getConsumeFromWhere(), isNotifyConsumerIdsChangedEnable);

Review Comment:
   Same as before



##########
broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java:
##########
@@ -138,6 +144,63 @@ public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand requ
         }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
+        response.addExtField(MixAll.IS_SUPPORT_HEART_BEAT_V2, Boolean.TRUE.toString());
+        response.addExtField(MixAll.IS_SUB_CHANGE, Boolean.TRUE.toString());
+        return response;
+    }
+
+    private RemotingCommand heartBeatV2(ChannelHandlerContext ctx, HeartbeatData heartbeatData, ClientChannelInfo clientChannelInfo, RemotingCommand response) {
+        boolean isSubChange = false;
+        for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
+            //Reject the PullConsumer
+            if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) {
+                if (ConsumeType.CONSUME_ACTIVELY == consumerData.getConsumeType()) {
+                    continue;
+                }
+            }
+            if (null != consumerGroupHeartbeatTable.get(consumerData.getGroupName()) && consumerGroupHeartbeatTable.get(consumerData.getGroupName()) != heartbeatData.getHeartbeatFingerprint()) {

Review Comment:
   Consider adding line breaks here for better code style.
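
A sketch of how the long condition might be broken up, assuming the table maps a group name to an `Integer` fingerprint (the diff does not show its declaration). The class and method names are hypothetical. The value is read once into a local and the condition is split across lines; the get-then-put order mirrors the diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only; the table type is an assumption, not taken from the diff.
class FingerprintCheckSketch {

    /** Returns true when a previously recorded fingerprint differs from the new one. */
    static boolean recordAndDetectChange(Map<String, Integer> table, String groupName, int fingerprint) {
        Integer previous = table.get(groupName);
        boolean isSubChange = previous != null
            && previous.intValue() != fingerprint;
        table.put(groupName, fingerprint);
        return isSubChange;
    }

    public static void main(String[] args) {
        Map<String, Integer> table = new ConcurrentHashMap<>();
        System.out.println(recordAndDetectChange(table, "groupA", 42)); // prints false (first sighting)
        System.out.println(recordAndDetectChange(table, "groupA", 42)); // prints false (unchanged)
        System.out.println(recordAndDetectChange(table, "groupA", 7));  // prints true (changed)
    }
}
```

Comparing through `intValue()` also avoids a reference comparison if both operands ever turn out to be boxed `Integer` values.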


