You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/12 13:47:35 UTC
[rocketmq] 02/03: Merge remote-tracking branch 'apache/develop' into 5.0.0-alpha
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 3aa31a6a36d70be025a4a84c4fdd8084708b35f5
Merge: 78dde88 40d9505
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Sat Mar 12 21:15:45 2022 +0800
Merge remote-tracking branch 'apache/develop' into 5.0.0-alpha
# Conflicts:
# acl/pom.xml
# broker/pom.xml
# client/pom.xml
# common/pom.xml
# common/src/main/java/org/apache/rocketmq/common/MQVersion.java
# distribution/pom.xml
# example/pom.xml
# filter/pom.xml
# logging/pom.xml
# namesrv/pom.xml
# openmessaging/pom.xml
# pom.xml
# remoting/pom.xml
# srvutil/pom.xml
# store/pom.xml
# store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
# test/pom.xml
# tools/pom.xml
.github/ISSUE_TEMPLATE/issue_template.md | 23 +-
README.md | 2 +
.../rocketmq/acl/plain/PlainAccessValidator.java | 9 -
.../rocketmq/acl/plain/PlainPermissionManager.java | 140 ++++----
.../acl/plain/PlainAccessControlFlowTest.java | 396 +++++++++++++++++++++
.../acl/plain/PlainAccessValidatorTest.java | 60 +++-
.../conf/acl/plain_acl.yml | 11 +-
.../both_acl_file_folder_conf/conf}/plain_acl.yml | 25 +-
.../empty_acl_folder_conf/conf}/plain_acl.yml | 25 +-
.../only_acl_folder_conf}/conf/acl/plain_acl.yml | 11 +-
.../broker/offset/LmqConsumerOffsetManager.java | 4 +
.../offset/LmqConsumerOffsetManagerTest.java | 27 ++
.../rocketmq/client/impl/MQClientAPIImpl.java | 7 +-
.../ConsumeMessageConcurrentlyService.java | 2 +-
.../client/common/ThreadLocalIndexTest.java | 2 +-
.../java/org/apache/rocketmq/common/MQVersion.java | 6 -
.../rocketmq/common/topic/TopicValidator.java | 5 +
distribution/conf/{acl => }/plain_acl.yml | 0
docs/cn/Deployment.md | 12 +-
docs/cn/best_practice.md | 2 +-
.../namespace/PushConsumerWithNamespace.java | 2 +-
.../apache/rocketmq/namesrv/NamesrvStartup.java | 9 +-
.../rocketmq/namesrv/NamesrvStartupTest.java | 66 ++++
.../rocketmq/remoting/protocol/LanguageCode.java | 3 +-
.../remoting/protocol/LanguageCodeTest.java | 26 +-
.../apache/rocketmq/store/StoreStatsService.java | 74 +++-
.../rocketmq/store/logfile/DefaultMappedFile.java | 20 +-
.../rocketmq/store/StoreStatsServiceTest.java | 13 +
.../org/apache/rocketmq/store/StoreTestUtil.java | 15 +
.../store/dledger/DLedgerCommitlogTest.java | 4 +
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 7 +
.../tools/admin/DefaultMQAdminExtImpl.java | 15 +
.../apache/rocketmq/tools/admin/MQAdminExt.java | 4 +
.../connection/ConsumerConnectionSubCommand.java | 8 +-
.../command/consumer/ConsumerStatusSubCommand.java | 8 +-
.../tools/admin/DefaultMQAdminExtTest.java | 4 +
36 files changed, 856 insertions(+), 191 deletions(-)
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d7f1652,e7e805d..09599d2
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@@ -272,37 -247,8 +272,36 @@@ public class MQClientAPIImpl
this.remotingClient.shutdown();
}
+ public Set<MessageQueueAssignment> queryAssignment(final String addr, final String topic,
+ final String consumerGroup, final String clientId, final String strategyName,
+ final MessageModel messageModel, final long timeoutMillis)
+ throws RemotingException, MQBrokerException, InterruptedException {
+ QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody();
+ requestBody.setTopic(topic);
+ requestBody.setConsumerGroup(consumerGroup);
+ requestBody.setClientId(clientId);
+ requestBody.setMessageModel(messageModel);
+ requestBody.setStrategyName(strategyName);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_ASSIGNMENT, null);
+ request.setBody(requestBody.encode());
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ request, timeoutMillis);
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ QueryAssignmentResponseBody queryAssignmentResponseBody = QueryAssignmentResponseBody.decode(response.getBody(), QueryAssignmentResponseBody.class);
+ return queryAssignmentResponseBody.getMessageQueueAssignments();
+ }
+ default:
+ break;
+ }
+
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
- final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
byte[] body = RemotingSerializable.encode(config);
@@@ -672,11 -617,11 +671,11 @@@
String retryBrokerName = brokerName;//by default, it will send to the same broker
if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
- retryBrokerName = mqChosen.getBrokerName();
+ retryBrokerName = instance.getBrokerNameFromMessageQueue(mqChosen);
}
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
- log.warn(String.format("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
- retryBrokerName), e);
+ log.warn("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
+ retryBrokerName, e);
try {
request.setOpaque(RemotingCommand.createNewRequestId());
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
diff --cc store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index a8ea1c6,b46e7ca..4d4830b
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@@ -36,15 -36,10 +36,16 @@@ import org.apache.rocketmq.common.const
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageExtBatch;
-import org.apache.rocketmq.store.CommitLog.PutMessageContext;
+import org.apache.rocketmq.store.AppendMessageCallback;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.MessageExtBatch;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageContext;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.TransientStorePool;
import org.apache.rocketmq.store.config.FlushDiskType;
+ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.util.LibC;
import sun.nio.ch.DirectBuffer;
diff --cc store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index 1c0e54c,0e3e01d..18b9eac
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@@ -40,9 -39,8 +40,11 @@@ import org.apache.rocketmq.store.PutMes
import org.junit.Assert;
import org.junit.Test;
+ import static org.apache.rocketmq.store.StoreTestUtil.releaseMmapFilesOnWindows;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
public class DLedgerCommitlogTest extends MessageStoreTestBase {