You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/02/27 04:56:34 UTC

[rocketmq] branch develop updated: [ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193)

This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 90bf88634 [ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193)
90bf88634 is described below

commit 90bf886340215b1c45e43bf740c67317fdf9665e
Author: lk <xd...@outlook.com>
AuthorDate: Mon Feb 27 12:56:26 2023 +0800

    [ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193)
---
 .../apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java  |  2 +-
 .../apache/rocketmq/proxy/processor/ConsumerProcessor.java   | 12 ++++++++++++
 .../apache/rocketmq/proxy/processor/ProducerProcessor.java   |  4 ++++
 3 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
index 96a214750..21526054a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
@@ -139,7 +139,7 @@ public class GrpcConverter {
         }
 
         // message_id
-        String uniqKey = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+        String uniqKey = messageExt.getMsgId();
         if (uniqKey != null) {
             systemPropertiesBuilder.setMessageId(uniqKey);
         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 37c2e54d6..d67f4b855 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -34,6 +34,8 @@ import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -144,6 +146,7 @@ public class ConsumerProcessor extends AbstractProcessor {
                         List<MessageExt> messageExtList = new ArrayList<>();
                         for (MessageExt messageExt : popResult.getMsgFoundList()) {
                             try {
+                                fillUniqIDIfNeed(messageExt);
                                 String handleString = createHandle(messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getCommitLogOffset());
                                 if (handleString == null) {
                                     log.error("[BUG] pop message from broker but handle is empty. requestHeader:{}, msg:{}", requestHeader, messageExt);
@@ -193,6 +196,15 @@ public class ConsumerProcessor extends AbstractProcessor {
         return FutureUtils.addExecutor(future, this.executor);
     }
 
+    private void fillUniqIDIfNeed(MessageExt messageExt) {
+        if (StringUtils.isBlank(MessageClientIDSetter.getUniqID(messageExt))) {
+            if (messageExt instanceof MessageClientExt) {
+                MessageClientExt clientExt = (MessageClientExt) messageExt;
+                MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, clientExt.getOffsetMsgId());
+            }
+        }
+    }
+
     public CompletableFuture<AckResult> ackMessage(
         ProxyContext ctx,
         ReceiptHandle handle,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 2fce78d31..749f9da2b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageId;
@@ -84,6 +85,9 @@ public class ProducerProcessor extends AbstractProcessor {
                 throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue");
             }
 
+            for (Message msg : messageList) {
+                MessageClientIDSetter.setUniqID(msg);
+            }
             SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());
 
             future = this.serviceManager.getMessageService().sendMessage(