You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/02/27 04:56:34 UTC
[rocketmq] branch develop updated: [ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193)
This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 90bf88634 [ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193)
90bf88634 is described below
commit 90bf886340215b1c45e43bf740c67317fdf9665e
Author: lk <xd...@outlook.com>
AuthorDate: Mon Feb 27 12:56:26 2023 +0800
[ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193)
---
.../apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java | 2 +-
.../apache/rocketmq/proxy/processor/ConsumerProcessor.java | 12 ++++++++++++
.../apache/rocketmq/proxy/processor/ProducerProcessor.java | 4 ++++
3 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
index 96a214750..21526054a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
@@ -139,7 +139,7 @@ public class GrpcConverter {
}
// message_id
- String uniqKey = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+ String uniqKey = messageExt.getMsgId();
if (uniqKey != null) {
systemPropertiesBuilder.setMessageId(uniqKey);
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 37c2e54d6..d67f4b855 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -34,6 +34,8 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -144,6 +146,7 @@ public class ConsumerProcessor extends AbstractProcessor {
List<MessageExt> messageExtList = new ArrayList<>();
for (MessageExt messageExt : popResult.getMsgFoundList()) {
try {
+ fillUniqIDIfNeed(messageExt);
String handleString = createHandle(messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getCommitLogOffset());
if (handleString == null) {
log.error("[BUG] pop message from broker but handle is empty. requestHeader:{}, msg:{}", requestHeader, messageExt);
@@ -193,6 +196,15 @@ public class ConsumerProcessor extends AbstractProcessor {
return FutureUtils.addExecutor(future, this.executor);
}
+ private void fillUniqIDIfNeed(MessageExt messageExt) { // Sets the PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX property to the offset message id when the message carries no unique id.
+ if (StringUtils.isBlank(MessageClientIDSetter.getUniqID(messageExt))) { // only act when the existing unique id is blank
+ if (messageExt instanceof MessageClientExt) { // getOffsetMsgId() is called through MessageClientExt; other message types are left untouched
+ MessageClientExt clientExt = (MessageClientExt) messageExt;
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, clientExt.getOffsetMsgId()); // writes the property on the passed-in message itself
+ }
+ }
+ }
+
public CompletableFuture<AckResult> ackMessage(
ProxyContext ctx,
ReceiptHandle handle,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 2fce78d31..749f9da2b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageId;
@@ -84,6 +85,9 @@ public class ProducerProcessor extends AbstractProcessor {
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue");
}
+ for (Message msg : messageList) {
+ MessageClientIDSetter.setUniqID(msg);
+ }
SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());
future = this.serviceManager.getMessageService().sendMessage(