You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/03/21 02:10:11 UTC
[rocketmq] branch 4.9.x updated: [ISSUE #6411] Cherry pick some bug fixes from develop to 4.9.x branch (#6410)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new ece624928 [ISSUE #6411] Cherry pick some bug fixes from develop to 4.9.x branch (#6410)
ece624928 is described below
commit ece62492803eb3c3c1256f3fa108c60b1ec2350f
Author: rongtong <ji...@163.com>
AuthorDate: Tue Mar 21 10:10:00 2023 +0800
[ISSUE #6411] Cherry pick some bug fixes from develop to 4.9.x branch (#6410)
* [ISSUE #4841] Fix the reply message is the original request msg. (#4896)
(cherry picked from commit f7c2072a27773d5feefeb5cb6cd5dead0450e3b7)
* [ISSUE #4647] Update CommandLineOption secretKey to necessary (#4648)
* Ignore the unstable updateGlobalWhiteRemoteAddressesTest in the arm environment of CI
* [ISSUE #4647] update option to necessary
Co-authored-by: RongtongJin <ji...@mails.ucas.ac.cn>
(cherry picked from commit a3aef179f7d07ef0425debff40041f81f1bf94dd)
* [ISSUE #4644] Fix mqadmin deleteTopic bug when command exec on slave broker
(cherry picked from commit 3a1d1a7e60f9049fb7e5048a9f08cdcdbf996601)
* [ISSUE #5424] Fix null pointer exception and index-out-of-bounds bug in getHalfMsg method #5425
(cherry picked from commit dd4b66643581c962db0cfb3951a893792f4e349e)
* [ISSUE #4841] Fix the reply message is the original request msg. (#4896)
(cherry picked from commit f7c2072a27773d5feefeb5cb6cd5dead0450e3b7)
* [ISSUE #4584] Add new persist method to update consume offset to remote server.
(cherry picked from commit 2930994a37e065b946977d74f4ad3966f8963267)
* Prepare to release 4.9.5
---------
Co-authored-by: echooymxq <ec...@gmail.com>
Co-authored-by: Oliver <wq...@163.com>
Co-authored-by: JanWarlen <ja...@gmail.com>
Co-authored-by: RapperCL <44...@users.noreply.github.com>
Co-authored-by: dinglei <li...@163.com>
---
.../queue/TransactionalMessageServiceImpl.java | 14 +++++-----
.../client/consumer/DefaultMQPullConsumer.java | 4 +++
.../impl/producer/DefaultMQProducerImpl.java | 1 -
.../client/producer/DefaultMQProducerTest.java | 31 ++++++++++++++++------
.../java/org/apache/rocketmq/common/MQVersion.java | 2 +-
.../command/acl/UpdateAccessConfigSubCommand.java | 2 +-
.../tools/command/topic/DeleteTopicSubCommand.java | 4 +--
7 files changed, 39 insertions(+), 19 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
index 3f244f4a2..7bf316992 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
@@ -439,14 +439,16 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
private GetResult getHalfMsg(MessageQueue messageQueue, long offset) {
GetResult getResult = new GetResult();
-
+
PullResult result = pullHalfMsg(messageQueue, offset, PULL_MSG_RETRY_NUMBER);
- getResult.setPullResult(result);
- List<MessageExt> messageExts = result.getMsgFoundList();
- if (messageExts == null) {
- return getResult;
+ if (result != null) {
+ getResult.setPullResult(result);
+ List<MessageExt> messageExts = result.getMsgFoundList();
+ if (messageExts == null || messageExts.size() == 0) {
+ return getResult;
+ }
+ getResult.setMsg(messageExts.get(0));
}
- getResult.setMsg(messageExts.get(0));
return getResult;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 5e2138e81..206ee660e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -452,4 +452,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}
+
+ public void persist(MessageQueue mq) {
+ this.getOffsetStore().persist(queueWithNamespace(mq));
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 6f3c59753..d50672e47 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -1381,7 +1381,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
- requestResponseFuture.putResponseMessage(msg);
}
@Override
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 5ce761c30..a7f04731c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
@@ -368,7 +369,8 @@ public class DefaultMQProducerTest {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final AtomicBoolean finish = new AtomicBoolean(false);
new Thread(new Runnable() {
- @Override public void run() {
+ @Override
+ public void run() {
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureHolder.getInstance().getRequestFutureTable();
assertThat(responseMap).isNotNull();
while (!finish.get()) {
@@ -376,15 +378,19 @@ public class DefaultMQProducerTest {
Thread.sleep(10);
} catch (InterruptedException e) {
}
+ MessageExt responseMsg = new MessageExt();
+ responseMsg.setTopic(message.getTopic());
+ responseMsg.setBody(message.getBody());
for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
RequestResponseFuture future = entry.getValue();
- future.putResponseMessage(message);
+ future.putResponseMessage(responseMsg);
}
}
}
}).start();
Message result = producer.request(message, 3 * 1000L);
finish.getAndSet(true);
+ assertThat(result).isExactlyInstanceOf(MessageExt.class);
assertThat(result.getTopic()).isEqualTo("FooBar");
assertThat(result.getBody()).isEqualTo(new byte[] {'a'});
}
@@ -400,24 +406,31 @@ public class DefaultMQProducerTest {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
RequestCallback requestCallback = new RequestCallback() {
- @Override public void onSuccess(Message message) {
+ @Override
+ public void onSuccess(Message message) {
+ assertThat(message).isExactlyInstanceOf(MessageExt.class);
assertThat(message.getTopic()).isEqualTo("FooBar");
assertThat(message.getBody()).isEqualTo(new byte[] {'a'});
assertThat(message.getFlag()).isEqualTo(1);
countDownLatch.countDown();
}
- @Override public void onException(Throwable e) {
+ @Override
+ public void onException(Throwable e) {
}
};
producer.request(message, requestCallback, 3 * 1000L);
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureHolder.getInstance().getRequestFutureTable();
assertThat(responseMap).isNotNull();
+
+ MessageExt responseMsg = new MessageExt();
+ responseMsg.setTopic(message.getTopic());
+ responseMsg.setBody(message.getBody());
+ responseMsg.setFlag(1);
for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
RequestResponseFuture future = entry.getValue();
future.setSendRequestOk(true);
- message.setFlag(1);
- future.getRequestCallback().onSuccess(message);
+ future.getRequestCallback().onSuccess(responseMsg);
}
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
}
@@ -427,11 +440,13 @@ public class DefaultMQProducerTest {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1);
RequestCallback requestCallback = new RequestCallback() {
- @Override public void onSuccess(Message message) {
+ @Override
+ public void onSuccess(Message message) {
}
- @Override public void onException(Throwable e) {
+ @Override
+ public void onException(Throwable e) {
cc.incrementAndGet();
countDownLatch.countDown();
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
index 60ab6f8cb..08ad97f66 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.common;
public class MQVersion {
- public static final int CURRENT_VERSION = Version.V4_9_4.ordinal();
+ public static final int CURRENT_VERSION = Version.V4_9_5.ordinal();
public static String getVersionDesc(int value) {
int length = Version.values().length;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
index 465a2db58..c246b66d5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
@@ -61,7 +61,7 @@ public class UpdateAccessConfigSubCommand implements SubCommand {
options.addOption(opt);
opt = new Option("s", "secretKey", true, "set secretKey in acl config file");
- opt.setRequired(false);
+ opt.setRequired(true);
options.addOption(opt);
opt = new Option("w", "whiteRemoteAddress", true, "set white ip Address for account in acl config file");
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
index aa5a5ead9..d9bc6082f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
@@ -38,8 +38,8 @@ public class DeleteTopicSubCommand implements SubCommand {
final String topic
) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
- Set<String> brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName);
- adminExt.deleteTopicInBroker(brokerAddressSet, topic);
+ Set<String> masterBrokerAddressSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
+ adminExt.deleteTopicInBroker(masterBrokerAddressSet, topic);
System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
Set<String> nameServerSet = null;