You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/07 11:21:03 UTC

[incubator-tubemq] branch TUBEMQ-314 updated: [TUBEMQ-334]Optimize BrokerServiceServer and OffsetService classes logic (#249)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-314
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-314 by this push:
     new 05ca56c  [TUBEMQ-334]Optimize BrokerServiceServer and OffsetService classes logic (#249)
05ca56c is described below

commit 05ca56c7bb5acb59fda47885a89c2a0eb26aab06
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Sep 7 19:20:52 2020 +0800

    [TUBEMQ-334]Optimize BrokerServiceServer and OffsetService classes logic (#249)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../tubemq/server/broker/BrokerServiceServer.java  | 39 +++++------
 .../server/broker/offset/DefaultOffsetManager.java | 77 +++++++++++-----------
 .../tubemq/server/broker/offset/OffsetService.java | 16 ++---
 .../server/broker/web/BrokerAdminServlet.java      |  3 +-
 4 files changed, 67 insertions(+), 68 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 1d14fcd..4ccfc56 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -245,14 +245,13 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
     /***
      * Get consumer's register time.
      *
-     * @param consumerId
-     * @param partitionStr
+     * @param heartbeatNodeId
      * @return
      */
-    public Long getConsumerRegisterTime(String consumerId, String partitionStr) {
+    public Long getConsumerRegisterTime(String heartbeatNodeId) {
         TimeoutInfo timeoutInfo =
                 heartbeatManager.getConsumerRegMap()
-                        .get(getHeartbeatNodeId(consumerId, partitionStr));
+                        .get(heartbeatNodeId);
         if (timeoutInfo == null) {
             return null;
         }
@@ -342,7 +341,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
         }
         String rmtAddrInfo = consumerNodeInfo.getRmtAddrInfo();
         try {
-            heartbeatManager.updConsumerNode(getHeartbeatNodeId(clientId, partStr));
+            heartbeatManager.updConsumerNode(consumerNodeInfo.getHeartbeatNodeId());
         } catch (HeartbeatException e) {
             logger.warn(strBuffer.append("[Invalid Request]").append(clientId)
                     .append(TokenConstants.SEGMENT_SEP).append(topicName)
@@ -417,7 +416,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
      * Query offset, then read data.
      *
      * @param msgStore
-     * @param consumerNodeInfo
+     * @param nodeInfo
      * @param group
      * @param topic
      * @param partitionId
@@ -432,25 +431,25 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
      * @throws IOException
      */
     private GetMessageResult getMessages(final MessageStore msgStore,
-                                         final ConsumerNodeInfo consumerNodeInfo,
+                                         final ConsumerNodeInfo nodeInfo,
                                          final String group, final String topic,
                                          final int partitionId, final boolean lastConsumed,
                                          final boolean isManualCommitOffset, final String sentAddr,
                                          final String brokerAddr, final String rmtAddrInfo,
                                          boolean isEscFlowCtrl, final StringBuilder sb) throws IOException {
         long requestOffset =
-                offsetManager.getOffset(msgStore, group, topic,
-                        partitionId, isManualCommitOffset, lastConsumed, sb);
+                offsetManager.getOffset(msgStore, nodeInfo,
+                        isManualCommitOffset, lastConsumed, sb);
         if (requestOffset < 0) {
             return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
                     -requestOffset, 0, "The request offset reached maxOffset!");
         }
         final long maxDataOffset = msgStore.getDataMaxOffset();
-        int reqSwitch = getRealQryPriorityId(consumerNodeInfo);
+        int reqSwitch = getRealQryPriorityId(nodeInfo);
         int msgDataSizeLimit =
-                consumerNodeInfo.getAllowedQuota(maxDataOffset, isEscFlowCtrl);
+                nodeInfo.getAllowedQuota(maxDataOffset, isEscFlowCtrl);
         if (msgDataSizeLimit <= 0) {
-            if (consumerNodeInfo.isSpLimit()) {
+            if (nodeInfo.isSpLimit()) {
                 return new GetMessageResult(false, TErrCodeConstants.SERVER_CONSUME_SPEED_LIMIT,
                         requestOffset, 0, (-msgDataSizeLimit), "RpcServer consume speed limit!");
             } else {
@@ -465,8 +464,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             sb.delete(0, sb.length());
             GetMessageResult msgQueryResult =
                     msgStore.getMessages(reqSwitch, requestOffset,
-                            partitionId, consumerNodeInfo, baseKey, msgDataSizeLimit);
-            offsetManager.bookOffset(group, topic, partitionId,
+                            partitionId, nodeInfo, baseKey, msgDataSizeLimit);
+            offsetManager.bookOffset(nodeInfo,
                     msgQueryResult.lastReadOffset, isManualCommitOffset,
                     msgQueryResult.transferedMessageList.isEmpty(), sb);
             msgQueryResult.setWaitTime(maxDataOffset - msgQueryResult.lastRdDataOffset);
@@ -870,10 +869,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                 return builder.build();
             }
             OffsetStorageInfo offsetInfo =
-                    offsetManager.loadOffset(dataStore, newNodeInfo.getGroupName(),
-                            newNodeInfo.getTopicName(), request.getPartitionId(),
-                            request.getReadStatus(), newNodeInfo.getLeftOffset(),
-                            strBuffer);
+                    offsetManager.loadOffset(dataStore, newNodeInfo,
+                            request.getReadStatus(), strBuffer);
             logger.info(strBuffer.append("[Consumer Register]")
                     .append(newNodeInfo.getConsumerId())
                     .append(TokenConstants.SEGMENT_SEP).append(newNodeInfo.getPartStr())
@@ -954,8 +951,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                     offsetManager.commitOffset(curNodeInfo.getGroupName(),
                             curNodeInfo.getTopicName(), curNodeInfo.getPartitionId(), isConsumed);
             consumerRegisterMap.remove(curNodeInfo.getPartStr());
-            heartbeatManager.unRegConsumerNode(
-                    getHeartbeatNodeId(curNodeInfo.getConsumerId(), curNodeInfo.getPartStr()));
+            heartbeatManager.unRegConsumerNode(curNodeInfo.getHeartbeatNodeId());
             logger.info(strBuffer.append("[Consumer Unregister]")
                     .append(curNodeInfo.getConsumerId()).append(" unregistered topic-partition=")
                     .append(curNodeInfo.getOffsetCacheKey()).append(", updated Offset=")
@@ -1067,8 +1063,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                 isAuthorized = true;
             }
             try {
-                heartbeatManager.updConsumerNode(
-                        getHeartbeatNodeId(clientId, partStr));
+                heartbeatManager.updConsumerNode(consumerNodeInfo.getHeartbeatNodeId());
             } catch (HeartbeatException e) {
                 failureInfo.add(strBuffer.append(TErrCodeConstants.HB_NO_NODE)
                         .append(TokenConstants.ATTR_SEP)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index 82a758c..8e3e4d5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -24,6 +24,7 @@ import org.apache.tubemq.corebase.daemon.AbstractDaemonService;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.broker.BrokerConfig;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
+import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
@@ -86,27 +87,26 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
      * Load offset.
      *
      * @param msgStore
-     * @param group
-     * @param topic
-     * @param partitionId
+     * @param nodeInfo
      * @param readStatus
-     * @param reqOffset
      * @param sBuilder
      * @return
      */
     @Override
-    public OffsetStorageInfo loadOffset(final MessageStore msgStore, final String group,
-                                        final String topic, int partitionId, int readStatus,
-                                        long reqOffset, final StringBuilder sBuilder) {
+    public OffsetStorageInfo loadOffset(final MessageStore msgStore,
+                                        final ConsumerNodeInfo nodeInfo,
+                                        int readStatus, final StringBuilder sBuilder) {
         OffsetStorageInfo regInfo;
+        long reqOffset = nodeInfo.getLeftOffset();
         long indexMaxOffset = msgStore.getIndexMaxOffset();
         long indexMinOffset = msgStore.getIndexMinOffset();
         long defOffset =
                 (readStatus == TBaseConstants.CONSUME_MODEL_READ_NORMAL)
                         ? indexMinOffset : indexMaxOffset;
-        String offsetCacheKey = getOffsetCacheKey(topic, partitionId);
-        regInfo = loadOrCreateOffset(group, topic, partitionId, offsetCacheKey, defOffset);
-        getAndResetTmpOffset(group, offsetCacheKey);
+        regInfo = loadOrCreateOffset(nodeInfo.getGroupName(),
+                nodeInfo.getTopicName(), nodeInfo.getPartitionId(),
+                nodeInfo.getOffsetCacheKey(), defOffset);
+        getAndResetTmpOffset(nodeInfo.getGroupName(), nodeInfo.getOffsetCacheKey());
         final long curOffset = regInfo.getOffset();
         final boolean isFirstCreate = regInfo.isFirstCreate();
         if ((reqOffset >= 0)
@@ -138,8 +138,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
                 .append(",current offset=").append(regInfo.getOffset())
                 .append(",maxOffset=").append(indexMaxOffset)
                 .append(",offset delta=").append(indexMaxOffset - regInfo.getOffset())
-                .append(",group=").append(group).append(",topic=").append(topic)
-                .append(",partitionId=").append(partitionId).toString());
+                .append(",part_str=").append(nodeInfo.getPartStr()).toString());
         sBuilder.delete(0, sBuilder.length());
         return regInfo;
     }
@@ -148,28 +147,29 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
      * Get offset by parameters.
      *
      * @param msgStore
-     * @param group
-     * @param topic
-     * @param partitionId
+     * @param nodeInfo
      * @param isManCommit
      * @param lastConsumed
      * @param sb
      * @return
      */
     @Override
-    public long getOffset(final MessageStore msgStore, final String group,
-                          final String topic, int partitionId,
+    public long getOffset(final MessageStore msgStore,
+                          final ConsumerNodeInfo nodeInfo,
                           boolean isManCommit, boolean lastConsumed,
                           final StringBuilder sb) {
-        String offsetCacheKey = getOffsetCacheKey(topic, partitionId);
         OffsetStorageInfo regInfo =
-                loadOrCreateOffset(group, topic, partitionId, offsetCacheKey, 0);
+                loadOrCreateOffset(nodeInfo.getGroupName(),
+                        nodeInfo.getTopicName(), nodeInfo.getPartitionId(),
+                        nodeInfo.getOffsetCacheKey(), 0);
         long requestOffset = regInfo.getOffset();
         if (isManCommit) {
-            requestOffset = requestOffset + getTmpOffset(group, topic, partitionId);
+            requestOffset = requestOffset + getTmpOffset(nodeInfo.getGroupName(),
+                    nodeInfo.getTopicName(), nodeInfo.getPartitionId());
         } else {
             if (lastConsumed) {
-                requestOffset = commitOffset(group, topic, partitionId, true);
+                requestOffset = commitOffset(nodeInfo.getGroupName(),
+                        nodeInfo.getTopicName(), nodeInfo.getPartitionId(), true);
             }
         }
         final long maxOffset = msgStore.getIndexMaxOffset();
@@ -179,12 +179,12 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
                 logger.warn(sb
                         .append("[Offset Manager] Offset is bigger than current max offset, reset! requestOffset=")
                         .append(requestOffset).append(",maxOffset=").append(maxOffset)
-                        .append(",group=").append(group).append(",topic=").append(topic)
-                        .append(",partitionId=").append(partitionId).toString());
+                        .append(", part_str=").append(nodeInfo.getPartStr()).toString());
                 sb.delete(0, sb.length());
-                setTmpOffset(group, offsetCacheKey, maxOffset - requestOffset);
+                setTmpOffset(nodeInfo.getGroupName(), nodeInfo.getOffsetCacheKey(), maxOffset - requestOffset);
                 if (!isManCommit) {
-                    requestOffset = commitOffset(group, topic, partitionId, true);
+                    requestOffset = commitOffset(nodeInfo.getGroupName(),
+                            nodeInfo.getTopicName(), nodeInfo.getPartitionId(), true);
                 }
             }
             return -requestOffset;
@@ -192,11 +192,12 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
             logger.warn(sb
                     .append("[Offset Manager] Offset is lower than current min offset, reset! requestOffset=")
                     .append(requestOffset).append(",minOffset=").append(minOffset)
-                    .append(",group=").append(group).append(",topic=").append(topic)
-                    .append(",partitionId=").append(partitionId).toString());
+                    .append(", part_str=").append(nodeInfo.getPartStr()).toString());
             sb.delete(0, sb.length());
-            setTmpOffset(group, offsetCacheKey, minOffset - requestOffset);
-            requestOffset = commitOffset(group, topic, partitionId, true);
+            setTmpOffset(nodeInfo.getGroupName(),
+                    nodeInfo.getOffsetCacheKey(), minOffset - requestOffset);
+            requestOffset = commitOffset(nodeInfo.getGroupName(),
+                    nodeInfo.getTopicName(), nodeInfo.getPartitionId(), true);
         }
         return requestOffset;
     }
@@ -210,20 +211,22 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     }
 
     @Override
-    public void bookOffset(final String group, final String topic, int partitionId,
-                           int readDalt, boolean isManCommit, boolean isMsgEmpty,
-                           final StringBuilder sb) {
+    public void bookOffset(final ConsumerNodeInfo nodeInfo,
+                           int readDalt, boolean isManCommit,
+                           boolean isMsgEmpty, final StringBuilder sb) {
         if (readDalt == 0) {
             return;
         }
-        String offsetCacheKey = getOffsetCacheKey(topic, partitionId);
         if (isManCommit) {
-            long tmpOffset = getTmpOffset(group, topic, partitionId);
-            setTmpOffset(group, offsetCacheKey, readDalt + tmpOffset);
+            long tmpOffset = getTmpOffset(nodeInfo.getGroupName(),
+                    nodeInfo.getTopicName(), nodeInfo.getPartitionId());
+            setTmpOffset(nodeInfo.getGroupName(),
+                    nodeInfo.getOffsetCacheKey(), readDalt + tmpOffset);
         } else {
-            setTmpOffset(group, offsetCacheKey, readDalt);
+            setTmpOffset(nodeInfo.getGroupName(), nodeInfo.getOffsetCacheKey(), readDalt);
             if (isMsgEmpty) {
-                commitOffset(group, topic, partitionId, true);
+                commitOffset(nodeInfo.getGroupName(),
+                        nodeInfo.getTopicName(), nodeInfo.getPartitionId(), true);
             }
         }
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index 066fb3b..d39866c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -18,6 +18,7 @@
 package org.apache.tubemq.server.broker.offset;
 
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
+import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
 
 /***
@@ -27,20 +28,19 @@ public interface OffsetService {
 
     void close(long waitTimeMs);
 
-    OffsetStorageInfo loadOffset(final MessageStore store, final String group,
-                                 final String topic, int partitionId,
-                                 int readStatus, long reqOffset,
-                                 final StringBuilder sb);
+    OffsetStorageInfo loadOffset(final MessageStore store,
+                                 final ConsumerNodeInfo nodeInfo,
+                                 int readStatus, final StringBuilder sb);
 
-    long getOffset(final MessageStore msgStore, final String group,
-                   final String topic, int partitionId,
+    long getOffset(final MessageStore msgStore,
+                   final ConsumerNodeInfo nodeInfo,
                    boolean isManCommit, boolean lastConsumed,
                    final StringBuilder sb);
 
     long getOffset(String group, String topic, int partitionId);
 
-    void bookOffset(final String group, final String topic, int partitionId,
-                    int readDalt, boolean isManCommit, boolean isMsgEmpty,
+    void bookOffset(final ConsumerNodeInfo nodeInfo, int readDalt,
+                    boolean isManCommit, boolean isMsgEmpty,
                     final StringBuilder sb);
 
     long commitOffset(final String group, final String topic,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index 009c219..9f10246 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -154,7 +154,8 @@ public class BrokerAdminServlet extends HttpServlet {
                     .append(groupName).append("\",\"topicName\":\"").append(topicName)
                     .append("\",\"partitionId\":").append(partitionId);
             Long regTime =
-                    broker.getBrokerServiceServer().getConsumerRegisterTime(consumerId, entry.getKey());
+                    broker.getBrokerServiceServer()
+                            .getConsumerRegisterTime(entry.getValue().getHeartbeatNodeId());
             if (regTime == null || regTime <= 0) {
                 sBuilder.append(",\"consumerId\":\"").append(consumerId)
                         .append("\",\"isRegOk\":false")