You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/08/21 03:17:33 UTC

[GitHub] vongosling closed pull request #415: Pull Request for ACL feature request #403

vongosling closed pull request #415: Pull Request for ACL feature request #403
URL: https://github.com/apache/rocketmq/pull/415
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the complete diff is reproduced below:

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 4b986c0d2..ca065dc6b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -22,6 +22,7 @@
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.broker.acl.AclRPCHook;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
@@ -62,6 +63,7 @@ public static BrokerController start(BrokerController controller) {
         try {
 
             controller.start();
+            controller.registerClientRPCHook(new AclRPCHook(controller.getBrokerOuterAPI()));
 
             String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                 + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/acl/AclRPCHook.java b/broker/src/main/java/org/apache/rocketmq/broker/acl/AclRPCHook.java
new file mode 100644
index 000000000..d459ab021
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/acl/AclRPCHook.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.acl;
+
+import static org.apache.rocketmq.common.protocol.RequestCode.PULL_MESSAGE;
+import static org.apache.rocketmq.common.protocol.RequestCode.SEND_MESSAGE;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * RPC hook that looks up the ACL entry for the remote address and rejects send/pull requests it does not allow.
+ */
+public class AclRPCHook implements RPCHook {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private BrokerOuterAPI brokerOuterAPI;
+    // todo use a nameserver listener to avoid fetching the ACL on every request
+
+    public AclRPCHook(BrokerOuterAPI brokerOuterAPI) {
+        this.brokerOuterAPI = brokerOuterAPI;
+    }
+
+    /**
+     * Rejects SEND_MESSAGE unless the ACL operation is "w", and PULL_MESSAGE unless it is "r".
+     * @param remoteAddr address handed to {@link BrokerOuterAPI#getAclData(String)} for the lookup
+     * @param request command whose request code decides which permission is required
+     */
+    @Override
+    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+        String access = null;
+        try {
+            AclConfigData acl = brokerOuterAPI.getAclData(remoteAddr);
+            access = acl == null ? null : acl.getOperation();
+        } catch (Exception e) {
+            log.error("get acl data Exception", e);
+        }
+        // check write cmd; constant-first equals denies a missing operation instead of raising an NPE
+        if (request.getCode() == SEND_MESSAGE && !"w".equals(access)) {
+            throw new RuntimeException("Access control is not permitted.");
+        }
+
+        // check read cmd
+        if (request.getCode() == PULL_MESSAGE && !"r".equals(access)) {
+            throw new RuntimeException("Access control is not permitted.");
+        }
+    }
+
+    @Override
+    public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
+        // no ACL work is done once the response exists
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 4dee01cbf..753f5548a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -31,6 +31,7 @@
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -392,6 +393,27 @@ public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
+    /**
+     * Get acl data by sending an ACL_READ_CONFIG request to the given address.
+     * @param addr address the request is sent to
+     * @return the response body decoded as {@link AclConfigData} when the response code is SUCCESS
+     */
+    public AclConfigData getAclData(final String addr) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        // ACL read request without a custom header, sent synchronously with a 3000 ms timeout
+        final RemotingCommand aclRequest = RemotingCommand.createRequestCommand(RequestCode.ACL_READ_CONFIG, null);
+        final RemotingCommand aclResponse = this.remotingClient.invokeSync(addr, aclRequest, 3000);
+        assert aclResponse != null;
+
+        final boolean succeeded = aclResponse.getCode() == ResponseCode.SUCCESS;
+        if (succeeded) {
+            return AclConfigData.decode(aclResponse.getBody(), AclConfigData.class);
+        }
+
+        // every other response code is reported to the caller together with the remark
+        throw new MQBrokerException(aclResponse.getCode(), aclResponse.getRemark());
+    }
+
     public void registerRPCHook(RPCHook rpcHook) {
         remotingClient.registerRPCHook(rpcHook);
     }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/acl/TestAclRPCHook.java b/broker/src/test/java/org/apache/rocketmq/broker/acl/TestAclRPCHook.java
new file mode 100644
index 000000000..36c2db6cc
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/acl/TestAclRPCHook.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.acl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+import org.mockito.Mock;
+
+/**
+ * Test for {@link AclRPCHook}.
+ */
+public class TestAclRPCHook {
+
+    AclRPCHook aclRPCHook;
+    @Mock
+    private BrokerOuterAPI brokerOuterAPI;
+    @Mock
+    private CommandCustomHeader commandCustomHeader;
+
+    public void init() throws Exception {
+        org.mockito.MockitoAnnotations.initMocks(this); // no Mockito runner on this class, so create the @Mock fields here
+        aclRPCHook = new AclRPCHook(brokerOuterAPI);
+        Field field = AclRPCHook.class.getDeclaredField("brokerOuterAPI");
+        field.setAccessible(true);
+        field.set(aclRPCHook, brokerOuterAPI);
+    }
+
+    @Test
+    public void testAllDenied() throws Exception {
+        init();
+        AclConfigData aclConfigData = new AclConfigData();
+        aclConfigData.setOperation("deny");
+        when(brokerOuterAPI.getAclData("test")).thenReturn(aclConfigData);
+        boolean sendDenied = false;
+        try {
+            final RemotingCommand appendRequest = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, commandCustomHeader);
+            aclRPCHook.doBeforeRequest("test", appendRequest);
+        } catch (RuntimeException e) {
+            sendDenied = true;
+            assertEquals("Access control is not permitted.", e.getMessage());
+        }
+        assertTrue(sendDenied);
+        boolean pullDenied = false;
+        try {
+            final RemotingCommand pullRequest = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, commandCustomHeader);
+            aclRPCHook.doBeforeRequest("test", pullRequest);
+        } catch (RuntimeException e) {
+            pullDenied = true;
+            assertEquals("Access control is not permitted.", e.getMessage());
+        }
+        assertTrue(pullDenied);
+    }
+
+    @Test
+    public void testAllPermitted() throws Exception {
+        init();
+        AclConfigData aclConfigData = new AclConfigData();
+        aclConfigData.setOperation("w");
+        when(brokerOuterAPI.getAclData("test")).thenReturn(aclConfigData);
+        final RemotingCommand appendRequest = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, commandCustomHeader);
+        aclRPCHook.doBeforeRequest("test", appendRequest); // "w" allows SEND_MESSAGE: any exception fails the test
+    }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 1837204a3..732a75db1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -58,6 +58,7 @@
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
@@ -1070,7 +1071,7 @@ public ConsumeStats getConsumeStats(final String addr, final String consumerGrou
     }
 
     public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup,
