You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/12 13:47:35 UTC

[rocketmq] 02/03: Merge remote-tracking branch 'apache/develop' into 5.0.0-alpha

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 3aa31a6a36d70be025a4a84c4fdd8084708b35f5
Merge: 78dde88 40d9505
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Sat Mar 12 21:15:45 2022 +0800

    Merge remote-tracking branch 'apache/develop' into 5.0.0-alpha
    
    # Conflicts:
    #	acl/pom.xml
    #	broker/pom.xml
    #	client/pom.xml
    #	common/pom.xml
    #	common/src/main/java/org/apache/rocketmq/common/MQVersion.java
    #	distribution/pom.xml
    #	example/pom.xml
    #	filter/pom.xml
    #	logging/pom.xml
    #	namesrv/pom.xml
    #	openmessaging/pom.xml
    #	pom.xml
    #	remoting/pom.xml
    #	srvutil/pom.xml
    #	store/pom.xml
    #	store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
    #	test/pom.xml
    #	tools/pom.xml

 .github/ISSUE_TEMPLATE/issue_template.md           |  23 +-
 README.md                                          |   2 +
 .../rocketmq/acl/plain/PlainAccessValidator.java   |   9 -
 .../rocketmq/acl/plain/PlainPermissionManager.java | 140 ++++----
 .../acl/plain/PlainAccessControlFlowTest.java      | 396 +++++++++++++++++++++
 .../acl/plain/PlainAccessValidatorTest.java        |  60 +++-
 .../conf/acl/plain_acl.yml                         |  11 +-
 .../both_acl_file_folder_conf/conf}/plain_acl.yml  |  25 +-
 .../empty_acl_folder_conf/conf}/plain_acl.yml      |  25 +-
 .../only_acl_folder_conf}/conf/acl/plain_acl.yml   |  11 +-
 .../broker/offset/LmqConsumerOffsetManager.java    |   4 +
 .../offset/LmqConsumerOffsetManagerTest.java       |  27 ++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   7 +-
 .../ConsumeMessageConcurrentlyService.java         |   2 +-
 .../client/common/ThreadLocalIndexTest.java        |   2 +-
 .../java/org/apache/rocketmq/common/MQVersion.java |   6 -
 .../rocketmq/common/topic/TopicValidator.java      |   5 +
 distribution/conf/{acl => }/plain_acl.yml          |   0
 docs/cn/Deployment.md                              |  12 +-
 docs/cn/best_practice.md                           |   2 +-
 .../namespace/PushConsumerWithNamespace.java       |   2 +-
 .../apache/rocketmq/namesrv/NamesrvStartup.java    |   9 +-
 .../rocketmq/namesrv/NamesrvStartupTest.java       |  66 ++++
 .../rocketmq/remoting/protocol/LanguageCode.java   |   3 +-
 .../remoting/protocol/LanguageCodeTest.java        |  26 +-
 .../apache/rocketmq/store/StoreStatsService.java   |  74 +++-
 .../rocketmq/store/logfile/DefaultMappedFile.java  |  20 +-
 .../rocketmq/store/StoreStatsServiceTest.java      |  13 +
 .../org/apache/rocketmq/store/StoreTestUtil.java   |  15 +
 .../store/dledger/DLedgerCommitlogTest.java        |   4 +
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   7 +
 .../tools/admin/DefaultMQAdminExtImpl.java         |  15 +
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   4 +
 .../connection/ConsumerConnectionSubCommand.java   |   8 +-
 .../command/consumer/ConsumerStatusSubCommand.java |   8 +-
 .../tools/admin/DefaultMQAdminExtTest.java         |   4 +
 36 files changed, 856 insertions(+), 191 deletions(-)

diff --cc client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d7f1652,e7e805d..09599d2
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@@ -272,37 -247,8 +272,36 @@@ public class MQClientAPIImpl 
          this.remotingClient.shutdown();
      }
  
 +    public Set<MessageQueueAssignment> queryAssignment(final String addr, final String topic,
 +        final String consumerGroup, final String clientId, final String strategyName,
 +        final MessageModel messageModel, final long timeoutMillis)
 +        throws RemotingException, MQBrokerException, InterruptedException {
 +        QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody();
 +        requestBody.setTopic(topic);
 +        requestBody.setConsumerGroup(consumerGroup);
 +        requestBody.setClientId(clientId);
 +        requestBody.setMessageModel(messageModel);
 +        requestBody.setStrategyName(strategyName);
 +
 +        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_ASSIGNMENT, null);
 +        request.setBody(requestBody.encode());
 +
 +        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
 +            request, timeoutMillis);
 +        switch (response.getCode()) {
 +            case ResponseCode.SUCCESS: {
 +                QueryAssignmentResponseBody queryAssignmentResponseBody = QueryAssignmentResponseBody.decode(response.getBody(), QueryAssignmentResponseBody.class);
 +                return queryAssignmentResponseBody.getMessageQueueAssignments();
 +            }
 +            default:
 +                break;
 +        }
 +
 +        throw new MQBrokerException(response.getCode(), response.getRemark());
 +    }
 +
      public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
-         final long timeoutMillis)
-         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+         final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
          RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
  
          byte[] body = RemotingSerializable.encode(config);
@@@ -672,11 -617,11 +671,11 @@@
              String retryBrokerName = brokerName;//by default, it will send to the same broker
              if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
                  MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
 -                retryBrokerName = mqChosen.getBrokerName();
 +                retryBrokerName = instance.getBrokerNameFromMessageQueue(mqChosen);
              }
              String addr = instance.findBrokerAddressInPublish(retryBrokerName);
-             log.warn(String.format("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
-                 retryBrokerName), e);
+             log.warn("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
+                 retryBrokerName, e);
              try {
                  request.setOpaque(RemotingCommand.createNewRequestId());
                  sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
diff --cc store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index a8ea1c6,b46e7ca..4d4830b
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@@ -36,15 -36,10 +36,16 @@@ import org.apache.rocketmq.common.const
  import org.apache.rocketmq.logging.InternalLogger;
  import org.apache.rocketmq.logging.InternalLoggerFactory;
  import org.apache.rocketmq.common.message.MessageExt;
 -import org.apache.rocketmq.common.message.MessageExtBatch;
 -import org.apache.rocketmq.store.CommitLog.PutMessageContext;
 +import org.apache.rocketmq.store.AppendMessageCallback;
 +import org.apache.rocketmq.store.AppendMessageResult;
 +import org.apache.rocketmq.store.AppendMessageStatus;
 +import org.apache.rocketmq.store.MessageExtBatch;
 +import org.apache.rocketmq.store.MessageExtBrokerInner;
 +import org.apache.rocketmq.store.PutMessageContext;
 +import org.apache.rocketmq.store.SelectMappedBufferResult;
 +import org.apache.rocketmq.store.TransientStorePool;
  import org.apache.rocketmq.store.config.FlushDiskType;
+ import org.apache.rocketmq.store.config.MessageStoreConfig;
  import org.apache.rocketmq.store.util.LibC;
  import sun.nio.ch.DirectBuffer;
  
diff --cc store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index 1c0e54c,0e3e01d..18b9eac
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@@ -40,9 -39,8 +40,11 @@@ import org.apache.rocketmq.store.PutMes
  import org.junit.Assert;
  import org.junit.Test;
  
+ import static org.apache.rocketmq.store.StoreTestUtil.releaseMmapFilesOnWindows;
+ 
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.awaitility.Awaitility.await;
 +
  public class DLedgerCommitlogTest extends MessageStoreTestBase {