You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/03/21 02:10:11 UTC

[rocketmq] branch 4.9.x updated: [ISSUE #6411] Cherry pick some bug fixes from develop to 4.9.x branch (#6410)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/4.9.x by this push:
     new ece624928 [ISSUE #6411] Cherry pick some bug fixes from develop to 4.9.x branch (#6410)
ece624928 is described below

commit ece62492803eb3c3c1256f3fa108c60b1ec2350f
Author: rongtong <ji...@163.com>
AuthorDate: Tue Mar 21 10:10:00 2023 +0800

    [ISSUE #6411] Cherry pick some bug fixes from develop to 4.9.x branch (#6410)
    
    * [ISSUE #4841] Fix the reply message is the original request msg. (#4896)
    
    (cherry picked from commit f7c2072a27773d5feefeb5cb6cd5dead0450e3b7)
    
    * [ISSUE #4647] Update CommandLineOption secretKey to necessary (#4648)
    
    * Ignore the unstable updateGlobalWhiteRemoteAddressesTest in the arm environment of CI
    
    * [ISSUE #4647] udpate option to necessary
    
    Co-authored-by: RongtongJin <ji...@mails.ucas.ac.cn>
    (cherry picked from commit a3aef179f7d07ef0425debff40041f81f1bf94dd)
    
    * [ISSUE #4644] Fix mqadmin deleteTopic bug when command exec on slave broker
    
    (cherry picked from commit 3a1d1a7e60f9049fb7e5048a9f08cdcdbf996601)
    
    * [ISSUE #5424] Fix null exception and array overflow bug exist in getHalfMsg method #5425
    
    (cherry picked from commit dd4b66643581c962db0cfb3951a893792f4e349e)
    
    * [ISSUE #4841] Fix the reply message is the original request msg. (#4896)
    
    (cherry picked from commit f7c2072a27773d5feefeb5cb6cd5dead0450e3b7)
    
    * [ISSUE #4584] Add new persist method to update consume offset to remote server.
    
    (cherry picked from commit 2930994a37e065b946977d74f4ad3966f8963267)
    
    * Prepare to release 4.9.5
    
    ---------
    
    Co-authored-by: echooymxq <ec...@gmail.com>
    Co-authored-by: Oliver <wq...@163.com>
    Co-authored-by: JanWarlen <ja...@gmail.com>
    Co-authored-by: RapperCL <44...@users.noreply.github.com>
    Co-authored-by: dinglei <li...@163.com>
---
 .../queue/TransactionalMessageServiceImpl.java     | 14 +++++-----
 .../client/consumer/DefaultMQPullConsumer.java     |  4 +++
 .../impl/producer/DefaultMQProducerImpl.java       |  1 -
 .../client/producer/DefaultMQProducerTest.java     | 31 ++++++++++++++++------
 .../java/org/apache/rocketmq/common/MQVersion.java |  2 +-
 .../command/acl/UpdateAccessConfigSubCommand.java  |  2 +-
 .../tools/command/topic/DeleteTopicSubCommand.java |  4 +--
 7 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
index 3f244f4a2..7bf316992 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
@@ -439,14 +439,16 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
 
     private GetResult getHalfMsg(MessageQueue messageQueue, long offset) {
         GetResult getResult = new GetResult();
-
+    
         PullResult result = pullHalfMsg(messageQueue, offset, PULL_MSG_RETRY_NUMBER);
-        getResult.setPullResult(result);
-        List<MessageExt> messageExts = result.getMsgFoundList();
-        if (messageExts == null) {
-            return getResult;
+        if (result != null) {
+            getResult.setPullResult(result);
+            List<MessageExt> messageExts = result.getMsgFoundList();
+            if (messageExts == null || messageExts.size() == 0) {
+                return getResult;
+            }
+            getResult.setMsg(messageExts.get(0));
         }
-        getResult.setMsg(messageExts.get(0));
         return getResult;
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 5e2138e81..206ee660e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -452,4 +452,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
     public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
         this.maxReconsumeTimes = maxReconsumeTimes;
     }
+
+    public void persist(MessageQueue mq) {
+        this.getOffsetStore().persist(queueWithNamespace(mq));
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 6f3c59753..d50672e47 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -1381,7 +1381,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 @Override
                 public void onSuccess(SendResult sendResult) {
                     requestResponseFuture.setSendRequestOk(true);
-                    requestResponseFuture.putResponseMessage(msg);
                 }
 
                 @Override
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 5ce761c30..a7f04731c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
@@ -368,7 +369,8 @@ public class DefaultMQProducerTest {
         when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
         final AtomicBoolean finish = new AtomicBoolean(false);
         new Thread(new Runnable() {
-            @Override public void run() {
+            @Override
+            public void run() {
                 ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureHolder.getInstance().getRequestFutureTable();
                 assertThat(responseMap).isNotNull();
                 while (!finish.get()) {
@@ -376,15 +378,19 @@ public class DefaultMQProducerTest {
                         Thread.sleep(10);
                     } catch (InterruptedException e) {
                     }
+                    MessageExt responseMsg = new MessageExt();
+                    responseMsg.setTopic(message.getTopic());
+                    responseMsg.setBody(message.getBody());
                     for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
                         RequestResponseFuture future = entry.getValue();
-                        future.putResponseMessage(message);
+                        future.putResponseMessage(responseMsg);
                     }
                 }
             }
         }).start();
         Message result = producer.request(message, 3 * 1000L);
         finish.getAndSet(true);
+        assertThat(result).isExactlyInstanceOf(MessageExt.class);
         assertThat(result.getTopic()).isEqualTo("FooBar");
         assertThat(result.getBody()).isEqualTo(new byte[] {'a'});
     }
@@ -400,24 +406,31 @@ public class DefaultMQProducerTest {
         when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         RequestCallback requestCallback = new RequestCallback() {
-            @Override public void onSuccess(Message message) {
+            @Override
+            public void onSuccess(Message message) {
+                assertThat(message).isExactlyInstanceOf(MessageExt.class);
                 assertThat(message.getTopic()).isEqualTo("FooBar");
                 assertThat(message.getBody()).isEqualTo(new byte[] {'a'});
                 assertThat(message.getFlag()).isEqualTo(1);
                 countDownLatch.countDown();
             }
 
-            @Override public void onException(Throwable e) {
+            @Override
+            public void onException(Throwable e) {
             }
         };
         producer.request(message, requestCallback, 3 * 1000L);
         ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureHolder.getInstance().getRequestFutureTable();
         assertThat(responseMap).isNotNull();
+
+        MessageExt responseMsg = new MessageExt();
+        responseMsg.setTopic(message.getTopic());
+        responseMsg.setBody(message.getBody());
+        responseMsg.setFlag(1);
         for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
             RequestResponseFuture future = entry.getValue();
             future.setSendRequestOk(true);
-            message.setFlag(1);
-            future.getRequestCallback().onSuccess(message);
+            future.getRequestCallback().onSuccess(responseMsg);
         }
         countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
     }
@@ -427,11 +440,13 @@ public class DefaultMQProducerTest {
         final AtomicInteger cc = new AtomicInteger(0);
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         RequestCallback requestCallback = new RequestCallback() {
-            @Override public void onSuccess(Message message) {
+            @Override
+            public void onSuccess(Message message) {
 
             }
 
-            @Override public void onException(Throwable e) {
+            @Override
+            public void onException(Throwable e) {
                 cc.incrementAndGet();
                 countDownLatch.countDown();
             }
diff --git a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
index 60ab6f8cb..08ad97f66 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.common;
 
 public class MQVersion {
 
-    public static final int CURRENT_VERSION = Version.V4_9_4.ordinal();
+    public static final int CURRENT_VERSION = Version.V4_9_5.ordinal();
 
     public static String getVersionDesc(int value) {
         int length = Version.values().length;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
index 465a2db58..c246b66d5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
@@ -61,7 +61,7 @@ public class UpdateAccessConfigSubCommand implements SubCommand {
         options.addOption(opt);
 
         opt = new Option("s", "secretKey", true, "set secretKey in acl config file");
-        opt.setRequired(false);
+        opt.setRequired(true);
         options.addOption(opt);
 
         opt = new Option("w", "whiteRemoteAddress", true, "set white ip Address for account in acl config file");
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
index aa5a5ead9..d9bc6082f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
@@ -38,8 +38,8 @@ public class DeleteTopicSubCommand implements SubCommand {
         final String topic
     ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
 
-        Set<String> brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName);
-        adminExt.deleteTopicInBroker(brokerAddressSet, topic);
+        Set<String> masterBrokerAddressSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
+        adminExt.deleteTopicInBroker(masterBrokerAddressSet, topic);
         System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
 
         Set<String> nameServerSet = null;