-        final long timeoutMillis)
+                                                        final long timeoutMillis)
         throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
         MQBrokerException {
         GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader();
@@ -1092,7 +1093,7 @@ public ProducerConnection getProducerConnectionList(final String addr, final Str
     }
 
     public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup,
-        final long timeoutMillis)
+                                                        final long timeoutMillis)
         throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
         MQBrokerException {
         GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
@@ -1523,7 +1524,7 @@ public GroupList queryTopicConsumeByWho(final String addr, final String topic, f
     }
 
     public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
-        final long timeoutMillis)
+                                                    final long timeoutMillis)
         throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
         MQBrokerException {
         QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
@@ -1919,7 +1920,7 @@ public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
     }
 
     public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder,
-        long timeoutMillis) throws MQClientException,
+                                                      long timeoutMillis) throws MQClientException,
         RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         GetConsumeStatsInBrokerHeader requestHeader = new GetConsumeStatsInBrokerHeader();
         requestHeader.setIsOrder(isOrder);
@@ -1944,7 +1945,7 @@ public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isO
     }
 
     public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException,
+                                                            long timeoutMillis) throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
         RemotingCommand response = this.remotingClient
@@ -1961,7 +1962,7 @@ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
     }
 
     public TopicConfigSerializeWrapper getAllTopicConfig(final String addr,
-        long timeoutMillis) throws RemotingConnectException,
+                                                         long timeoutMillis) throws RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
 
@@ -2042,9 +2043,9 @@ public void updateNameServerConfig(final Properties properties, final List<Strin
     }
 
     public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic,
-        final int queueId,
-        final long index, final int count, final String consumerGroup,
-        final long timeoutMillis) throws InterruptedException,
+                                                           final int queueId,
+                                                           final long index, final int count, final String consumerGroup,
+                                                           final long timeoutMillis) throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
 
         QueryConsumeQueueRequestHeader requestHeader = new QueryConsumeQueueRequestHeader();
