You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/10/27 06:07:00 UTC

[GitHub] dongeforever closed pull request #485: Develop acl

dongeforever closed pull request #485: Develop acl
URL: https://github.com/apache/rocketmq/pull/485
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/acl-plug/pom.xml b/acl-plug/pom.xml
new file mode 100644
index 000000000..1cdc4a29d
--- /dev/null
+++ b/acl-plug/pom.xml
@@ -0,0 +1,52 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+	license agreements. See the NOTICE file distributed with this work for additional
+	information regarding copyright ownership. The ASF licenses this file to
+	You under the Apache License, Version 2.0 (the "License"); you may not use
+	this file except in compliance with the License. You may obtain a copy of
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+	by applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+	OF ANY KIND, either express or implied. See the License for the specific
+	language governing permissions and limitations under the License. -->
+<project
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+    xmlns="http://maven.apache.org/POM/4.0.0">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.4.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>rocketmq-acl-plug</artifactId>
+    <name>rocketmq-acl-plug ${project.version}</name>
+
+    <url>http://maven.apache.org</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-logging</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.19</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
new file mode 100644
index 000000000..75c907d82
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+
+public class AccessContralAnalysis {
+
+    private Map<Class<?>, Map<Integer, Field>> classTocodeAndMentod = new HashMap<>();
+
+    private Map<String, Integer> fieldNameAndCode = new HashMap<>();
+
+    public void analysisClass(Class<?> clazz) {
+        Field[] fields = clazz.getDeclaredFields();
+        try {
+            for (Field field : fields) {
+                // field.get(null) is only valid for static fields; an instance int field would throw an uncaught NPE.
+                if (field.getType().equals(int.class) && java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+                    String name = StringUtils.replace(field.getName(), "_", "").toLowerCase();
+                    fieldNameAndCode.put(name, (Integer) field.get(null));
+                }
+            }
+        } catch (IllegalArgumentException | IllegalAccessException e) {
+            throw new AclPlugRuntimeException(String.format("analysis on failure Class is %s", clazz.getName()), e);
+        }
+    }
+
+    public Map<Integer, Boolean> analysis(AccessControl accessControl) {
+        Class<? extends AccessControl> clazz = accessControl.getClass();
+        Map<Integer, Field> codeAndField = classTocodeAndMentod.get(clazz);
+        if (codeAndField == null) {
+            codeAndField = new HashMap<>();
+            Field[] fields = clazz.getDeclaredFields();
+            for (Field field : fields) {
+                if (!field.getType().equals(boolean.class))
+                    continue;
+                Integer code = fieldNameAndCode.get(field.getName().toLowerCase());
+                if (code == null) {
+                    throw new AclPlugRuntimeException(String.format("field nonexistent in code  fieldName is %s", field.getName()));
+                }
+                field.setAccessible(true);
+                codeAndField.put(code, field);
+
+            }
+            if (codeAndField.isEmpty()) {
+                throw new AclPlugRuntimeException(String.format("AccessControl nonexistent code , name %s", accessControl.getClass().getName()));
+            }
+            classTocodeAndMentod.put(clazz, codeAndField);
+        }
+        Iterator<Entry<Integer, Field>> it = codeAndField.entrySet().iterator();
+        Map<Integer, Boolean> authority = new HashMap<>();
+        try {
+            while (it.hasNext()) {
+                Entry<Integer, Field> e = it.next();
+                authority.put(e.getKey(), (Boolean) e.getValue().get(accessControl));
+            }
+        } catch (IllegalArgumentException | IllegalAccessException e) {
+            throw new AclPlugRuntimeException(String.format("analysis on failure AccessControl is %s", AccessControl.class.getName()), e);
+        }
+        return authority;
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclPlugController.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclPlugController.java
new file mode 100644
index 000000000..1ec1f1e99
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclPlugController.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
+import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine;
+import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+
+public class AclPlugController {
+
+    private ControllerParameters controllerParameters;
+
+    private AclPlugEngine aclPlugEngine;
+
+    private AclRemotingService aclRemotingService;
+
+    private boolean startSucceed = false;
+
+    public AclPlugController(ControllerParameters controllerParameters) throws AclPlugRuntimeException {
+        try {
+            this.controllerParameters = controllerParameters;
+            aclPlugEngine = new PlainAclPlugEngine(controllerParameters);
+            aclPlugEngine.initialize();
+            aclRemotingService = new DefaultAclRemotingServiceImpl(aclPlugEngine);
+            this.startSucceed = true;
+        } catch (Exception e) { // pass the object to %s so a null parameter cannot NPE here and hide e
+            throw new AclPlugRuntimeException(String.format("Start the abnormal , Launch parameters is %s", this.controllerParameters), e);
+        }
+    }
+
+    public AclRemotingService getAclRemotingService() {
+        return this.aclRemotingService;
+    }
+
+    public void doChannelCloseEvent(String remoteAddr) {
+        if (this.startSucceed) {
+            aclPlugEngine.deleteLoginInfo(remoteAddr);
+        }
+    }
+
+    public boolean isStartSucceed() {
+        return startSucceed;
+    }
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclRemotingService.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclRemotingService.java
new file mode 100644
index 000000000..c651a5d99
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclRemotingService.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+
+public interface AclRemotingService {
+
+    public AuthenticationResult check(AccessControl accessControl);
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclUtils.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclUtils.java
new file mode 100644
index 000000000..df997b59d
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+
+public class AclUtils {
+
+    public static void verify(String netaddress, int index) {
+        if (!AclUtils.isScope(netaddress, index)) {
+            throw new AclPlugRuntimeException(String.format("netaddress examine scope Exception netaddress is %s", netaddress));
+        }
+    }
+
+    public static String[] getAddreeStrArray(String netaddress, String four) {
+        String[] fourStrArray = StringUtils.split(four.substring(1, four.length() - 1), ",");
+        String address = netaddress.substring(0, netaddress.indexOf("{"));
+        String[] addreeStrArray = new String[fourStrArray.length];
+        for (int i = 0; i < fourStrArray.length; i++) {
+            addreeStrArray[i] = address + fourStrArray[i];
+        }
+        return addreeStrArray;
+    }
+
+    public static boolean isScope(String num, int index) {
+        String[] strArray = StringUtils.split(num, ".");
+        if (strArray.length != 4) {
+            return false;
+        }
+        return isScope(strArray, index);
+
+    }
+
+    public static boolean isScope(String[] num, int index) {
+        // Too few segments to check: reject rather than index past the end of the array.
+        if (num.length < index) {
+            return false;
+        }
+        for (int i = 0; i < index; i++) {
+            if (!isScope(num[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static boolean isScope(String num) {
+        return isScope(Integer.valueOf(num.trim()));
+    }
+
+    public static boolean isScope(int num) {
+        return num >= 0 && num <= 255;
+    }
+
+    public static boolean isAsterisk(String asterisk) {
+        return asterisk.indexOf('*') > -1;
+    }
+
+    public static boolean isColon(String colon) {
+        return colon.indexOf(',') > -1;
+    }
+
+    public static boolean isMinus(String minus) {
+        return minus.indexOf('-') > -1;
+
+    }
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
new file mode 100644
index 000000000..901cc409d
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
+
+public class Authentication {
+
+    public boolean authentication(AuthenticationInfo authenticationInfo, AccessControl accessControl, AuthenticationResult authenticationResult) {
+        int code = accessControl.getCode();
+        // A code absent from the authority map yields null; deny it explicitly instead of failing on unboxing.
+        if (!Boolean.TRUE.equals(authenticationInfo.getAuthority().get(code))) {
+            authenticationResult.setResultString(String.format("code is %d Authentication failed", code));
+            return false;
+        }
+        if (!(authenticationInfo.getAccessControl() instanceof BorkerAccessControl)) {
+            return true;
+        }
+        BorkerAccessControl borker = (BorkerAccessControl) authenticationInfo.getAccessControl();
+        String topicName = accessControl.getTopic();
+        if (code == 10 || code == 310 || code == 320) {
+            if (borker.getPermitSendTopic().contains(topicName)) {
+                return true;
+            }
+            if (borker.getNoPermitSendTopic().contains(topicName)) {
+                authenticationResult.setResultString(String.format("noPermitSendTopic include %s", topicName));
+                return false;
+            }
+            return borker.getPermitSendTopic().isEmpty();
+        } else if (code == 11) {
+            if (borker.getPermitPullTopic().contains(topicName)) {
+                return true;
+            }
+            if (borker.getNoPermitPullTopic().contains(topicName)) {
+                authenticationResult.setResultString(String.format("noPermitPullTopic include %s", topicName));
+                return false;
+            }
+            return borker.getPermitPullTopic().isEmpty();
+        }
+        return true;
+    }
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
new file mode 100644
index 000000000..b42205abc
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+
+public class DefaultAclRemotingServiceImpl implements AclRemotingService {
+
+    private AclPlugEngine aclPlugEngine;
+
+    public DefaultAclRemotingServiceImpl(AclPlugEngine aclPlugEngine) {
+        this.aclPlugEngine = aclPlugEngine;
+    }
+
+    @Override
+    public AuthenticationResult check(AccessControl accessControl) {
+        AuthenticationResult authenticationResult = aclPlugEngine.eachCheckLoginAndAuthentication(accessControl);
+        if (authenticationResult.getException() != null) {
+            throw new AclPlugRuntimeException(String.format("eachCheck the inspection appear exception, accessControl data is %s", accessControl.toString()), authenticationResult.getException());
+        }
+        if (authenticationResult.getAccessControl() == null || !authenticationResult.isSucceed()) {
+            throw new AclPlugRuntimeException(String.format("%s accessControl data is %s", authenticationResult.getResultString(), accessControl.toString()));
+        }
+        return authenticationResult;
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java
new file mode 100644
index 000000000..badae946c
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.engine;
+
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.LoginInfo;
+
+public interface AclPlugEngine {
+
+    public AuthenticationInfo getAccessControl(AccessControl accessControl);
+
+    public LoginInfo getLoginInfo(AccessControl accessControl);
+
+    public void deleteLoginInfo(String remoteAddr);
+
+    public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl);
+
+    public void initialize();
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
new file mode 100644
index 000000000..6aac6bd43
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.engine;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.acl.plug.AccessContralAnalysis;
+import org.apache.rocketmq.acl.plug.Authentication;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
+import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
+import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategyFactory;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPlugEngine {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
+    ControllerParameters controllerParameters;
+    private Map<String/** account **/, Map<String/** netaddress **/, AuthenticationInfo>> accessControlMap = new HashMap<>();
+    private AuthenticationInfo authenticationInfo;
+    private NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory();
+    private AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
+    private Authentication authentication = new Authentication();
+
+    public AuthenticationInfoManagementAclPlugEngine(ControllerParameters controllerParameters) {
+        this.controllerParameters = controllerParameters;
+        accessContralAnalysis.analysisClass(controllerParameters.getAccessContralAnalysisClass());
+    }
+
+    public void setAccessControl(AccessControl accessControl) throws AclPlugRuntimeException {
+        if (accessControl.getAccount() == null || accessControl.getPassword() == null || accessControl.getAccount().length() <= 6 || accessControl.getPassword().length() <= 6) {
+            throw new AclPlugRuntimeException(String.format("The account password cannot be null and is longer than 6, account is %s  password is %s", accessControl.getAccount(), accessControl.getPassword()));
+        }
+        try {
+            NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+            Map<String, AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
+            if (accessControlAddressMap == null) {
+                accessControlAddressMap = new HashMap<>();
+                accessControlMap.put(accessControl.getAccount(), accessControlAddressMap);
+            }
+            AuthenticationInfo authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategy);
+            accessControlAddressMap.put(accessControl.getNetaddress(), authenticationInfo);
+            log.info("authenticationInfo is {}", authenticationInfo.toString());
+        } catch (Exception e) {
+            throw new AclPlugRuntimeException(String.format("Exception info %s  %s" ,e.getMessage() , accessControl.toString()), e);
+        }
+    }
+
+    public void setAccessControlList(List<AccessControl> accessControlList) throws AclPlugRuntimeException {
+        for (AccessControl accessControl : accessControlList) {
+            setAccessControl(accessControl);
+        }
+    }
+
+    public void setNetaddressAccessControl(AccessControl accessControl) throws AclPlugRuntimeException {
+        try {
+            authenticationInfo = new AuthenticationInfo(accessContralAnalysis.analysis(accessControl), accessControl, netaddressStrategyFactory.getNetaddressStrategy(accessControl));
+            log.info("default authenticationInfo is {}", authenticationInfo.toString());
+        } catch (Exception e) {
+            throw new AclPlugRuntimeException(accessControl.toString(), e);
+        }
+
+    }
+
+    public AuthenticationInfo getAccessControl(AccessControl accessControl) {
+        AuthenticationInfo existing = null;
+        if (accessControl.getAccount() == null && authenticationInfo != null) {
+            existing = authenticationInfo.getNetaddressStrategy().match(accessControl) ? authenticationInfo : null;
+        } else {
+            Map<String, AuthenticationInfo> accessControlAddressMap = accessControlMap.get(accessControl.getAccount());
+            if (accessControlAddressMap != null) {
+                existing = accessControlAddressMap.get(accessControl.getNetaddress());
+                if (existing == null)
+                    return null;
+                if (existing.getAccessControl().getPassword().equals(accessControl.getPassword())) {
+                    if (existing.getNetaddressStrategy().match(accessControl)) {
+                        return existing;
+                    }
+                }
+                existing = null;
+            }
+        }
+        return existing;
+    }
+
+    @Override
+    public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl accessControl) {
+        AuthenticationResult authenticationResult = new AuthenticationResult();
+        try {
+            AuthenticationInfo authenticationInfo = getAuthenticationInfo(accessControl, authenticationResult);
+            if (authenticationInfo != null) {
+                boolean boo = authentication.authentication(authenticationInfo, accessControl, authenticationResult);
+                authenticationResult.setSucceed(boo);
+            }
+        } catch (Exception e) {
+            authenticationResult.setException(e);
+        }
+        return authenticationResult;
+    }
+
+    void setBorkerAccessControlTransport(BorkerAccessControlTransport transport) {
+        if (transport.getOnlyNetAddress() == null && (transport.getList() == null || transport.getList().size() == 0)) {
+            throw new AclPlugRuntimeException("onlyNetAddress and list  can't be all empty");
+        }
+        if (transport.getOnlyNetAddress() != null) {
+            this.setNetaddressAccessControl(transport.getOnlyNetAddress());
+        }
+        // The list may be null when only onlyNetAddress is configured, so both conditions must hold before iterating.
+        if (transport.getList() != null && transport.getList().size() > 0) {
+            for (AccessControl accessControl : transport.getList()) {
+                this.setAccessControl(accessControl);
+            }
+        }
+    }
+
+    protected abstract AuthenticationInfo getAuthenticationInfo(AccessControl accessControl,
+        AuthenticationResult authenticationResult);
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
new file mode 100644
index 000000000..e8dc59c42
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.engine;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
+import org.apache.rocketmq.acl.plug.entity.LoginInfo;
+
+public abstract class LoginInfoAclPlugEngine extends AuthenticationInfoManagementAclPlugEngine {
+
+    private Map<String, LoginInfo> loginInfoMap = new ConcurrentHashMap<>();
+
+    public LoginInfoAclPlugEngine(ControllerParameters controllerParameters) {
+        super(controllerParameters);
+    }
+
+    public LoginInfo getLoginInfo(AccessControl accessControl) {
+        LoginInfo loginInfo = loginInfoMap.get(accessControl.getRecognition());
+        if (loginInfo == null) {
+            AuthenticationInfo authenticationInfo = super.getAccessControl(accessControl);
+            if (authenticationInfo != null) {
+                loginInfo = new LoginInfo();
+                loginInfo.setAuthenticationInfo(authenticationInfo);
+                loginInfoMap.put(accessControl.getRecognition(), loginInfo);
+            }
+        }
+        if (loginInfo != null) {
+            loginInfo.setOperationTime(System.currentTimeMillis());
+        }
+        return loginInfo;
+    }
+
+    public void deleteLoginInfo(String remoteAddr) {
+        loginInfoMap.remove(remoteAddr);
+    }
+
+    protected AuthenticationInfo getAuthenticationInfo(AccessControl accessControl,
+        AuthenticationResult authenticationResult) {
+        LoginInfo loginInfo = getLoginInfo(accessControl);
+        if (loginInfo != null && loginInfo.getAuthenticationInfo() != null) {
+            return loginInfo.getAuthenticationInfo();
+        }
+        authenticationResult.setResultString("Login information does not exist, Please check login, password, IP");
+        return null;
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
new file mode 100644
index 000000000..d1a7d9529
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.engine;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
+import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * ACL engine for "Plain" mode: its access-control rules are read from
+ * {@code <fileHome>/conf/transport.yml}.
+ */
+public class PlainAclPlugEngine extends LoginInfoAclPlugEngine {
+
+    public PlainAclPlugEngine(
+        ControllerParameters controllerParameters) throws AclPlugRuntimeException {
+        super(controllerParameters);
+    }
+
+    /**
+     * Loads {@code transport.yml} from the configured file home and hands the parsed
+     * rules to the parent engine.
+     *
+     * @throws AclPlugRuntimeException if the file cannot be opened or read, if its
+     * content cannot be mapped onto {@link BorkerAccessControlTransport}, or if it is empty
+     */
+    public void initialize() throws AclPlugRuntimeException {
+        String filePath = controllerParameters.getFileHome() + "/conf/transport.yml";
+        Yaml yaml = new Yaml();
+        BorkerAccessControlTransport transport;
+        // try-with-resources closes the stream without a throwing finally block, so a
+        // failure while closing can no longer replace the exception that caused the load to fail.
+        try (FileInputStream fis = new FileInputStream(new File(filePath))) {
+            transport = yaml.loadAs(fis, BorkerAccessControlTransport.class);
+        } catch (IOException e) {
+            // Missing or unreadable file (includes FileNotFoundException and close failures).
+            throw new AclPlugRuntimeException(String.format("The transport.yml file for Plain mode was not found , paths %s", filePath), e);
+        } catch (RuntimeException e) {
+            // The file exists but its content is not valid for the expected structure.
+            throw new AclPlugRuntimeException(String.format("The transport.yml file for Plain mode could not be parsed , paths %s", filePath), e);
+        }
+        if (transport == null) {
+            // loadAs yields null for an empty document.
+            throw new AclPlugRuntimeException("transport.yml file  is no data");
+        }
+        super.setBorkerAccessControlTransport(transport);
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java
new file mode 100644
index 000000000..cf3a736a7
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.entity;
+
+/**
+ * Credentials and request attributes carried by one ACL check: account, password,
+ * network address, recognition key, request code and topic.
+ */
+public class AccessControl {
+
+    private String account;
+
+    private String password;
+
+    private String netaddress;
+
+    private String recognition;
+
+    private int code;
+
+    private String topic;
+
+    public AccessControl() {
+    }
+
+    public String getAccount() {
+        return account;
+    }
+
+    public void setAccount(String account) {
+        this.account = account;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getNetaddress() {
+        return netaddress;
+    }
+
+    public void setNetaddress(String netaddress) {
+        this.netaddress = netaddress;
+    }
+
+    public String getRecognition() {
+        return recognition;
+    }
+
+    public void setRecognition(String recognition) {
+        this.recognition = recognition;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * Returns a diagnostic description of this object. The password is never printed
+     * in clear text: a set password is shown as {@code ******} and an unset one as
+     * {@code null}, so the string is safe to write to logs.
+     */
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("AccessControl [account=").append(account)
+            .append(", password=").append(password == null ? null : "******")
+            .append(", netaddress=").append(netaddress).append(", recognition=").append(recognition)
+            .append(", code=").append(code).append(", topic=").append(topic).append("]");
+        return builder.toString();
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
new file mode 100644
index 000000000..981bef855
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.entity;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
+
+/**
+ * Bundles what is needed to authorize a client: its {@link AccessControl} data, the
+ * {@link NetaddressStrategy} attached to it, and a map from request code to whether
+ * that request is permitted.
+ */
+public class AuthenticationInfo {
+
+    private AccessControl accessControl;
+
+    private NetaddressStrategy netaddressStrategy;
+
+    private Map<Integer, Boolean> authority;
+
+    public AuthenticationInfo(Map<Integer, Boolean> authority, AccessControl accessControl,
+        NetaddressStrategy netaddressStrategy) {
+        this.authority = authority;
+        this.accessControl = accessControl;
+        this.netaddressStrategy = netaddressStrategy;
+    }
+
+    public AccessControl getAccessControl() {
+        return accessControl;
+    }
+
+    public void setAccessControl(AccessControl accessControl) {
+        this.accessControl = accessControl;
+    }
+
+    public NetaddressStrategy getNetaddressStrategy() {
+        return netaddressStrategy;
+    }
+
+    public void setNetaddressStrategy(NetaddressStrategy netaddressStrategy) {
+        this.netaddressStrategy = netaddressStrategy;
+    }
+
+    public Map<Integer, Boolean> getAuthority() {
+        return authority;
+    }
+
+    public void setAuthority(Map<Integer, Boolean> authority) {
+        this.authority = authority;
+    }
+
+    /**
+     * Returns a diagnostic description listing only the request codes whose authority
+     * value is {@code false}. A missing authority map, or entries with a null value,
+     * are skipped instead of raising a NullPointerException.
+     */
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("AuthenticationInfo [accessControl=").append(accessControl).append(", netaddressStrategy=")
+            .append(netaddressStrategy).append(", authority={");
+        if (authority != null) {
+            Iterator<Entry<Integer, Boolean>> it = authority.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<Integer, Boolean> e = it.next();
+                // Boolean.FALSE.equals avoids unboxing a null value.
+                if (Boolean.FALSE.equals(e.getValue())) {
+                    builder.append(e.getKey()).append(":").append(e.getValue()).append(",");
+                }
+            }
+        }
+        builder.append("}]");
+        return builder.toString();
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationResult.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationResult.java
new file mode 100644
index 000000000..bef05cef0
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.entity;
+
+/**
+ * Mutable holder for the outcome of one authentication attempt: the access-control
+ * data involved, a success flag, an optional exception and a result message.
+ */
+public class AuthenticationResult {
+
+    private AccessControl accessControl;
+    private boolean succeed;
+    private Exception exception;
+    private String resultString;
+
+    /** @return the access-control data stored on this result, possibly {@code null} */
+    public AccessControl getAccessControl() {
+        return this.accessControl;
+    }
+
+    public void setAccessControl(AccessControl accessControl) {
+        this.accessControl = accessControl;
+    }
+
+    /** @return the success flag; {@code false} until set otherwise */
+    public boolean isSucceed() {
+        return this.succeed;
+    }
+
+    public void setSucceed(boolean succeed) {
+        this.succeed = succeed;
+    }
+
+    /** @return the exception stored on this result, possibly {@code null} */
+    public Exception getException() {
+        return this.exception;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+
+    /** @return the result message stored on this result, possibly {@code null} */
+    public String getResultString() {
+        return this.resultString;
+    }
+
+    public void setResultString(String resultString) {
+        this.resultString = resultString;
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
new file mode 100644
index 000000000..d40fadfac
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.entity;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Access-control entry for broker requests. On top of the fields inherited from
+ * {@link AccessControl} it carries topic allow/deny sets for sending and pulling,
+ * plus one boolean switch per broker request type. Every switch defaults to
+ * {@code true}.
+ */
+public class BorkerAccessControl extends AccessControl {
+
+    private Set<String> permitSendTopic = new HashSet<>();
+    private Set<String> noPermitSendTopic = new HashSet<>();
+    private Set<String> permitPullTopic = new HashSet<>();
+    private Set<String> noPermitPullTopic = new HashSet<>();
+
+    private boolean sendMessage = true;
+    private boolean sendMessageV2 = true;
+    private boolean sendBatchMessage = true;
+    private boolean consumerSendMsgBack = true;
+    private boolean pullMessage = true;
+    private boolean queryMessage = true;
+    private boolean viewMessageById = true;
+    private boolean heartBeat = true;
+    private boolean unregisterClient = true;
+    private boolean checkClientConfig = true;
+    private boolean getConsumerListByGroup = true;
+    private boolean updateConsumerOffset = true;
+    private boolean queryConsumerOffset = true;
+    private boolean endTransaction = true;
+    private boolean updateAndCreateTopic = true;
+    private boolean deleteTopicInbroker = true;
+    private boolean getAllTopicConfig = true;
+    private boolean updateBrokerConfig = true;
+    private boolean getBrokerConfig = true;
+    private boolean searchOffsetByTimestamp = true;
+    private boolean getMaxOffset = true;
+    private boolean getMinOffset = true;
+    private boolean getEarliestMsgStoretime = true;
+    private boolean getBrokerRuntimeInfo = true;
+    private boolean lockBatchMQ = true;
+    private boolean unlockBatchMQ = true;
+    private boolean updateAndCreateSubscriptiongroup = true;
+    private boolean getAllSubscriptiongroupConfig = true;
+    private boolean deleteSubscriptiongroup = true;
+    private boolean getTopicStatsInfo = true;
+    private boolean getConsumerConnectionList = true;
+    private boolean getProducerConnectionList = true;
+    private boolean getConsumeStats = true;
+    private boolean getAllConsumerOffset = true;
+    private boolean getAllDelayOffset = true;
+    private boolean invokeBrokerToresetOffset = true;
+    private boolean queryTopicConsumeByWho = true;
+    private boolean registerFilterServer = true;
+    private boolean queryConsumeTimeSpan = true;
+    private boolean getSystemTopicListFromBroker = true;
+    private boolean cleanExpiredConsumequeue = true;
+    private boolean cleanUnusedTopic = true;
+    private boolean getConsumerRunningInfo = true;
+    private boolean queryCorrectionOffset = true;
+    private boolean consumeMessageDirectly = true;
+    private boolean cloneGroupOffset = true;
+    private boolean viewBrokerStatsData = true;
+    private boolean getBrokerConsumeStats = true;
+    private boolean queryConsumeQueue = true;
+
+    public BorkerAccessControl() {
+
+    }
+
+    public Set<String> getPermitSendTopic() {
+        return permitSendTopic;
+    }
+
+    public void setPermitSendTopic(Set<String> permitSendTopic) {
+        this.permitSendTopic = permitSendTopic;
+    }
+
+    public Set<String> getNoPermitSendTopic() {
+        return noPermitSendTopic;
+    }
+
+    public void setNoPermitSendTopic(Set<String> noPermitSendTopic) {
+        this.noPermitSendTopic = noPermitSendTopic;
+    }
+
+    public Set<String> getPermitPullTopic() {
+        return permitPullTopic;
+    }
+
+    public void setPermitPullTopic(Set<String> permitPullTopic) {
+        this.permitPullTopic = permitPullTopic;
+    }
+
+    public Set<String> getNoPermitPullTopic() {
+        return noPermitPullTopic;
+    }
+
+    public void setNoPermitPullTopic(Set<String> noPermitPullTopic) {
+        this.noPermitPullTopic = noPermitPullTopic;
+    }
+
+    public boolean isSendMessage() {
+        return sendMessage;
+    }
+
+    public void setSendMessage(boolean sendMessage) {
+        this.sendMessage = sendMessage;
+    }
+
+    public boolean isSendMessageV2() {
+        return sendMessageV2;
+    }
+
+    public void setSendMessageV2(boolean sendMessageV2) {
+        this.sendMessageV2 = sendMessageV2;
+    }
+
+    public boolean isSendBatchMessage() {
+        return sendBatchMessage;
+    }
+
+    public void setSendBatchMessage(boolean sendBatchMessage) {
+        this.sendBatchMessage = sendBatchMessage;
+    }
+
+    public boolean isConsumerSendMsgBack() {
+        return consumerSendMsgBack;
+    }
+
+    public void setConsumerSendMsgBack(boolean consumerSendMsgBack) {
+        this.consumerSendMsgBack = consumerSendMsgBack;
+    }
+
+    public boolean isPullMessage() {
+        return pullMessage;
+    }
+
+    public void setPullMessage(boolean pullMessage) {
+        this.pullMessage = pullMessage;
+    }
+
+    public boolean isQueryMessage() {
+        return queryMessage;
+    }
+
+    public void setQueryMessage(boolean queryMessage) {
+        this.queryMessage = queryMessage;
+    }
+
+    public boolean isViewMessageById() {
+        return viewMessageById;
+    }
+
+    public void setViewMessageById(boolean viewMessageById) {
+        this.viewMessageById = viewMessageById;
+    }
+
+    public boolean isHeartBeat() {
+        return heartBeat;
+    }
+
+    public void setHeartBeat(boolean heartBeat) {
+        this.heartBeat = heartBeat;
+    }
+
+    public boolean isUnregisterClient() {
+        return unregisterClient;
+    }
+
+    public void setUnregisterClient(boolean unregisterClient) {
+        this.unregisterClient = unregisterClient;
+    }
+
+    public boolean isCheckClientConfig() {
+        return checkClientConfig;
+    }
+
+    public void setCheckClientConfig(boolean checkClientConfig) {
+        this.checkClientConfig = checkClientConfig;
+    }
+
+    public boolean isGetConsumerListByGroup() {
+        return getConsumerListByGroup;
+    }
+
+    public void setGetConsumerListByGroup(boolean getConsumerListByGroup) {
+        this.getConsumerListByGroup = getConsumerListByGroup;
+    }
+
+    public boolean isUpdateConsumerOffset() {
+        return updateConsumerOffset;
+    }
+
+    public void setUpdateConsumerOffset(boolean updateConsumerOffset) {
+        this.updateConsumerOffset = updateConsumerOffset;
+    }
+
+    public boolean isQueryConsumerOffset() {
+        return queryConsumerOffset;
+    }
+
+    public void setQueryConsumerOffset(boolean queryConsumerOffset) {
+        this.queryConsumerOffset = queryConsumerOffset;
+    }
+
+    public boolean isEndTransaction() {
+        return endTransaction;
+    }
+
+    public void setEndTransaction(boolean endTransaction) {
+        this.endTransaction = endTransaction;
+    }
+
+    public boolean isUpdateAndCreateTopic() {
+        return updateAndCreateTopic;
+    }
+
+    public void setUpdateAndCreateTopic(boolean updateAndCreateTopic) {
+        this.updateAndCreateTopic = updateAndCreateTopic;
+    }
+
+    public boolean isDeleteTopicInbroker() {
+        return deleteTopicInbroker;
+    }
+
+    public void setDeleteTopicInbroker(boolean deleteTopicInbroker) {
+        this.deleteTopicInbroker = deleteTopicInbroker;
+    }
+
+    public boolean isGetAllTopicConfig() {
+        return getAllTopicConfig;
+    }
+
+    public void setGetAllTopicConfig(boolean getAllTopicConfig) {
+        this.getAllTopicConfig = getAllTopicConfig;
+    }
+
+    public boolean isUpdateBrokerConfig() {
+        return updateBrokerConfig;
+    }
+
+    public void setUpdateBrokerConfig(boolean updateBrokerConfig) {
+        this.updateBrokerConfig = updateBrokerConfig;
+    }
+
+    public boolean isGetBrokerConfig() {
+        return getBrokerConfig;
+    }
+
+    public void setGetBrokerConfig(boolean getBrokerConfig) {
+        this.getBrokerConfig = getBrokerConfig;
+    }
+
+    public boolean isSearchOffsetByTimestamp() {
+        return searchOffsetByTimestamp;
+    }
+
+    public void setSearchOffsetByTimestamp(boolean searchOffsetByTimestamp) {
+        this.searchOffsetByTimestamp = searchOffsetByTimestamp;
+    }
+
+    public boolean isGetMaxOffset() {
+        return getMaxOffset;
+    }
+
+    public void setGetMaxOffset(boolean getMaxOffset) {
+        this.getMaxOffset = getMaxOffset;
+    }
+
+    public boolean isGetMinOffset() {
+        return getMinOffset;
+    }
+
+    public void setGetMinOffset(boolean getMinOffset) {
+        this.getMinOffset = getMinOffset;
+    }
+
+    public boolean isGetEarliestMsgStoretime() {
+        return getEarliestMsgStoretime;
+    }
+
+    public void setGetEarliestMsgStoretime(boolean getEarliestMsgStoretime) {
+        this.getEarliestMsgStoretime = getEarliestMsgStoretime;
+    }
+
+    public boolean isGetBrokerRuntimeInfo() {
+        return getBrokerRuntimeInfo;
+    }
+
+    public void setGetBrokerRuntimeInfo(boolean getBrokerRuntimeInfo) {
+        this.getBrokerRuntimeInfo = getBrokerRuntimeInfo;
+    }
+
+    public boolean isLockBatchMQ() {
+        return lockBatchMQ;
+    }
+
+    public void setLockBatchMQ(boolean lockBatchMQ) {
+        this.lockBatchMQ = lockBatchMQ;
+    }
+
+    public boolean isUnlockBatchMQ() {
+        return unlockBatchMQ;
+    }
+
+    public void setUnlockBatchMQ(boolean unlockBatchMQ) {
+        this.unlockBatchMQ = unlockBatchMQ;
+    }
+
+    public boolean isUpdateAndCreateSubscriptiongroup() {
+        return updateAndCreateSubscriptiongroup;
+    }
+
+    public void setUpdateAndCreateSubscriptiongroup(boolean updateAndCreateSubscriptiongroup) {
+        this.updateAndCreateSubscriptiongroup = updateAndCreateSubscriptiongroup;
+    }
+
+    public boolean isGetAllSubscriptiongroupConfig() {
+        return getAllSubscriptiongroupConfig;
+    }
+
+    public void setGetAllSubscriptiongroupConfig(boolean getAllSubscriptiongroupConfig) {
+        this.getAllSubscriptiongroupConfig = getAllSubscriptiongroupConfig;
+    }
+
+    public boolean isDeleteSubscriptiongroup() {
+        return deleteSubscriptiongroup;
+    }
+
+    public void setDeleteSubscriptiongroup(boolean deleteSubscriptiongroup) {
+        this.deleteSubscriptiongroup = deleteSubscriptiongroup;
+    }
+
+    public boolean isGetTopicStatsInfo() {
+        return getTopicStatsInfo;
+    }
+
+    public void setGetTopicStatsInfo(boolean getTopicStatsInfo) {
+        this.getTopicStatsInfo = getTopicStatsInfo;
+    }
+
+    public boolean isGetConsumerConnectionList() {
+        return getConsumerConnectionList;
+    }
+
+    public void setGetConsumerConnectionList(boolean getConsumerConnectionList) {
+        this.getConsumerConnectionList = getConsumerConnectionList;
+    }
+
+    public boolean isGetProducerConnectionList() {
+        return getProducerConnectionList;
+    }
+
+    public void setGetProducerConnectionList(boolean getProducerConnectionList) {
+        this.getProducerConnectionList = getProducerConnectionList;
+    }
+
+    public boolean isGetConsumeStats() {
+        return getConsumeStats;
+    }
+
+    public void setGetConsumeStats(boolean getConsumeStats) {
+        this.getConsumeStats = getConsumeStats;
+    }
+
+    public boolean isGetAllConsumerOffset() {
+        return getAllConsumerOffset;
+    }
+
+    public void setGetAllConsumerOffset(boolean getAllConsumerOffset) {
+        this.getAllConsumerOffset = getAllConsumerOffset;
+    }
+
+    public boolean isGetAllDelayOffset() {
+        return getAllDelayOffset;
+    }
+
+    public void setGetAllDelayOffset(boolean getAllDelayOffset) {
+        this.getAllDelayOffset = getAllDelayOffset;
+    }
+
+    public boolean isInvokeBrokerToresetOffset() {
+        return invokeBrokerToresetOffset;
+    }
+
+    public void setInvokeBrokerToresetOffset(boolean invokeBrokerToresetOffset) {
+        this.invokeBrokerToresetOffset = invokeBrokerToresetOffset;
+    }
+
+    public boolean isQueryTopicConsumeByWho() {
+        return queryTopicConsumeByWho;
+    }
+
+    public void setQueryTopicConsumeByWho(boolean queryTopicConsumeByWho) {
+        this.queryTopicConsumeByWho = queryTopicConsumeByWho;
+    }
+
+    public boolean isRegisterFilterServer() {
+        return registerFilterServer;
+    }
+
+    public void setRegisterFilterServer(boolean registerFilterServer) {
+        this.registerFilterServer = registerFilterServer;
+    }
+
+    public boolean isQueryConsumeTimeSpan() {
+        return queryConsumeTimeSpan;
+    }
+
+    public void setQueryConsumeTimeSpan(boolean queryConsumeTimeSpan) {
+        this.queryConsumeTimeSpan = queryConsumeTimeSpan;
+    }
+
+    public boolean isGetSystemTopicListFromBroker() {
+        return getSystemTopicListFromBroker;
+    }
+
+    public void setGetSystemTopicListFromBroker(boolean getSystemTopicListFromBroker) {
+        this.getSystemTopicListFromBroker = getSystemTopicListFromBroker;
+    }
+
+    public boolean isCleanExpiredConsumequeue() {
+        return cleanExpiredConsumequeue;
+    }
+
+    public void setCleanExpiredConsumequeue(boolean cleanExpiredConsumequeue) {
+        this.cleanExpiredConsumequeue = cleanExpiredConsumequeue;
+    }
+
+    public boolean isCleanUnusedTopic() {
+        return cleanUnusedTopic;
+    }
+
+    public void setCleanUnusedTopic(boolean cleanUnusedTopic) {
+        this.cleanUnusedTopic = cleanUnusedTopic;
+    }
+
+    public boolean isGetConsumerRunningInfo() {
+        return getConsumerRunningInfo;
+    }
+
+    public void setGetConsumerRunningInfo(boolean getConsumerRunningInfo) {
+        this.getConsumerRunningInfo = getConsumerRunningInfo;
+    }
+
+    public boolean isQueryCorrectionOffset() {
+        return queryCorrectionOffset;
+    }
+
+    public void setQueryCorrectionOffset(boolean queryCorrectionOffset) {
+        this.queryCorrectionOffset = queryCorrectionOffset;
+    }
+
+    public boolean isConsumeMessageDirectly() {
+        return consumeMessageDirectly;
+    }
+
+    public void setConsumeMessageDirectly(boolean consumeMessageDirectly) {
+        this.consumeMessageDirectly = consumeMessageDirectly;
+    }
+
+    public boolean isCloneGroupOffset() {
+        return cloneGroupOffset;
+    }
+
+    public void setCloneGroupOffset(boolean cloneGroupOffset) {
+        this.cloneGroupOffset = cloneGroupOffset;
+    }
+
+    public boolean isViewBrokerStatsData() {
+        return viewBrokerStatsData;
+    }
+
+    public void setViewBrokerStatsData(boolean viewBrokerStatsData) {
+        this.viewBrokerStatsData = viewBrokerStatsData;
+    }
+
+    public boolean isGetBrokerConsumeStats() {
+        return getBrokerConsumeStats;
+    }
+
+    public void setGetBrokerConsumeStats(boolean getBrokerConsumeStats) {
+        this.getBrokerConsumeStats = getBrokerConsumeStats;
+    }
+
+    public boolean isQueryConsumeQueue() {
+        return queryConsumeQueue;
+    }
+
+    public void setQueryConsumeQueue(boolean queryConsumeQueue) {
+        this.queryConsumeQueue = queryConsumeQueue;
+    }
+
+    /**
+     * Returns a diagnostic description: the four topic sets, followed by only those
+     * request switches that are currently {@code false}. Switches left at
+     * {@code true} are omitted to keep the output short.
+     */
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("BorkerAccessControl [permitSendTopic=").append(permitSendTopic).append(", noPermitSendTopic=")
+            .append(noPermitSendTopic).append(", permitPullTopic=").append(permitPullTopic)
+            .append(", noPermitPullTopic=").append(noPermitPullTopic);
+        appendIfDenied(builder, "sendMessage", sendMessage);
+        appendIfDenied(builder, "sendMessageV2", sendMessageV2);
+        appendIfDenied(builder, "sendBatchMessage", sendBatchMessage);
+        appendIfDenied(builder, "consumerSendMsgBack", consumerSendMsgBack);
+        appendIfDenied(builder, "pullMessage", pullMessage);
+        appendIfDenied(builder, "queryMessage", queryMessage);
+        appendIfDenied(builder, "viewMessageById", viewMessageById);
+        appendIfDenied(builder, "heartBeat", heartBeat);
+        appendIfDenied(builder, "unregisterClient", unregisterClient);
+        appendIfDenied(builder, "checkClientConfig", checkClientConfig);
+        appendIfDenied(builder, "getConsumerListByGroup", getConsumerListByGroup);
+        appendIfDenied(builder, "updateConsumerOffset", updateConsumerOffset);
+        appendIfDenied(builder, "queryConsumerOffset", queryConsumerOffset);
+        appendIfDenied(builder, "endTransaction", endTransaction);
+        appendIfDenied(builder, "updateAndCreateTopic", updateAndCreateTopic);
+        appendIfDenied(builder, "deleteTopicInbroker", deleteTopicInbroker);
+        appendIfDenied(builder, "getAllTopicConfig", getAllTopicConfig);
+        appendIfDenied(builder, "updateBrokerConfig", updateBrokerConfig);
+        appendIfDenied(builder, "getBrokerConfig", getBrokerConfig);
+        appendIfDenied(builder, "searchOffsetByTimestamp", searchOffsetByTimestamp);
+        appendIfDenied(builder, "getMaxOffset", getMaxOffset);
+        appendIfDenied(builder, "getMinOffset", getMinOffset);
+        appendIfDenied(builder, "getEarliestMsgStoretime", getEarliestMsgStoretime);
+        appendIfDenied(builder, "getBrokerRuntimeInfo", getBrokerRuntimeInfo);
+        appendIfDenied(builder, "lockBatchMQ", lockBatchMQ);
+        appendIfDenied(builder, "unlockBatchMQ", unlockBatchMQ);
+        appendIfDenied(builder, "updateAndCreateSubscriptiongroup", updateAndCreateSubscriptiongroup);
+        appendIfDenied(builder, "getAllSubscriptiongroupConfig", getAllSubscriptiongroupConfig);
+        appendIfDenied(builder, "deleteSubscriptiongroup", deleteSubscriptiongroup);
+        appendIfDenied(builder, "getTopicStatsInfo", getTopicStatsInfo);
+        appendIfDenied(builder, "getConsumerConnectionList", getConsumerConnectionList);
+        appendIfDenied(builder, "getProducerConnectionList", getProducerConnectionList);
+        appendIfDenied(builder, "getConsumeStats", getConsumeStats);
+        appendIfDenied(builder, "getAllConsumerOffset", getAllConsumerOffset);
+        appendIfDenied(builder, "getAllDelayOffset", getAllDelayOffset);
+        appendIfDenied(builder, "invokeBrokerToresetOffset", invokeBrokerToresetOffset);
+        appendIfDenied(builder, "queryTopicConsumeByWho", queryTopicConsumeByWho);
+        appendIfDenied(builder, "registerFilterServer", registerFilterServer);
+        appendIfDenied(builder, "queryConsumeTimeSpan", queryConsumeTimeSpan);
+        appendIfDenied(builder, "getSystemTopicListFromBroker", getSystemTopicListFromBroker);
+        appendIfDenied(builder, "cleanExpiredConsumequeue", cleanExpiredConsumequeue);
+        appendIfDenied(builder, "cleanUnusedTopic", cleanUnusedTopic);
+        appendIfDenied(builder, "getConsumerRunningInfo", getConsumerRunningInfo);
+        appendIfDenied(builder, "queryCorrectionOffset", queryCorrectionOffset);
+        appendIfDenied(builder, "consumeMessageDirectly", consumeMessageDirectly);
+        appendIfDenied(builder, "cloneGroupOffset", cloneGroupOffset);
+        appendIfDenied(builder, "viewBrokerStatsData", viewBrokerStatsData);
+        appendIfDenied(builder, "getBrokerConsumeStats", getBrokerConsumeStats);
+        appendIfDenied(builder, "queryConsumeQueue", queryConsumeQueue);
+        builder.append("]");
+        return builder.toString();
+    }
+
+    /** Appends {@code ", name=false"} to the builder when the switch is not permitted. */
+    private static void appendIfDenied(StringBuilder builder, String name, boolean permitted) {
+        if (!permitted) {
+            builder.append(", ").append(name).append("=").append(permitted);
+        }
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControlTransport.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControlTransport.java
new file mode 100644
index 000000000..93d002315
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControlTransport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.entity;
+
+import java.util.List;
+
+/**
+ * Simple bean that carries one stand-alone {@link BorkerAccessControl} entry
+ * together with a list of further {@link BorkerAccessControl} entries.
+ */
+public class BorkerAccessControlTransport {
+
+    /** Stand-alone entry; {@code null} until set. */
+    private BorkerAccessControl onlyNetAddress;
+
+    /** Additional entries; {@code null} until set. */
+    private List<BorkerAccessControl> list;
+
+    /** Creates an instance with both properties unset. */
+    public BorkerAccessControlTransport() {
+    }
+
+    /** @return the stand-alone entry, or {@code null} if none was set */
+    public BorkerAccessControl getOnlyNetAddress() {
+        return this.onlyNetAddress;
+    }
+
+    /** @param onlyNetAddress the stand-alone entry to store */
+    public void setOnlyNetAddress(BorkerAccessControl onlyNetAddress) {
+        this.onlyNetAddress = onlyNetAddress;
+    }
+
+    /** @return the stored list itself (not a copy), or {@code null} if none was set */
+    public List<BorkerAccessControl> getList() {
+        return this.list;
+    }
+
+    /** @param list the list to store; the reference is kept as-is */
+    public void setList(List<BorkerAccessControl> list) {
+        this.list = list;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("BorkerAccessControlTransport [onlyNetAddress=");
+        text.append(onlyNetAddress).append(", list=").append(list).append("]");
+        return text.toString();
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java
new file mode 100644
index 000000000..708bcbeb9
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.entity;
+
+import org.apache.rocketmq.common.protocol.RequestCode;
+
+/**
+ * Settings bean holding a file home location and the class handed to the
+ * access-control analysis, which defaults to {@link RequestCode}.
+ */
+public class ControllerParameters {
+
+    /** File home location; {@code null} until set. */
+    private String fileHome;
+
+    /** Class used for access-control analysis; {@link RequestCode} unless overridden. */
+    private Class<?> accessContralAnalysisClass = RequestCode.class;
+
+    /** @return the configured file home, or {@code null} if none was set */
+    public String getFileHome() {
+        return this.fileHome;
+    }
+
+    /** @param fileHome the file home to store */
+    public void setFileHome(String fileHome) {
+        this.fileHome = fileHome;
+    }
+
+    /** @return the class used for access-control analysis */
+    public Class<?> getAccessContralAnalysisClass() {
+        return this.accessContralAnalysisClass;
+    }
+
+    /** @param accessContralAnalysisClass replacement for the default analysis class */
+    public void setAccessContralAnalysisClass(Class<?> accessContralAnalysisClass) {
+        this.accessContralAnalysisClass = accessContralAnalysisClass;
+    }
+
+    @Override
+    public String toString() {
+        return "ControllerParametersEntity [fileHome=" + fileHome
+            + ", accessContralAnalysisClass=" + accessContralAnalysisClass + "]";
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java
new file mode 100644
index 000000000..e08d7d38b
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.entity;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Bean describing one login: an identifying string, the login and last
+ * operation timestamps, a "clear" flag and the associated
+ * {@link AuthenticationInfo}.
+ */
+public class LoginInfo {
+
+    /** Identifying string of the login; {@code null} until set. */
+    private String recognition;
+
+    /** Creation time of this object in epoch milliseconds unless overwritten. */
+    private long loginTime = System.currentTimeMillis();
+
+    /** Starts out equal to {@link #loginTime}; updated through the setter. */
+    private volatile long operationTime = loginTime;
+
+    /** Flag object, initially {@code false}. */
+    private volatile AtomicBoolean clear = new AtomicBoolean();
+
+    /** Authentication data attached to this login; {@code null} until set. */
+    private AuthenticationInfo authenticationInfo;
+
+    /** @return the attached authentication data, or {@code null} */
+    public AuthenticationInfo getAuthenticationInfo() {
+        return this.authenticationInfo;
+    }
+
+    /** @param authenticationInfo authentication data to attach */
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+
+    /** @return the identifying string, or {@code null} */
+    public String getRecognition() {
+        return this.recognition;
+    }
+
+    /** @param recognition identifying string to store */
+    public void setRecognition(String recognition) {
+        this.recognition = recognition;
+    }
+
+    /** @return the login timestamp in milliseconds */
+    public long getLoginTime() {
+        return this.loginTime;
+    }
+
+    /** @param loginTime login timestamp in milliseconds */
+    public void setLoginTime(long loginTime) {
+        this.loginTime = loginTime;
+    }
+
+    /** @return the last-operation timestamp in milliseconds */
+    public long getOperationTime() {
+        return this.operationTime;
+    }
+
+    /** @param operationTime last-operation timestamp in milliseconds */
+    public void setOperationTime(long operationTime) {
+        this.operationTime = operationTime;
+    }
+
+    /** @return the flag object itself, so callers can read or flip it */
+    public AtomicBoolean getClear() {
+        return this.clear;
+    }
+
+    /** @param clear replacement flag object */
+    public void setClear(AtomicBoolean clear) {
+        this.clear = clear;
+    }
+
+    @Override
+    public String toString() {
+        return "LoginInfo [recognition=" + recognition
+            + ", loginTime=" + loginTime
+            + ", operationTime=" + operationTime
+            + ", clear=" + clear
+            + ", authenticationInfo=" + authenticationInfo + "]";
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/exception/AclPlugRuntimeException.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/exception/AclPlugRuntimeException.java
new file mode 100644
index 000000000..0048b2c68
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/exception/AclPlugRuntimeException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.exception;
+
+/**
+ * Unchecked exception raised by the ACL plug-in.
+ */
+public class AclPlugRuntimeException extends RuntimeException {
+
+    private static final long serialVersionUID = 6062101368637228900L;
+
+    /**
+     * @param message description of the failure
+     * @param cause underlying exception that triggered this one
+     */
+    public AclPlugRuntimeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message description of the failure
+     */
+    public AclPlugRuntimeException(String message) {
+        super(message);
+    }
+
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategy.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategy.java
new file mode 100644
index 000000000..7276634e3
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.strategy;
+
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+
+/**
+ * Strategy that decides whether an {@link AccessControl} request matches a
+ * configured net-address rule.
+ */
+public interface NetaddressStrategy {
+
+    /**
+     * Tests the given request against this strategy.
+     *
+     * @param accessControl the request to test
+     * @return {@code true} if the request matches this strategy
+     */
+    boolean match(AccessControl accessControl);
+}
diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java
new file mode 100644
index 000000000..cdb78675e
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.strategy;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.plug.AclUtils;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+
+/**
+ * Creates the {@link NetaddressStrategy} that corresponds to the net-address
+ * expression of an {@link AccessControl} entry.
+ *
+ * <p>The expression is dispatched as follows:
+ * <ul>
+ * <li>blank or {@code "*"}: {@link #NULL_NET_ADDRESS_STRATEGY}, which matches everything;</li>
+ * <li>ends with a closing brace: the fourth dot-separated segment must start with an
+ * opening brace and is expanded by {@code AclUtils.getAddreeStrArray} into a
+ * {@link MultipleNetaddressStrategy};</li>
+ * <li>{@code AclUtils.isColon} is true: split on {@code ","} into a
+ * {@link MultipleNetaddressStrategy};</li>
+ * <li>{@code AclUtils.isAsterisk} or {@code AclUtils.isMinus} is true:
+ * {@link RangeNetaddressStrategy};</li>
+ * <li>otherwise: {@link OneNetaddressStrategy}.</li>
+ * </ul>
+ */
+public class NetaddressStrategyFactory {
+
+    public static final NullNetaddressStrategy NULL_NET_ADDRESS_STRATEGY = new NullNetaddressStrategy();
+
+    /**
+     * Selects and builds the strategy for the entry's net-address expression.
+     *
+     * @param accessControl entry whose {@code getNetaddress()} value is inspected
+     * @return the matching strategy, never {@code null}
+     * @throws AclPlugRuntimeException if a brace expression has fewer than four segments or
+     *         its fourth segment does not start with an opening brace, or if one of the
+     *         strategy constructors rejects the expression
+     */
+    public NetaddressStrategy getNetaddressStrategy(AccessControl accessControl) {
+        String netaddress = accessControl.getNetaddress();
+        if (StringUtils.isBlank(netaddress) || "*".equals(netaddress)) {
+            return NULL_NET_ADDRESS_STRATEGY;
+        }
+        if (netaddress.endsWith("}")) {
+            String[] strArray = StringUtils.split(netaddress, ".");
+            // A short expression is reported as a configuration error instead of an
+            // ArrayIndexOutOfBoundsException, and the message now carries the address.
+            if (strArray.length < 4 || !strArray[3].startsWith("{")) {
+                throw new AclPlugRuntimeException(String.format("MultipleNetaddressStrategy netaddress examine scope Exception netaddress %s", netaddress));
+            }
+            return new MultipleNetaddressStrategy(AclUtils.getAddreeStrArray(netaddress, strArray[3]));
+        } else if (AclUtils.isColon(netaddress)) {
+            return new MultipleNetaddressStrategy(StringUtils.split(netaddress, ","));
+        } else if (AclUtils.isAsterisk(netaddress) || AclUtils.isMinus(netaddress)) {
+            return new RangeNetaddressStrategy(netaddress);
+        }
+        return new OneNetaddressStrategy(netaddress);
+
+    }
+
+    /** Strategy that matches every request. */
+    public static class NullNetaddressStrategy implements NetaddressStrategy {
+        @Override
+        public boolean match(AccessControl accessControl) {
+            return true;
+        }
+
+    }
+
+    /** Strategy that matches when the request address is one of a fixed set of addresses. */
+    public static class MultipleNetaddressStrategy implements NetaddressStrategy {
+
+        private final Set<String> multipleSet = new HashSet<>();
+
+        /**
+         * @param strArray the accepted addresses; each one is passed through
+         *        {@code AclUtils.verify(address, 4)} before being stored
+         */
+        public MultipleNetaddressStrategy(String[] strArray) {
+            for (String netaddress : strArray) {
+                AclUtils.verify(netaddress, 4);
+                multipleSet.add(netaddress);
+            }
+        }
+
+        @Override
+        public boolean match(AccessControl accessControl) {
+            return multipleSet.contains(accessControl.getNetaddress());
+        }
+
+    }
+
+    /** Strategy that matches exactly one address by string equality. */
+    public static class OneNetaddressStrategy implements NetaddressStrategy {
+
+        private final String netaddress;
+
+        /**
+         * @param netaddress the only accepted address; it is passed through
+         *        {@code AclUtils.verify(netaddress, 4)}
+         */
+        public OneNetaddressStrategy(String netaddress) {
+            this.netaddress = netaddress;
+            AclUtils.verify(netaddress, 4);
+        }
+
+        @Override
+        public boolean match(AccessControl accessControl) {
+            return netaddress.equals(accessControl.getNetaddress());
+        }
+
+    }
+
+    /**
+     * Strategy for expressions whose third or fourth dot-separated segment is {@code *}
+     * (treated as 0-255) or a {@code start-end} range. Everything before that segment
+     * becomes a literal prefix; the segment of the request address at that position must
+     * fall inside the range. When the range sits in the third segment, whatever follows
+     * it in the request address is not examined.
+     */
+    public static class RangeNetaddressStrategy implements NetaddressStrategy {
+
+        /** Literal prefix (including the trailing dot) that a request address must start with. */
+        private final String head;
+
+        private int start;
+
+        private int end;
+
+        /** Zero-based position (2 or 3) of the segment that carries the range. */
+        private int index;
+
+        /**
+         * @param netaddress range expression such as {@code 127.0.0.1-200} or {@code 127.0.1-200.*}
+         * @throws AclPlugRuntimeException if the expression has fewer than three segments, if
+         *         neither the third nor the fourth segment is a range, or if the range is
+         *         malformed or outside the scope accepted by {@code AclUtils.isScope}
+         */
+        public RangeNetaddressStrategy(String netaddress) {
+            String[] strArray = StringUtils.split(netaddress, ".");
+            if (strArray == null || strArray.length < 3) {
+                throw new AclPlugRuntimeException(String.format("RangeNetaddressStrategy netaddress examine scope Exception netaddress %s", netaddress));
+            }
+            // The fourth segment is only inspected when it exists; previously a missing
+            // segment surfaced as ArrayIndexOutOfBoundsException.
+            boolean ranged = analysis(strArray, 2) || (strArray.length > 3 && analysis(strArray, 3));
+            if (!ranged) {
+                // Without a range there is no prefix to match against; fail at configuration
+                // time instead of with a NullPointerException on the first match() call.
+                throw new AclPlugRuntimeException(String.format("RangeNetaddressStrategy netaddress examine scope Exception netaddress %s", netaddress));
+            }
+            AclUtils.verify(netaddress, index - 1);
+            StringBuilder sb = new StringBuilder().append(strArray[0].trim()).append(".").append(strArray[1].trim()).append(".");
+            if (index == 3) {
+                sb.append(strArray[2].trim()).append(".");
+            }
+            this.head = sb.toString();
+        }
+
+        /**
+         * Inspects one segment and, if it is {@code *} or a {@code start-end} range, records
+         * its bounds.
+         *
+         * @return {@code true} if the segment is a range; the result no longer depends on
+         *         {@code end > 0}, so a {@code 0-0} range is recognised as well
+         */
+        private boolean analysis(String[] strArray, int index) {
+            String value = strArray[index].trim();
+            this.index = index;
+            if ("*".equals(value)) {
+                setValue(0, 255);
+                return true;
+            }
+            if (!AclUtils.isMinus(value)) {
+                return false;
+            }
+            if (value.indexOf("-") == 0) {
+                throw new AclPlugRuntimeException(String.format("RangeNetaddressStrategy netaddress examine scope Exception value %s ", value));
+            }
+            String[] valueArray = StringUtils.split(value, "-");
+            if (valueArray.length < 2) {
+                // e.g. "5-": there is no upper bound to read.
+                throw new AclPlugRuntimeException(String.format("RangeNetaddressStrategy netaddress examine scope Exception value %s ", value));
+            }
+            int rangeStart;
+            int rangeEnd;
+            try {
+                rangeStart = Integer.parseInt(valueArray[0]);
+                rangeEnd = Integer.parseInt(valueArray[1]);
+            } catch (NumberFormatException e) {
+                throw new AclPlugRuntimeException(String.format("RangeNetaddressStrategy netaddress examine scope Exception value %s ", value), e);
+            }
+            if (!(AclUtils.isScope(rangeEnd) && AclUtils.isScope(rangeStart) && rangeStart <= rangeEnd)) {
+                throw new AclPlugRuntimeException(String.format("RangeNetaddressStrategy netaddress examine scope Exception start is %s , end is %s", rangeStart, rangeEnd));
+            }
+            setValue(rangeStart, rangeEnd);
+            return true;
+        }
+
+        private void setValue(int start, int end) {
+            this.start = start;
+            this.end = end;
+        }
+
+        /**
+         * @return {@code true} if the request address starts with the literal prefix and the
+         *         segment at the range position is a number within {@code [start, end]};
+         *         {@code false} for a {@code null}, non-matching or malformed address
+         */
+        @Override
+        public boolean match(AccessControl accessControl) {
+            String netAddress = accessControl.getNetaddress();
+            if (netAddress == null || !netAddress.startsWith(this.head)) {
+                return false;
+            }
+            int from = this.head.length();
+            int to = index == 3 ? netAddress.length() : netAddress.lastIndexOf('.');
+            if (to <= from) {
+                // Nothing left to parse at the range position.
+                return false;
+            }
+            try {
+                int address = Integer.parseInt(netAddress.substring(from, to));
+                return address >= this.start && address <= this.end;
+            } catch (NumberFormatException ignored) {
+                // A non-numeric segment cannot fall inside the range.
+                return false;
+            }
+        }
+
+    }
+
+}
diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java
new file mode 100644
index 000000000..b7896b13d
--- /dev/null
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link AccessContralAnalysis} using {@link RequestCode} as the analysed class.
+ */
+public class AccessContralAnalysisTest {
+
+    AccessContralAnalysis accessContralAnalysis = new AccessContralAnalysis();
+
+    @Before
+    public void init() {
+        accessContralAnalysis.analysisClass(RequestCode.class);
+    }
+
+    /**
+     * Disabling only "send message" must yield exactly one {@code false} entry in the
+     * analysed map, keyed by code 10.
+     */
+    @Test
+    public void analysisTest() {
+        BorkerAccessControl accessControl = new BorkerAccessControl();
+        accessControl.setSendMessage(false);
+        Map<Integer, Boolean> map = accessContralAnalysis.analysis(accessControl);
+
+        long num = 0;
+        for (Entry<Integer, Boolean> e : map.entrySet()) {
+            if (!e.getValue()) {
+                // NOTE(review): 10 is presumably RequestCode.SEND_MESSAGE - confirm.
+                // JUnit's assertEquals takes (expected, actual).
+                Assert.assertEquals(Integer.valueOf(10), e.getKey());
+                num++;
+            }
+        }
+        Assert.assertEquals(1, num);
+    }
+
+    /** Analysing a plain {@link AccessControl} is expected to be rejected. */
+    @Test(expected = AclPlugRuntimeException.class)
+    public void analysisExceptionTest() {
+        AccessControl accessControl = new AccessControl();
+        accessContralAnalysis.analysis(accessControl);
+    }
+
+}
diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java
new file mode 100644
index 000000000..223cbc7c2
--- /dev/null
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+/**
+ * Placeholder test class: it currently declares no test cases.
+ */
+public class AclPlugControllerTest {
+
+}
diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
new file mode 100644
index 000000000..806d18089
--- /dev/null
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AclUtilsTest {
+
+    @Test
+    public void getAddreeStrArray() {
+        String address = "1.1.1.{1,2,3,4}";
+        String[] addressArray = AclUtils.getAddreeStrArray(address, "{1,2,3,4}");
+        List<String> newAddressList = new ArrayList<>();
+        for (String a : addressArray) {
+            newAddressList.add(a);
+        }
+
+        List<String> addressList = new ArrayList<>();
+        addressList.add("1.1.1.1");
+        addressList.add("1.1.1.2");
+        addressList.add("1.1.1.3");
+        addressList.add("1.1.1.4");
+        Assert.assertEquals(newAddressList, addressList);
+    }
+
+    @Test
+    public void isScopeStringArray() {
+        String adderss = "12";
+
+        for (int i = 0; i < 6; i++) {
+            boolean isScope = AclUtils.isScope(adderss, 4);
+            if (i == 3) {
+                Assert.assertTrue(isScope);
+            } else {
+                Assert.assertFalse(isScope);
+            }
+            adderss = adderss + ".12";
+        }
+    }
+
+    @Test
+    public void isScopeArray() {
+        String[] adderss = StringUtils.split("12.12.12.12", ".");
+        boolean isScope = AclUtils.isScope(adderss, 4);
+        Assert.assertTrue(isScope);
+        isScope = AclUtils.isScope(adderss, 3);
+        Assert.assertTrue(isScope);
+
+        adderss = StringUtils.split("12.12.1222.1222", ".");
+        isScope = AclUtils.isScope(adderss, 4);
+        Assert.assertFalse(isScope);
+        isScope = AclUtils.isScope(adderss, 3);
+        Assert.assertFalse(isScope);
+
+    }
+
+    @Test
+    public void isScopeStringTest() {
+        for (int i = 0; i < 256; i++) {
+            boolean isScope = AclUtils.isScope(i + "");
+            Assert.assertTrue(isScope);
+        }
+        boolean isScope = AclUtils.isScope("-1");
+        Assert.assertFalse(isScope);
+        isScope = AclUtils.isScope("256");
+        Assert.assertFalse(isScope);
+    }
+
+    @Test
+    public void isScopeTest() {
+        for (int i = 0; i < 256; i++) {
+            boolean isScope = AclUtils.isScope(i);
+            Assert.assertTrue(isScope);
+        }
+        boolean isScope = AclUtils.isScope(-1);
+        Assert.assertFalse(isScope);
+        isScope = AclUtils.isScope(256);
+        Assert.assertFalse(isScope);
+
+    }
+
+    @Test
+    public void isAsteriskTest() {
+        boolean isAsterisk = AclUtils.isAsterisk("*");
+        Assert.assertTrue(isAsterisk);
+
+        isAsterisk = AclUtils.isAsterisk(",");
+        Assert.assertFalse(isAsterisk);
+    }
+
+    @Test
+    public void isColonTest() {
+        boolean isColon = AclUtils.isColon(",");
+        Assert.assertTrue(isColon);
+
+        isColon = AclUtils.isColon("-");
+        Assert.assertFalse(isColon);
+    }
+
+    @Test
+    public void isMinusTest() {
+        boolean isMinus = AclUtils.isMinus("-");
+        Assert.assertTrue(isMinus);
+
+        isMinus = AclUtils.isMinus("*");
+        Assert.assertFalse(isMinus);
+    }
+}
diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AuthenticationTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AuthenticationTest.java
new file mode 100644
index 000000000..6e5d1444d
--- /dev/null
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AuthenticationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
+import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategyFactory;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link Authentication} against a {@link BorkerAccessControl} that denies
+ * one request code and carries permit / no-permit topic sets for send and pull.
+ */
+public class AuthenticationTest {
+
+    Authentication authentication = new Authentication();
+
+    AuthenticationInfo authenticationInfo;
+
+    BorkerAccessControl borkerAccessControl;
+
+    AuthenticationResult authenticationResult = new AuthenticationResult();
+    AccessControl accessControl = new AccessControl();
+
+    @Before
+    public void init() {
+        borkerAccessControl = new BorkerAccessControl();
+        //321
+        borkerAccessControl.setQueryConsumeQueue(false);
+
+        borkerAccessControl.setPermitSendTopic(topicSet("permitSendTopic"));
+        borkerAccessControl.setNoPermitSendTopic(topicSet("noPermitSendTopic"));
+        borkerAccessControl.setPermitPullTopic(topicSet("permitPullTopic"));
+        borkerAccessControl.setNoPermitPullTopic(topicSet("noPermitPullTopic"));
+
+        AccessContralAnalysis analyser = new AccessContralAnalysis();
+        analyser.analysisClass(RequestCode.class);
+        Map<Integer, Boolean> codePermissions = analyser.analysis(borkerAccessControl);
+
+        authenticationInfo = new AuthenticationInfo(codePermissions, borkerAccessControl, NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
+    }
+
+    /** Builds a fresh mutable set holding just the given topic. */
+    private static Set<String> topicSet(String topic) {
+        Set<String> topics = new HashSet<>();
+        topics.add(topic);
+        return topics;
+    }
+
+    /** Runs the authentication with the current state of the shared fixtures. */
+    private boolean authenticate() {
+        return authentication.authentication(authenticationInfo, accessControl, authenticationResult);
+    }
+
+    /**
+     * Walks through request codes and topics on one shared {@link AccessControl};
+     * each step builds on the code/topic left by the previous one.
+     */
+    @Test
+    public void authenticationTest() {
+
+        accessControl.setCode(317);
+        Assert.assertTrue(authenticate());
+
+        accessControl.setCode(321);
+        Assert.assertFalse(authenticate());
+
+        accessControl.setCode(10);
+        accessControl.setTopic("permitSendTopic");
+        Assert.assertTrue(authenticate());
+
+        accessControl.setCode(310);
+        Assert.assertTrue(authenticate());
+
+        accessControl.setCode(320);
+        Assert.assertTrue(authenticate());
+
+        accessControl.setTopic("noPermitSendTopic");
+        Assert.assertFalse(authenticate());
+
+        accessControl.setTopic("nopermitSendTopic");
+        Assert.assertFalse(authenticate());
+
+        accessControl.setCode(11);
+        accessControl.setTopic("permitPullTopic");
+        Assert.assertTrue(authenticate());
+
+        accessControl.setTopic("noPermitPullTopic");
+        Assert.assertFalse(authenticate());
+
+        accessControl.setTopic("nopermitPullTopic");
+        Assert.assertFalse(authenticate());
+
+    }
+
+    /**
+     * A topic absent from a non-empty permit set is rejected; once the permit set is
+     * replaced by an empty one the same topic is accepted, for send and for pull.
+     */
+    @Test
+    public void isEmptyTest() {
+        accessControl.setCode(10);
+        accessControl.setTopic("absentTopic");
+        Assert.assertFalse(authenticate());
+
+        Set<String> emptyTopics = new HashSet<>();
+        borkerAccessControl.setPermitSendTopic(emptyTopics);
+        Assert.assertTrue(authenticate());
+
+        accessControl.setCode(11);
+        Assert.assertFalse(authenticate());
+
+        borkerAccessControl.setPermitPullTopic(emptyTopics);
+        Assert.assertTrue(authenticate());
+    }
+
+}
diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
new file mode 100644
index 000000000..c925ef412
--- /dev/null
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.engine;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
+import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
+import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
+import org.apache.rocketmq.acl.plug.entity.LoginInfo;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import org.apache.rocketmq.common.MixAll;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.FieldSetter;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Unit tests for {@link PlainAclPlugEngine}.
+ *
+ * <p>Every test starts from a freshly constructed engine (see {@link #init()}) and two pre-built
+ * entries, {@code accessControl} and {@code accessControlTwo}, which are not registered with the
+ * engine until a test does so explicitly.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PlainAclPlugEngineTest {
+
+    /** Path of the sample transport configuration, relative to the home directory or classpath root. */
+    private static final String TRANSPORT_CONFIG = "/conf/transport.yml";
+
+    PlainAclPlugEngine plainAclPlugEngine;
+
+    BorkerAccessControlTransport transport;
+
+    AccessControl accessControl;
+
+    AccessControl accessControlTwo;
+
+    Map<String, LoginInfo> loginInfoMap;
+
+    /**
+     * Builds the fixtures: loads (or synthesizes) a transport configuration, creates the engine,
+     * prepares the two sample entries and replaces the engine's {@code loginInfoMap} field with a
+     * map the tests can inspect.
+     */
+    @Before
+    public void init() throws NoSuchFieldException, SecurityException, IOException {
+        transport = loadTransport();
+
+        ControllerParameters controllerParametersEntity = new ControllerParameters();
+        controllerParametersEntity.setFileHome(null);
+        plainAclPlugEngine = new PlainAclPlugEngine(controllerParametersEntity);
+        try {
+            plainAclPlugEngine.initialize();
+        } catch (Exception ignored) {
+            // Best effort only: the file home is deliberately null, so initialization may fail
+            // (presumably because no configuration file can be located). The tests register
+            // every entry they need themselves.
+        }
+
+        accessControl = newBorkerAccessControl("rokcetmq", "aliyun11", "127.0.0.1", "127.0.0.1:1");
+        accessControlTwo = newBorkerAccessControl("rokcet1", "aliyun1", "127.0.0.1", "127.0.0.1:2");
+
+        loginInfoMap = new ConcurrentHashMap<>();
+        FieldSetter.setField(plainAclPlugEngine, plainAclPlugEngine.getClass().getSuperclass().getDeclaredField("loginInfoMap"), loginInfoMap);
+    }
+
+    /**
+     * Reads the sample transport configuration, falling back to a hand-built one when the file
+     * cannot be opened or parsed. The stream is always closed.
+     */
+    private static BorkerAccessControlTransport loadTransport() {
+        try (InputStream in = openTransportConfig()) {
+            return new Yaml().loadAs(in, BorkerAccessControlTransport.class);
+        } catch (Exception e) {
+            return fallbackTransport();
+        }
+    }
+
+    /**
+     * Opens the configuration from the RocketMQ home directory when one is configured, otherwise
+     * from the test classpath, so the lookup does not depend on the operating system or on the
+     * directory the build runs in.
+     */
+    private static InputStream openTransportConfig() throws IOException {
+        String home = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+        if (home != null) {
+            return new FileInputStream(new File(home + TRANSPORT_CONFIG));
+        }
+        URL resource = PlainAclPlugEngineTest.class.getResource(TRANSPORT_CONFIG);
+        if (resource == null) {
+            throw new IOException("Test resource not found on the classpath: " + TRANSPORT_CONFIG);
+        }
+        return resource.openStream();
+    }
+
+    /** Builds the transport used when the sample configuration is unavailable. */
+    private static BorkerAccessControlTransport fallbackTransport() {
+        BorkerAccessControlTransport fallback = new BorkerAccessControlTransport();
+        fallback.setOnlyNetAddress(newBorkerAccessControl("onlyNetAddress", "aliyun11", "127.0.0.1", "127.0.0.1:1"));
+
+        List<BorkerAccessControl> entries = new ArrayList<>();
+        entries.add(newBorkerAccessControl("listTransport", "aliyun1", "127.0.0.1", "127.0.0.1:2"));
+        fallback.setList(entries);
+        return fallback;
+    }
+
+    /** Creates a {@link BorkerAccessControl} with the four identifying fields set. */
+    private static BorkerAccessControl newBorkerAccessControl(String account, String password, String netaddress,
+        String recognition) {
+        BorkerAccessControl entry = new BorkerAccessControl();
+        entry.setAccount(account);
+        entry.setPassword(password);
+        entry.setNetaddress(netaddress);
+        entry.setRecognition(recognition);
+        return entry;
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void accountNullTest() {
+        accessControl.setAccount(null);
+        plainAclPlugEngine.setAccessControl(accessControl);
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void accountThanTest() {
+        accessControl.setAccount("123");
+        plainAclPlugEngine.setAccessControl(accessControl);
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void passWordtNullTest() {
+        // Exercise the password, not the account, so this is not a copy of accountNullTest.
+        accessControl.setPassword(null);
+        plainAclPlugEngine.setAccessControl(accessControl);
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void passWordThanTest() {
+        // Exercise the password, not the account, so this is not a copy of accountThanTest.
+        accessControl.setPassword("123");
+        plainAclPlugEngine.setAccessControl(accessControl);
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void testPlainAclPlugEngineInit() {
+        ControllerParameters controllerParametersEntity = new ControllerParameters();
+        new PlainAclPlugEngine(controllerParametersEntity).initialize();
+    }
+
+    /** A registered entry is returned for identical credentials and not for differing ones. */
+    @Test
+    public void authenticationInfoOfSetAccessControl() {
+        AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
+        aclPlugEngine.setAccessControl(accessControl);
+
+        AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(accessControl);
+        Assert.assertEquals(accessControl, authenticationInfo.getAccessControl());
+
+        // Unknown account.
+        AccessControl testAccessControl = new AccessControl();
+        testAccessControl.setAccount("rokcetmq1");
+        testAccessControl.setPassword("aliyun11");
+        testAccessControl.setNetaddress("127.0.0.1");
+        testAccessControl.setRecognition("127.0.0.1:1");
+        Assert.assertNull(aclPlugEngine.getAccessControl(testAccessControl));
+
+        // Known account, different password.
+        testAccessControl.setAccount("rokcetmq");
+        testAccessControl.setPassword("1234567");
+        Assert.assertNull(aclPlugEngine.getAccessControl(testAccessControl));
+
+        // The password is still the differing one here, so this lookup differs in both the
+        // password and the address.
+        testAccessControl.setNetaddress("127.0.0.2");
+        Assert.assertNull(aclPlugEngine.getAccessControl(testAccessControl));
+    }
+
+    /** Entries registered as a list can each be looked up afterwards. */
+    @Test
+    public void setAccessControlList() {
+        List<AccessControl> accessControlList = new ArrayList<>();
+        accessControlList.add(accessControl);
+        accessControlList.add(accessControlTwo);
+        plainAclPlugEngine.setAccessControlList(accessControlList);
+
+        AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
+        AuthenticationInfo found = aclPlugEngine.getAccessControl(accessControl);
+        Assert.assertEquals(accessControl, found.getAccessControl());
+
+        found = aclPlugEngine.getAccessControl(accessControlTwo);
+        Assert.assertEquals(accessControlTwo, found.getAccessControl());
+    }
+
+    /** An entry is found for its own address and no longer found once the address changes. */
+    @Test
+    public void setNetaddressAccessControl() {
+        AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
+        AccessControl entry = new BorkerAccessControl();
+        entry.setAccount("RocketMQ");
+        entry.setPassword("RocketMQ");
+        entry.setNetaddress("127.0.0.1");
+        aclPlugEngine.setAccessControl(entry);
+        aclPlugEngine.setNetaddressAccessControl(entry);
+
+        AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(entry);
+        Assert.assertEquals(entry, authenticationInfo.getAccessControl());
+
+        entry.setNetaddress("127.0.0.2");
+        Assert.assertNull(aclPlugEngine.getAccessControl(entry));
+    }
+
+    // Placeholder without @Test: JUnit does not run it.
+    public void eachCheckLoginAndAuthentication() {
+
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void borkerAccessControlTransportTestNull() {
+        plainAclPlugEngine.setBorkerAccessControlTransport(new BorkerAccessControlTransport());
+    }
+
+    /** Entries supplied through a transport object are registered with the engine. */
+    @Test
+    public void borkerAccessControlTransportTest() {
+        BorkerAccessControlTransport borkerAccessControlTransprt = new BorkerAccessControlTransport();
+        borkerAccessControlTransprt.setOnlyNetAddress((BorkerAccessControl) this.accessControl);
+        List<BorkerAccessControl> list = new ArrayList<>();
+        list.add((BorkerAccessControl) this.accessControlTwo);
+        borkerAccessControlTransprt.setList(list);
+        plainAclPlugEngine.setBorkerAccessControlTransport(borkerAccessControlTransprt);
+
+        AuthenticationInfoManagementAclPlugEngine aclPlugEngine = (AuthenticationInfoManagementAclPlugEngine) plainAclPlugEngine;
+        AccessControl registered = new BorkerAccessControl();
+        registered.setAccount("RocketMQ");
+        registered.setPassword("RocketMQ");
+        registered.setNetaddress("127.0.0.1");
+        aclPlugEngine.setAccessControl(registered);
+        AuthenticationInfo authenticationInfo = aclPlugEngine.getAccessControl(registered);
+        Assert.assertNotNull(authenticationInfo.getAccessControl());
+
+        authenticationInfo = aclPlugEngine.getAccessControl(accessControlTwo);
+        Assert.assertEquals(accessControlTwo, authenticationInfo.getAccessControl());
+    }
+
+    /** Login info is produced for a registered entry and absent for an unregistered one. */
+    @Test
+    public void getLoginInfo() {
+        plainAclPlugEngine.setAccessControl(accessControl);
+        Assert.assertNotNull(plainAclPlugEngine.getLoginInfo(accessControl));
+        Assert.assertNull(plainAclPlugEngine.getLoginInfo(accessControlTwo));
+    }
+
+    /** Login info stored under a recognition key is gone after {@code deleteLoginInfo}. */
+    @Test
+    public void deleteLoginInfo() {
+        plainAclPlugEngine.setAccessControl(accessControl);
+        plainAclPlugEngine.getLoginInfo(accessControl);
+        Assert.assertNotNull(loginInfoMap.get(accessControl.getRecognition()));
+
+        plainAclPlugEngine.deleteLoginInfo(accessControl.getRecognition());
+        Assert.assertNull(loginInfoMap.get(accessControl.getRecognition()));
+    }
+
+    /** Lookup reports a descriptive message before registration and succeeds afterwards. */
+    @Test
+    public void getAuthenticationInfo() {
+        AccessControl newAccessControl = new AccessControl();
+        newAccessControl.setAccount("rokcetmq");
+        newAccessControl.setPassword("aliyun11");
+        newAccessControl.setNetaddress("127.0.0.1");
+        newAccessControl.setRecognition("127.0.0.1:1");
+
+        AuthenticationResult authenticationResult = new AuthenticationResult();
+        plainAclPlugEngine.getAuthenticationInfo(newAccessControl, authenticationResult);
+        Assert.assertEquals("Login information does not exist, Please check login, password, IP", authenticationResult.getResultString());
+
+        plainAclPlugEngine.setAccessControl(accessControl);
+        AuthenticationInfo authenticationInfo = plainAclPlugEngine.getAuthenticationInfo(newAccessControl, authenticationResult);
+        Assert.assertNotNull(authenticationInfo);
+    }
+}
diff --git a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java
new file mode 100644
index 000000000..3f21b6788
--- /dev/null
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug.strategy;
+
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for the strategies produced by {@link NetaddressStrategyFactory}: which strategy
+ * is selected for each address notation, and which client addresses each strategy matches.
+ */
+public class NetaddressStrategyTest {
+
+    NetaddressStrategyFactory netaddressStrategyFactory = new NetaddressStrategyFactory();
+
+    /** Each supported address notation maps to the expected strategy. */
+    @Test
+    public void NetaddressStrategyFactoryTest() {
+        AccessControl accessControl = new AccessControl();
+        NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        Assert.assertEquals(NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY, netaddressStrategy);
+
+        accessControl.setNetaddress("*");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        Assert.assertEquals(NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY, netaddressStrategy);
+
+        accessControl.setNetaddress("127.0.0.1");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        Assert.assertEquals(NetaddressStrategyFactory.OneNetaddressStrategy.class, netaddressStrategy.getClass());
+
+        accessControl.setNetaddress("127.0.0.1,127.0.0.2,127.0.0.3");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        Assert.assertEquals(NetaddressStrategyFactory.MultipleNetaddressStrategy.class, netaddressStrategy.getClass());
+
+        accessControl.setNetaddress("127.0.0.{1,2,3}");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        Assert.assertEquals(NetaddressStrategyFactory.MultipleNetaddressStrategy.class, netaddressStrategy.getClass());
+
+        accessControl.setNetaddress("127.0.0.1-200");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        Assert.assertEquals(NetaddressStrategyFactory.RangeNetaddressStrategy.class, netaddressStrategy.getClass());
+
+        accessControl.setNetaddress("127.0.0.*");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        Assert.assertEquals(NetaddressStrategyFactory.RangeNetaddressStrategy.class, netaddressStrategy.getClass());
+
+        accessControl.setNetaddress("127.0.1-20.*");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        Assert.assertEquals(NetaddressStrategyFactory.RangeNetaddressStrategy.class, netaddressStrategy.getClass());
+    }
+
+    /** A valid address is accepted first; the out-of-range octet is what must raise. */
+    @Test(expected = AclPlugRuntimeException.class)
+    public void verifyTest() {
+        AccessControl accessControl = new AccessControl();
+        accessControl.setNetaddress("127.0.0.1");
+        netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        accessControl.setNetaddress("256.0.0.1");
+        netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+    }
+
+    @Test
+    public void nullNetaddressStrategyTest() {
+        boolean isMatch = NetaddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY.match(new AccessControl());
+        Assert.assertTrue(isMatch);
+    }
+
+    /** A single-address strategy matches only that exact address. */
+    @Test
+    public void oneNetaddressStrategyTest() {
+        AccessControl accessControl = new AccessControl();
+        accessControl.setNetaddress("127.0.0.1");
+        NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+
+        accessControl.setNetaddress("");
+        Assert.assertFalse(netaddressStrategy.match(accessControl));
+
+        accessControl.setNetaddress("127.0.0.2");
+        Assert.assertFalse(netaddressStrategy.match(accessControl));
+
+        accessControl.setNetaddress("127.0.0.1");
+        Assert.assertTrue(netaddressStrategy.match(accessControl));
+    }
+
+    /** The comma-separated and the brace notation must behave identically. */
+    @Test
+    public void multipleNetaddressStrategyTest() {
+        AccessControl accessControl = new AccessControl();
+        accessControl.setNetaddress("127.0.0.1,127.0.0.2,127.0.0.3");
+        NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        multipleNetaddressStrategyTest(netaddressStrategy);
+
+        accessControl.setNetaddress("127.0.0.{1,2,3}");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        multipleNetaddressStrategyTest(netaddressStrategy);
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void multipleNetaddressStrategyExceptionTest() {
+        AccessControl accessControl = new AccessControl();
+        accessControl.setNetaddress("127.0.0.1,2,3}");
+        netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+    }
+
+    /** Asserts that the strategy matches 127.0.0.1 to 127.0.0.3 and rejects .4 and .0. */
+    private void multipleNetaddressStrategyTest(NetaddressStrategy netaddressStrategy) {
+        AccessControl accessControl = new AccessControl();
+        for (String allowed : new String[] {"127.0.0.1", "127.0.0.2", "127.0.0.3"}) {
+            accessControl.setNetaddress(allowed);
+            Assert.assertTrue(netaddressStrategy.match(accessControl));
+        }
+        for (String rejected : new String[] {"127.0.0.4", "127.0.0.0"}) {
+            accessControl.setNetaddress(rejected);
+            Assert.assertFalse(netaddressStrategy.match(accessControl));
+        }
+    }
+
+    @Test
+    public void rangeNetaddressStrategyTest() {
+        String head = "127.0.0.";
+        AccessControl accessControl = new AccessControl();
+        accessControl.setNetaddress("127.0.0.1-200");
+        NetaddressStrategy netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        rangeNetaddressStrategyTest(netaddressStrategy, head, 1, 200, true);
+
+        accessControl.setNetaddress("127.0.0.*");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        rangeNetaddressStrategyTest(netaddressStrategy, head, 0, 255, true);
+
+        accessControl.setNetaddress("127.0.1-200.*");
+        netaddressStrategy = netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+        rangeNetaddressStrategyThirdlyTest(netaddressStrategy, head, 1, 200);
+    }
+
+    /**
+     * Probes {@code head + i} for every i in [-10, 300).
+     *
+     * @param expectMatchInRange when true, values of i within [start, end] must match and all
+     *        others must not; when false, no probed address may match
+     */
+    private void rangeNetaddressStrategyTest(NetaddressStrategy netaddressStrategy, String head, int start, int end,
+        boolean expectMatchInRange) {
+        AccessControl accessControl = new AccessControl();
+        for (int i = -10; i < 300; i++) {
+            accessControl.setNetaddress(head + i);
+            boolean match = netaddressStrategy.match(accessControl);
+            boolean shouldMatch = expectMatchInRange && i >= start && i <= end;
+            Assert.assertEquals(shouldMatch, match);
+        }
+    }
+
+    /**
+     * Runs the no-match probe for every i in [start, end] using {@code head + i} as the prefix.
+     *
+     * <p>NOTE(review): the caller passes the head "127.0.0.", so the probed strings are plain
+     * concatenations such as "127.0.0.1" + "0" rather than addresses whose third octet varies;
+     * confirm whether "127.0." + i + "." with an expected match was intended.
+     */
+    private void rangeNetaddressStrategyThirdlyTest(NetaddressStrategy netaddressStrategy, String head, int start,
+        int end) {
+        for (int i = -10; i < 300; i++) {
+            if (i >= start && i <= end) {
+                rangeNetaddressStrategyTest(netaddressStrategy, head + i, 0, 255, false);
+            }
+        }
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void rangeNetaddressStrategyExceptionStartGreaterEndTest() {
+        rangeNetaddressStrategyExceptionTest("127.0.0.2-1");
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void rangeNetaddressStrategyExceptionScopeTest() {
+        rangeNetaddressStrategyExceptionTest("127.0.0.-1-200");
+    }
+
+    @Test(expected = AclPlugRuntimeException.class)
+    public void rangeNetaddressStrategyExceptionScopeTwoTest() {
+        rangeNetaddressStrategyExceptionTest("127.0.0.0-256");
+    }
+
+    /** Asks the factory for a strategy for the given notation; callers expect it to throw. */
+    private void rangeNetaddressStrategyExceptionTest(String netaddress) {
+        AccessControl accessControl = new AccessControl();
+        accessControl.setNetaddress(netaddress);
+        netaddressStrategyFactory.getNetaddressStrategy(accessControl);
+    }
+
+}
diff --git a/acl-plug/src/test/resources/conf/transport.yml b/acl-plug/src/test/resources/conf/transport.yml
new file mode 100644
index 000000000..99d26fd8e
--- /dev/null
+++ b/acl-plug/src/test/resources/conf/transport.yml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# Entry that carries no account or password: only a network-address pattern
+# and the topics listed under noPermitPullTopic.
+onlyNetAddress:
+  netaddress: 10.10.103.*
+  noPermitPullTopic:
+  - broker-a
+
+# Account-based entries. Both use the account name RocketMQ and differ in
+# netaddress and in the topics listed under permitSendTopic.
+# NOTE(review): presumably the entry whose netaddress matches the client is
+# the one that applies - confirm against the engine.
+list:
+- account: RocketMQ
+  password: 1234567
+  netaddress: 192.0.0.*
+  permitSendTopic:
+  - test1
+  - test2
+- account: RocketMQ
+  password: 1234567
+  netaddress: 192.0.2.1
+  permitSendTopic:
+  - test3
+  - test4
+    
\ No newline at end of file
diff --git a/broker/pom.xml b/broker/pom.xml
index f10ae5373..c353eb32b 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -1,21 +1,17 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
 
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
@@ -52,6 +48,10 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-filter</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-acl-plug</artifactId>
+        </dependency>
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index e7ef46d0c..c30d1f33e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,11 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.plug.AclPlugController;
+import org.apache.rocketmq.acl.plug.AclRemotingService;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
 import org.apache.rocketmq.broker.client.ClientHousekeepingService;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
 import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -91,6 +97,7 @@
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.FileWatchService;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageArrivingListener;
@@ -157,6 +164,8 @@
     private TransactionalMessageService transactionalMessageService;
     private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
 
+    private AclPlugController aclPlugController;
+
     public BrokerController(
         final BrokerConfig brokerConfig,
         final NettyServerConfig nettyServerConfig,
@@ -467,6 +476,7 @@ private void reloadServerSslContext() {
                 }
             }
             initialTransaction();
+            initialAclPlug();
         }
         return result;
     }
@@ -486,6 +496,47 @@ private void initialTransaction() {
         this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
     }
 
+    private void initialAclPlug() {
+        try {
+            if (!this.brokerConfig.isAclPlug()) {
+                log.info("Default does not start acl plug");
+                return;
+            }
+            ControllerParameters controllerParameters = new ControllerParameters();
+            controllerParameters.setFileHome(brokerConfig.getRocketmqHome());
+            aclPlugController = new AclPlugController(controllerParameters);
+            if (!aclPlugController.isStartSucceed()) {
+                log.error("start acl plug failure");
+                return;
+            }
+            final AclRemotingService aclRemotingService = aclPlugController.getAclRemotingService();
+            this.registerServerRPCHook(new RPCHook() {
+
+                @Override
+                public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+                    HashMap<String, String> extFields = request.getExtFields();
+                    AccessControl accessControl = new AccessControl();
+                    accessControl.setCode(request.getCode());
+                    accessControl.setRecognition(remoteAddr);
+                    if (extFields != null) {
+                        accessControl.setAccount(extFields.get("account"));
+                        accessControl.setPassword(extFields.get("password"));
+                        accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
+                        accessControl.setTopic(extFields.get("topic"));
+                    }
+                    aclRemotingService.check(accessControl);
+                }
+
+                @Override
+                public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
+                }
+            });
+
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
     public void registerProcessor() {
         /**
          * SendMessageProcessor
@@ -1049,7 +1100,12 @@ public void setTransactionalMessageCheckListener(
         this.transactionalMessageCheckListener = transactionalMessageCheckListener;
     }
 
+    /**
+     * Returns the ACL plug controller held by this broker.
+     *
+     * @return the controller, or {@code null} if none has been assigned
+     */
+    public AclPlugController getAclPlugController() {
+        return aclPlugController;
+    }
+
     public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
         return endTransactionThreadPoolQueue;
+
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index d536db505..f4ecc2c04 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -72,6 +72,9 @@ public void onChannelClose(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
         this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+        if (this.brokerController.getAclPlugController() != null && this.brokerController.getAclPlugController().isStartSucceed()) {
+            this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr);
+        }
     }
 
     @Override
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f81af2165..6e11de20f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -171,6 +171,18 @@
     @ImportantField
     private long transactionCheckInterval = 60 * 1000;
 
+    private boolean isAclPlug;
+
+    public static String localHostName() {
+        try {
+            return InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            log.error("Failed to obtain the host name", e);
+        }
+
+        return "DEFAULT_BROKER";
+    }
+
     public boolean isTraceOn() {
         return traceOn;
     }
@@ -235,16 +247,6 @@ public void setSlaveReadEnable(final boolean slaveReadEnable) {
         this.slaveReadEnable = slaveReadEnable;
     }
 
-    public static String localHostName() {
-        try {
-            return InetAddress.getLocalHost().getHostName();
-        } catch (UnknownHostException e) {
-            log.error("Failed to obtain the host name", e);
-        }
-
-        return "DEFAULT_BROKER";
-    }
-
     public int getRegisterBrokerTimeoutMills() {
         return registerBrokerTimeoutMills;
     }
@@ -709,6 +711,14 @@ public void setTransactionCheckInterval(long transactionCheckInterval) {
         this.transactionCheckInterval = transactionCheckInterval;
     }
 
+    public boolean isAclPlug() {
+        return isAclPlug;
+    }
+
+    public void setAclPlug(boolean isAclPlug) {
+        this.isAclPlug = isAclPlug;
+    }
+
     public int getEndTransactionThreadPoolNums() {
         return endTransactionThreadPoolNums;
     }
@@ -732,4 +742,5 @@ public long getWaitTimeMillsInTransactionQueue() {
     public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
         this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
     }
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index 12070ddc3..46a6e45ab 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -36,4 +36,5 @@
     public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
     public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
     public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
+    public static final String ACL_PLUG_LOGGER_NAME = "RocketmqAclPlug";
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
index 0d2dec6fa..8d86544be 100644
--- a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
@@ -17,14 +17,13 @@
 
 package org.apache.rocketmq.common;
 
-import org.junit.Test;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
diff --git a/distribution/conf/broker.conf b/distribution/conf/broker.conf
index 0c0b28b7b..363bcbc03 100644
--- a/distribution/conf/broker.conf
+++ b/distribution/conf/broker.conf
@@ -20,3 +20,5 @@ deleteWhen = 04
 fileReservedTime = 48
 brokerRole = ASYNC_MASTER
 flushDiskType = ASYNC_FLUSH
+aclPlug=true
+namesrvAddr=127.0.0.1:9876
diff --git a/distribution/conf/transport.yml b/distribution/conf/transport.yml
new file mode 100644
index 000000000..25d4902a6
--- /dev/null
+++ b/distribution/conf/transport.yml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+onlyNetAddress:
+  netaddress: 10.10.103.*
+  noPermitPullTopic:
+    - broker-a
+
+list:
+  - account: RocketMQ
+    password: 1234567
+    netaddress: 192.0.0.*
+    permitSendTopic:
+      - test1
+      - test2
+  - account: RocketMQ
+    password: 1234567
+    netaddress: 192.0.2.1
+    permitSendTopic:
+      - test3
+      - test4
+    
\ No newline at end of file
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
index efffa36d5..8aec7e309 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -29,10 +29,10 @@
 
     public static void main(String[] args) throws MQClientException {
         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
-
+        consumer.setNamesrvAddr("127.0.0.1:9876");
         consumer.start();
 
-        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
+        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
         for (MessageQueue mq : mqs) {
             System.out.printf("Consume from the queue: %s%n", mq);
             SINGLE_MQ:
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java
index 16108b8c6..f12595a90 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java
@@ -24,6 +24,7 @@
 public class PullConsumerTest {
     public static void main(String[] args) throws MQClientException {
         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
+        consumer.setNamesrvAddr("127.0.0.1:9876");
         consumer.start();
 
         try {
diff --git a/pom.xml b/pom.xml
index 9378dac54..535893c21 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,8 @@
   limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
     <parent>
         <groupId>org.apache</groupId>
@@ -125,6 +126,7 @@
         <module>distribution</module>
         <module>openmessaging</module>
         <module>logging</module>
+        <module>acl-plug</module>
     </modules>
 
     <build>
@@ -157,7 +159,7 @@
                 </executions>
                 <configuration>
                     <rules>
-                        <banCircularDependencies />
+                        <banCircularDependencies/>
                     </rules>
                     <fail>true</fail>
                 </configuration>
@@ -521,6 +523,11 @@
                 <artifactId>rocketmq-example</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>rocketmq-acl-plug</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services