@@ -2089,4 +2090,82 @@ public void checkClientInBroker(final String brokerAddr, final String consumerGr
             throw new MQClientException(response.getCode(), response.getRemark());
         }
     }
+
+    /**
+     * Writes one ACL entry to every name server address returned by the remoting client.
+     * @param timeoutMillis timeout passed to each synchronous invocation
+     * @return false without sending anything when instanceName, topic or operation is null or empty,
+     *         true when no name server answered with a non-success code
+     * @throws MQClientException carrying code and remark of the last non-success response
+     */
+    public boolean aclWriteConfig(final String instanceName, final String topic, final String operation, final long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQClientException {
+        final boolean instanceMissing = instanceName == null || instanceName.isEmpty();
+        final boolean topicMissing = topic == null || topic.isEmpty();
+        final boolean operationMissing = operation == null || operation.isEmpty();
+        if (instanceMissing || topicMissing || operationMissing) {
+            return false;
+        }
+
+        final RemotingCommand writeRequest = RemotingCommand.createRequestCommand(RequestCode.ACL_WRITE_CONFIG, null);
+
+        // The ACL entry travels in the request body.
+        final AclConfigData aclEntry = new AclConfigData();
+        aclEntry.setInstanceName(instanceName);
+        aclEntry.setTopic(topic);
+        aclEntry.setOperation(operation);
+        writeRequest.setBody(aclEntry.encode());
+
+        // Every name server is contacted even after a failure; only the last failing response is kept.
+        RemotingCommand lastFailure = null;
+        final List<String> nameServerAddrs = this.remotingClient.getNameServerAddressList();
+        for (final String nameServerAddr : nameServerAddrs) {
+            final RemotingCommand reply = this.remotingClient.invokeSync(nameServerAddr, writeRequest, timeoutMillis);
+            assert reply != null;
+            if (ResponseCode.SUCCESS != reply.getCode()) {
+                lastFailure = reply;
+            }
+        }
+
+        if (lastFailure == null) {
+            return true;
+        }
+
+        // Report the failure with the code and remark of that response.
+        throw new MQClientException(lastFailure.getCode(), lastFailure.getRemark());
+    }
+
+    /**
+     * Reads ACL data with an ACL_READ_CONFIG request whose body carries the given fields.
+     * @return null without sending anything when instanceName, topic or operation is null or empty,
+     *         otherwise the response body decoded as {@link AclConfigData}
+     * @throws MQClientException carrying code and remark of a non-success response
+     */
+    public AclConfigData aclReadConfig(final String instanceName, final String topic, final String operation, final long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQClientException {
+        final boolean instanceMissing = instanceName == null || instanceName.isEmpty();
+        final boolean topicMissing = topic == null || topic.isEmpty();
+        final boolean operationMissing = operation == null || operation.isEmpty();
+        if (instanceMissing || topicMissing || operationMissing) {
+            return null;
+        }
+
+        final RemotingCommand readRequest = RemotingCommand.createRequestCommand(RequestCode.ACL_READ_CONFIG, null);
+
+        final AclConfigData aclQuery = new AclConfigData();
+        aclQuery.setInstanceName(instanceName);
+        aclQuery.setTopic(topic);
+        aclQuery.setOperation(operation);
+        readRequest.setBody(aclQuery.encode());
+
+        // NOTE(review): the address is null; presumably the remoting client then picks a name server - confirm.
+        final RemotingCommand reply = this.remotingClient.invokeSync(null, readRequest, timeoutMillis);
+        assert reply != null;
+        if (ResponseCode.SUCCESS != reply.getCode()) {
+            throw new MQClientException(reply.getCode(), reply.getRemark());
+        }
+        return AclConfigData.decode(reply.getBody(), AclConfigData.class);
+    }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index c13e75c20..65f63cbdd 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -19,6 +19,7 @@
 import java.lang.reflect.Field;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.hook.SendMessageContext;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -28,12 +29,16 @@
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -50,8 +55,10 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -210,6 +217,36 @@ public void testSendMessageAsync_WithException() throws RemotingException, Inter
         }
     }
 
+    @Test
+    public void testAclWriteConfig() throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException, MQClientException {
+        // NOTE(review): the mocked remoting client presumably returns no name server address here, so nothing is sent.
+        boolean result = mqClientAPI.aclWriteConfig("test", "test", "rw", 1000);
+        assertThat(result).isEqualTo(true);
+
+        // With one name server that answers with an error code, the call must throw and carry that code.
+        RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(3, "error");
+        doReturn(java.util.Collections.singletonList("127.0.0.1:9876")).when(remotingClient).getNameServerAddressList();
+        doReturn(remotingCommand).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+        boolean rejected = false;
+        try {
+            mqClientAPI.aclWriteConfig("test", "test1", "r", 1000);
+        } catch (MQClientException e) {
+            rejected = true;
+            assertThat(e).hasFieldOrPropertyWithValue("responseCode", 3);
+        }
+        assertThat(rejected).isTrue();
+    }
+
+    @Test
+    public void testAclReadConfig() throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException, MQClientException {
+        RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(0, "ok");
+        // The body carries a real ACL field so the decoded content is checked, not only non-nullness.
+        remotingCommand.setBody("{\"operation\": \"rw\"}".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        doReturn(remotingCommand).when(remotingClient).invokeSync((String) eq(null), any(RemotingCommand.class), anyLong());
+        AclConfigData aclConfigData = mqClientAPI.aclReadConfig("test", "test", "rw", 1000);
+        assertThat(aclConfigData.getOperation()).isEqualTo("rw");
+    }
+
     private RemotingCommand createSuccessResponse(RemotingCommand request) {
         RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
         response.setCode(ResponseCode.SUCCESS);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 8cf2d46ad..b9e80105d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -167,4 +167,12 @@
     public static final int QUERY_CONSUME_QUEUE = 321;
 
     public static final int QUERY_DATA_VERSION = 322;
+
+    /**
+     * ACL config
+     */
+    public static final int ACL_WRITE_CONFIG = 323;
+
+    public static final int ACL_READ_CONFIG = 324;
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/AclConfigData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AclConfigData.java
new file mode 100644
index 000000000..a2f86191f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AclConfigData.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class AclConfigData extends RemotingSerializable {
+    // One ACL entry. NOTE(review): presumably instanceName and topic say what the operation applies to - confirm.
+    private String instanceName;
+    private String topic;
+    private String operation;
+
+    public String getInstanceName() {
+        return this.instanceName;
+    }
+
+    public void setInstanceName(final String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    public String getOperation() {
+        return this.operation;
+    }
+
+    public void setOperation(final String operation) {
+        this.operation = operation;
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 467078c44..cd8ee934c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -122,6 +122,10 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
                 return this.updateConfig(ctx, request);
             case RequestCode.GET_NAMESRV_CONFIG:
                 return this.getConfig(ctx, request);
+            case RequestCode.ACL_READ_CONFIG:
+                return this.getACL(ctx, request);
+            case RequestCode.ACL_WRITE_CONFIG:
+                return this.putACL(ctx, request);
             default:
                 break;
         }
@@ -560,4 +564,62 @@ private RemotingCommand getConfig(ChannelHandlerContext ctx, RemotingCommand req
         return response;
     }
 
+    // Handles ACL_READ_CONFIG: replies with getAllConfigsFormatString() of the name server configuration.
+    // NOTE(review): callers decode the body as AclConfigData; this text is presumably not in that format - confirm.
+    private RemotingCommand getACL(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        String content = this.namesrvController.getConfiguration().getAllConfigsFormatString();
+        if (content != null && content.length() > 0) {
+            try {
+                response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
+            } catch (UnsupportedEncodingException e) {
+                log.error("getACL error, ", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        }
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Handles ACL_WRITE_CONFIG: parses the request body as properties text and applies the result
+     * to the name server configuration. A request without a body is answered with SUCCESS unchanged.
+     * NOTE(review): MQClientAPIImpl#aclWriteConfig sends an encoded AclConfigData as body; presumably
+     * that is not properties text - confirm the intended wire format.
+     * @return SUCCESS, or SYSTEM_ERROR when the body cannot be decoded or parsed
+     */
+    private RemotingCommand putACL(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        byte[] body = request.getBody();
+        if (body != null) {
+            String bodyStr;
+            try {
+                bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
+            } catch (UnsupportedEncodingException e) {
+                log.error("putACL byte array to string error: ", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+
+            // new String(...) never yields null, so only the parse result needs checking
+            Properties properties = MixAll.string2Properties(bodyStr);
+            if (properties == null) {
+                log.error("putACL MixAll.string2Properties error {}", bodyStr);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("string2Properties error");
+                return response;
+            }
+
+            this.namesrvController.getConfiguration().update(properties);
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index dc829c1c1..8d4adc1cc 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -32,6 +32,7 @@
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
@@ -237,7 +238,7 @@ public ConsumerConnection examineConsumerConnectionInfo(
 
     @Override
     public ProducerConnection examineProducerConnectionInfo(String producerGroup,
-        final String topic) throws RemotingException,
+                                                            final String topic) throws RemotingException,
         MQClientException, InterruptedException, MQBrokerException {
         return defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic);
     }
@@ -353,7 +354,7 @@ public GroupList queryTopicConsumeByWho(
 
     @Override
     public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
-        final String group) throws InterruptedException, MQBrokerException,
+                                                    final String group) throws InterruptedException, MQBrokerException,
         RemotingException, MQClientException {
         return this.defaultMQAdminExtImpl.queryConsumeTimeSpan(topic, group);
     }
@@ -386,7 +387,7 @@ public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectExcepti
 
     @Override
     public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId,
-        boolean jstack) throws RemotingException,
+                                                      boolean jstack) throws RemotingException,
         MQClientException, InterruptedException {
         return defaultMQAdminExtImpl.getConsumerRunningInfo(consumerGroup, clientId, jstack);
     }
@@ -420,7 +421,7 @@ public void cloneGroupOffset(String srcGroup, String destGroup, String topic,
 
     @Override
     public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
-        String statsKey) throws RemotingConnectException,
+                                               String statsKey) throws RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         return this.defaultMQAdminExtImpl.viewBrokerStatsData(brokerAddr, statsName, statsKey);
     }
@@ -433,7 +434,7 @@ public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
 
     @Override
     public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder,
-        long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
+                                                      long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, MQClientException, InterruptedException {
         return this.defaultMQAdminExtImpl.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
     }
@@ -446,14 +447,14 @@ public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boole
 
     @Override
     public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+                                                            long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQBrokerException {
         return this.defaultMQAdminExtImpl.getAllSubscriptionGroup(brokerAddr, timeoutMillis);
     }
 
     @Override
     public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+                                                        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQBrokerException {
         return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis);
     }
@@ -507,10 +508,24 @@ public void updateNameServerConfig(final Properties properties, final List<Strin
 
     @Override
     public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index,
-        int count, String consumerGroup)
+                                                           int count, String consumerGroup)
         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
         return this.defaultMQAdminExtImpl.queryConsumeQueue(
             brokerAddr, topic, queueId, index, count, consumerGroup
         );
     }
+
+    @Override
+    public boolean aclWriteConfig(String instanceName, String topic, String operation)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQClientException {
+
+        return this.defaultMQAdminExtImpl.aclWriteConfig(instanceName, topic, operation);
+    }
+
+    @Override
+    public AclConfigData aclReadConfig(String instanceName, String topic, String operation)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+        return this.defaultMQAdminExtImpl.aclReadConfig(instanceName, topic, operation);
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index bcd66669c..0d108d0de 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -47,14 +47,7 @@
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.common.message.MessageClientExt;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.namesrv.NamesrvUtil;
-import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
@@ -69,6 +62,14 @@
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
@@ -319,7 +320,7 @@ public ConsumerConnection examineConsumerConnectionInfo(
 
     @Override
     public ProducerConnection examineProducerConnectionInfo(String producerGroup,
-        final String topic) throws RemotingException,
+                                                            final String topic) throws RemotingException,
         MQClientException, InterruptedException, MQBrokerException {
         ProducerConnection result = new ProducerConnection();
         List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
@@ -548,8 +549,9 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume
         return Collections.EMPTY_MAP;
     }
 
+    @Override
     public void createOrUpdateOrderConf(String key, String value,
-        boolean isCluster) throws RemotingException, MQBrokerException,
+                                        boolean isCluster) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException {
 
         if (isCluster) {
@@ -701,7 +703,7 @@ public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectExcepti
 
     @Override
     public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId,
-        boolean jstack) throws RemotingException,
+                                                      boolean jstack) throws RemotingException,
         MQClientException, InterruptedException {
         String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
@@ -871,7 +873,7 @@ public void cloneGroupOffset(String srcGroup, String destGroup, String topic,
 
     @Override
     public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
-        String statsKey) throws RemotingConnectException,
+                                               String statsKey) throws RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(brokerAddr, statsName, statsKey, timeoutMillis);
     }
@@ -910,14 +912,14 @@ public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boole
 
     @Override
     public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException,
+                                                            long timeoutMillis) throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         return this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
     }
 
     @Override
     public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException,
+                                                        long timeoutMillis) throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
     }
@@ -994,10 +996,24 @@ public void updateNameServerConfig(final Properties properties, final List<Strin
 
     @Override
     public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index,
-        int count, String consumerGroup)
+                                                           int count, String consumerGroup)
         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
         return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(
             brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis
         );
     }
+
+    @Override
+    public boolean aclWriteConfig(String instanceName, String topic, String operation)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQClientException {
+        return this.mqClientInstance.getMQClientAPIImpl().aclWriteConfig(instanceName, topic, operation, timeoutMillis);
+    }
+
+    @Override
+    public AclConfigData aclReadConfig(String instanceName, String topic, String operation)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQClientException {
+        return this.mqClientInstance.getMQClientAPIImpl().aclReadConfig(instanceName, topic, operation, timeoutMillis);
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 16b442757..18c88a04c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -30,6 +30,7 @@
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
@@ -259,4 +260,18 @@ QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr,
         final String topic, final int queueId,
         final long index, final int count, final String consumerGroup)
         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
+
+    /**
+     * acl write operation
+     */
+    boolean aclWriteConfig(final String instanceName, final String topic, final String operation)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQClientException;
+
+    /**
+     * acl read operation
+     */
+    AclConfigData aclReadConfig(final String instanceName, final String topic, final String operation)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQClientException;
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 6a51b7b4b..f4ebd1166 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -56,6 +56,8 @@
 import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
 import org.apache.rocketmq.tools.command.message.SendMessageCommand;
+import org.apache.rocketmq.tools.command.namesrv.AclConfigReadCommand;
+import org.apache.rocketmq.tools.command.namesrv.AclConfigWriteCommand;
 import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand;
@@ -197,6 +199,9 @@ public static void initCommand() {
         initCommand(new QueryConsumeQueueCommand());
         initCommand(new SendMessageCommand());
         initCommand(new ConsumeMessageCommand());
+
+        initCommand(new AclConfigReadCommand());
+        initCommand(new AclConfigWriteCommand());
     }
 
     private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AclConfigReadCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AclConfigReadCommand.java
new file mode 100644
index 000000000..f917428a3
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AclConfigReadCommand.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.namesrv;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * {@code mqadmin aclRead} sub-command: looks up an ACL entry through
+ * {@link DefaultMQAdminExt#aclReadConfig(String, String, String)} and prints the
+ * instance name, topic and operation of the returned {@link AclConfigData}.
+ */
+public class AclConfigReadCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "aclRead";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "ACL read function";
+    }
+
+    /**
+     * Registers the three mandatory options: {@code -i/--instanceName},
+     * {@code -t/--topic} and {@code -r/--read}.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("i", "instanceName", true, "instance name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("r", "read", true, "access read operation");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Reads the ACL entry selected by the {@code -i}, {@code -t} and {@code -r}
+     * options and prints it as one formatted line.
+     *
+     * @throws SubCommandException if any step fails, including a {@code null} reply
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+        throws SubCommandException {
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            String instanceName = commandLine.getOptionValue('i').trim();
+            String topic = commandLine.getOptionValue('t').trim();
+            String operation = commandLine.getOptionValue('r').trim();
+
+            defaultMQAdminExt.start();
+
+            AclConfigData aclConfigData = defaultMQAdminExt.aclReadConfig(instanceName, topic, operation);
+            if (aclConfigData == null) {
+                // Report a descriptive cause rather than the NullPointerException the print below would raise.
+                throw new IllegalStateException("aclReadConfig returned no data for instanceName="
+                    + instanceName + ", topic=" + topic + ", operation=" + operation);
+            }
+
+            // %n instead of \n so the output ends with the platform line separator.
+            System.out.printf("%-50s  %-50s %-50s%n", aclConfigData.getInstanceName(),
+                aclConfigData.getTopic(), aclConfigData.getOperation());
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AclConfigWriteCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AclConfigWriteCommand.java
new file mode 100644
index 000000000..92a0398a1
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AclConfigWriteCommand.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.namesrv;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * {@code mqadmin aclWrite} sub-command: stores an ACL entry through
+ * {@link DefaultMQAdminExt#aclWriteConfig(String, String, String)}.
+ */
+public class AclConfigWriteCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "aclWrite";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "ACL write function.";
+    }
+
+    /**
+     * Registers the three mandatory options: {@code -i/--instanceName},
+     * {@code -t/--topic} and {@code -o/--operation}.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("i", "instanceName", true, "instance name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("o", "operation", true, "access control operation");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Sends the ACL entry given by the {@code -i}, {@code -t} and {@code -o}
+     * options and reports success only when the write call returns {@code true}.
+     *
+     * @throws SubCommandException if any step fails or the write call returns {@code false}
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+        throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            String instanceName = commandLine.getOptionValue('i').trim();
+            String topic = commandLine.getOptionValue('t').trim();
+            String operation = commandLine.getOptionValue('o').trim();
+
+            defaultMQAdminExt.start();
+            // aclWriteConfig reports its outcome as a boolean; do not announce success when it is false.
+            if (!defaultMQAdminExt.aclWriteConfig(instanceName, topic, operation)) {
+                throw new IllegalStateException("aclWriteConfig returned false for instanceName="
+                    + instanceName + ", topic=" + topic + ", operation=" + operation);
+            }
+            System.out.printf("acl write operate success.%n");
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index 786598002..365437a29 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -18,6 +18,7 @@
 
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
+import java.security.acl.Acl;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,6 +43,7 @@
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.Connection;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -67,6 +69,7 @@
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -75,9 +78,13 @@
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.booleanThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -225,6 +232,10 @@ public static void init() throws Exception {
         consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
         consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
         when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
+
+        AclConfigData aclConfigData = AclConfigData.decode("{\"test\": \"test\"}".getBytes(), AclConfigData.class);
+        doReturn(aclConfigData).when(mQClientAPIImpl).aclReadConfig("test", "test", "r", 1000);
+        doReturn(true).when(mQClientAPIImpl).aclWriteConfig(anyString(), anyString(), anyString(), anyLong());
     }
 
     @AfterClass
@@ -406,4 +417,16 @@ public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerE
         assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
         assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
     }
+
+    @Test
+    public void testAclWriteConfig() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        boolean result = defaultMQAdminExt.aclWriteConfig("test", "test", "r");
+        assertThat(result).isTrue();
+    }
+
+    @Test
+    public void testAclReadConfig() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        assertThat(defaultMQAdminExt.aclReadConfig("test", "test", "r")).isNotNull();
+    }
+
 }
\ No newline at end of file
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AclConfigReadCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AclConfigReadCommandTest.java
new file mode 100644
index 000000000..b8f0f6890
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AclConfigReadCommandTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.namesrv;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.protocol.body.AclConfigData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AclConfigReadCommandTest {
+    private static DefaultMQAdminExt defaultMQAdminExt;
+    private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    @BeforeClass
+    public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+        field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+        AclConfigData aclConfigData = AclConfigData.decode("{\"test\": \"test\"}".getBytes(), AclConfigData.class);
+        doReturn(aclConfigData).when(mQClientAPIImpl).aclReadConfig(anyString(), anyString(), anyString(), anyLong());
+
+    }
+
+    @AfterClass
+    public static void terminate() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    @Test
+    public void testExecute() throws SubCommandException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        AclConfigReadCommand cmd = new AclConfigReadCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-i instanceName", "-t topic",  "-r r"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        // NOTE(review): left disabled; execute() constructs its own DefaultMQAdminExt, so the mock wired in init() is never reached -- cmd.execute(commandLine, options, null);
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services