You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/08/21 03:17:33 UTC

[GitHub] vongosling closed pull request #412: translate And Acl control-First group

vongosling closed pull request #412: translate And Acl control-First group
URL: https://github.com/apache/rocketmq/pull/412
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/acl/dao/AclDao.java b/broker/src/main/java/org/apache/rocketmq/broker/acl/dao/AclDao.java
new file mode 100644
index 000000000..e7150dcde
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/acl/dao/AclDao.java
@@ -0,0 +1,135 @@
+package org.apache.rocketmq.broker.acl.dao;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.broker.acl.domain.AclDomain;
+import org.apache.rocketmq.broker.acl.domain.OpsEnum;
+import org.apache.rocketmq.common.TopicConfig;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class AclDao {
+
+    private static final String sysfilePath = "../../../../../../../../../";
+    public AclDao() {
+    }
+
+    public static boolean add(AclDomain aclDomain){
+        if(null == aclDomain)return false;
+        String topicId = aclDomain.getTopicId();
+        OpsEnum opsEnum = aclDomain.getType();
+        if(StringUtils.isEmpty(topicId)|| null == opsEnum) return false;
+        String key = opsEnum.getCode()+","+topicId;
+        TopicConfig.aclMap.put(key,aclDomain);
+        return writeLog(aclDomain,"add");
+    }
+
+    public static boolean update(AclDomain aclDomain){
+        if(null == aclDomain)return false;
+        String topicId = aclDomain.getTopicId();
+        OpsEnum opsEnum = aclDomain.getType();
+        if(StringUtils.isEmpty(topicId)|| null == opsEnum) return false;
+        String key = opsEnum.getCode()+","+topicId;
+        AclDomain tmp = TopicConfig.aclMap.get(key);
+        if(null!=aclDomain.getGroupId()){
+            tmp.setGroupId(aclDomain.getGroupId());
+        }
+        if(null != aclDomain.getUserIds()){
+            tmp.setUserIds(aclDomain.getUserIds());
+        }
+        if(null!=aclDomain.getStatus()){
+            tmp.setStatus(aclDomain.getStatus());
+        }
+        TopicConfig.aclMap.put(key,tmp);
+        return writeLog(tmp,"update");
+    }
+
+/*    public static boolean deleteByTopicId(String topicId){
+
+    }*/
+
+    public static Set<String> getUserIDByTopicId(String topicId,OpsEnum opsEnum){
+        Set<String> res = new HashSet<>();
+        if(StringUtils.isEmpty(topicId)|| null == opsEnum) return res;
+        String key = opsEnum.getCode()+","+topicId;
+        res = TopicConfig.aclMap.get(key).getUserIds();
+        return res;
+    }
+
+    private static boolean writeLog(AclDomain domain,String opsType){
+//TODO
+        String path = "";//读取配置文件 获取log存放位置
+        //TODO
+        //获取log文件,没有就创建
+        File file = new File(path);
+
+        //以追加的方式加入log文件末尾
+        String content = opsType+":"+domain.toString();
+        BufferedWriter out = null;
+        try {
+            if(!file.exists()){
+                file.createNewFile();
+            }
+            out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file,true)));
+            out.write(content);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }finally {
+            try {
+                if(out != null){
+                    out.close();
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        return true;
+    }
+
+    public static boolean recoverFromLog(){
+        String path = sysfilePath+ "authLog.txt";//读取配置文件 获取log存放位置
+        StringBuffer sb = new StringBuffer();
+        String line = null;
+        try {
+            BufferedReader in = new BufferedReader(new FileReader(path));
+            line = in.readLine();
+            while(line!=null) {
+                if (!StringUtils.isEmpty(line)) {
+                    String[] opsStr = line.split(":");
+                    switch (opsStr[0]) {
+                        case "add":
+                            add(getAclDomainFromString(opsStr[1]));
+                            break;
+                        case "update":
+                            update(getAclDomainFromString(opsStr[1]));
+                            break;
+                        default:
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return true;
+    }
+
+    private static AclDomain getAclDomainFromString(String str){
+        AclDomain aclDomain =  new AclDomain();
+        if(StringUtils.isEmpty(str))return aclDomain;
+        String[] strs = str.split(",");
+        aclDomain.setTopicId(strs[0]);
+        aclDomain.setGroupId(strs[1]);
+        aclDomain.setType(OpsEnum.getOpsEnum(Integer.valueOf(strs[3])));
+        aclDomain.setStatus(Integer.parseInt(strs[4]));
+        String ids = strs[2].substring(1,strs[2].length()-1);
+        Set<String> userIds = new HashSet<>();
+        for(String id:ids.split(",")){
+            userIds.add(id);
+        }
+        aclDomain.setUserIds(userIds);
+        return aclDomain;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/acl/dao/UserDao.java b/broker/src/main/java/org/apache/rocketmq/broker/acl/dao/UserDao.java
new file mode 100644
index 000000000..568fc460e
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/acl/dao/UserDao.java
@@ -0,0 +1,57 @@
+package org.apache.rocketmq.broker.acl.dao;
+
+import ch.qos.logback.core.util.FileUtil;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.broker.acl.domain.AclDomain;
+import org.apache.rocketmq.broker.acl.domain.UserDomain;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author ycc
+ * @date 2018/08/13
+ */
+public class UserDao {
+
+    public List<UserDomain> readUserInfo() {
+        String path = getClass().getClassLoader().getResource("userInfo.json").toString();
+        JSONArray jsonArray = null;
+        StringBuffer sb = new StringBuffer();
+        String line = null;
+        try {
+            BufferedReader in = new BufferedReader(new FileReader(path));
+            line = in.readLine();
+            while (line != null) {
+                sb.append(line);
+            }
+            JSONObject jsonObject = JSONObject.parseObject(sb.toString());
+            if (jsonObject != null) {
+                jsonArray = jsonObject.getJSONArray("list");
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        List<UserDomain> result = new ArrayList<>();
+        if (jsonArray == null) {
+            return result;
+        }
+        for (int i = 0; i < jsonArray.size(); i++) {
+            UserDomain object = (UserDomain)jsonArray.get(i);
+            result.add(object);
+        }
+        return result;
+    }
+
+    public boolean isValidate(UserDomain userDomain) {
+        List<UserDomain> result = readUserInfo();
+        if (result != null && result.contains(userDomain)) {
+            return true;
+        }
+        return false;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/acl/domain/AclDomain.java b/broker/src/main/java/org/apache/rocketmq/broker/acl/domain/AclDomain.java
new file mode 100644
index 000000000..fd0d84bf9
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/acl/domain/AclDomain.java
@@ -0,0 +1,91 @@
+package org.apache.rocketmq.broker.acl.domain;
+
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+    *
+    *@author ycc
+    *@date 2018/08/13
+ */
+
+public class AclDomain {
+    private String topicId;
+    private String groupId;
+    private Set<String> userIds;
+    private OpsEnum type;
+    /**
+     * 0 有效
+     * -1 无效
+     */
+    private Integer status;
+
+    public AclDomain(String topicId, String groupId, Set<String> userIds, OpsEnum type, Integer status) {
+        this.topicId = topicId;
+        this.groupId = groupId;
+        this.userIds = userIds;
+        this.type = type;
+        this.status = status;
+    }
+
+    public AclDomain() {
+    }
+
+    public String getTopicId() {
+        return topicId;
+    }
+
+    public void setTopicId(String topicId) {
+        this.topicId = topicId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public Set<String> getUserIds() {
+        return userIds;
+    }
+
+    public void setUserIds(Set<String> userIds) {
+        this.userIds = userIds;
+    }
+
+    public OpsEnum getType() {
+        return type;
+    }
+
+    public void setType(OpsEnum type) {
+        this.type = type;
+    }
+
+    public Integer getStatus() {
+        return status;
+    }
+
+    public void setStatus(Integer status) {
+        this.status = status;
+    }
+
+    @Override
+    public String toString() {
+        if(null!=userIds) {
+            Iterator<String> it = userIds.iterator();
+            StringBuilder ids = new StringBuilder();
+            while(it.hasNext()){
+                ids.append(it.next());
+                ids.append(",");
+            }
+            ids.deleteCharAt(ids.length()-1);
+            return topicId + "," + groupId + "," + "["+
+
+                    ids+"]" + "," + type.code + "," + status;
+        }else{
+            return topicId + "," + groupId + "," + userIds + "," + type.code + "," + status;
+        }
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/acl/domain/OpsEnum.java b/broker/src/main/java/org/apache/rocketmq/broker/acl/domain/OpsEnum.java
new file mode 100644
index 000000000..d242b00db
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/acl/domain/OpsEnum.java
@@ -0,0 +1,57 @@
+package org.apache.rocketmq.broker.acl.domain;
+
+/**
+ * @author ycc
+ * @date 2018/08/13
+ */
+
+public enum  OpsEnum {
+
+    AUTH_READ(1,"读权限"),
+
+    AUTH_WRITE(2,"写权限"),
+
+    AUTH_UPDATE(3,"修改权限"),
+
+    AUTH_DEL(4,"删除权限"),
+
+    AUTH_CREATE(5,"添加操作者权限"),
+
+    AUTH_OWNER(0,"所有者")
+    ;
+    Integer code;
+    String msg;
+
+    private OpsEnum(Integer code,String msg) {
+        this.code = code;
+        this.msg =msg;
+    }
+
+    public static OpsEnum getOpsEnum(Integer code) {
+        if (code == null) {
+            return null;
+        }
+        for (OpsEnum ops : OpsEnum.values()) {
+            if (ops.code.equals(code)) {
+                return ops;
+            }
+        }
+        return null;
+    }
+
+    public Integer getCode() {
+        return code;
+    }
+
+    public void setCode(Integer code) {
+        this.code = code;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/acl/domain/UserDomain.java b/broker/src/main/java/org/apache/rocketmq/broker/acl/domain/UserDomain.java
new file mode 100644
index 000000000..ddeb645b3
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/acl/domain/UserDomain.java
@@ -0,0 +1,43 @@
+package org.apache.rocketmq.broker.acl.domain;
+/**
+ *@author ycc
+ *@date 2018/08/13
+ */
+public class UserDomain {
+    private Long id;
+    private String passWord;
+
+    public UserDomain(Long id, String passWord) {
+        this.id = id;
+        this.passWord = passWord;
+    }
+
+    public Long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public String getPassWord() {
+        return passWord;
+    }
+
+    public void setPassWord(String passWord) {
+        this.passWord = passWord;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof UserDomain) {
+            UserDomain person= (UserDomain) obj;
+            if (id.equals(person.getId()) && passWord.equals(person.getPassWord())) {
+                return true;
+            }
+            return false;
+        }
+        return false;
+    }
+}
+
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/acl/service/AclService.java b/broker/src/main/java/org/apache/rocketmq/broker/acl/service/AclService.java
new file mode 100644
index 000000000..0111f0078
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/acl/service/AclService.java
@@ -0,0 +1,39 @@
+package org.apache.rocketmq.broker.acl.service;
+
+import java.util.Set;
+
+/**
+    *
+    *@author ycc
+    *@date 2018/08/13
+    */
+
+public interface AclService {
+
+    public boolean createAcl(String topicId,String groupId,String userId,String password);
+
+    public boolean addReadAcl(String userId,String password, String topicId,String targetUserId);
+
+    public boolean addWriteAcl(String userId, String password, String topicId,String targetUserId);
+
+    public boolean delReadAcl(String userId, String password,String topicId,String targetUserId);
+
+    public boolean delWriteAcl(String userId,String password, String topicId,String targetUserId);
+
+  //  public boolean updateAcl(String oldTopicId,String newTopicId);
+
+    public boolean delTopic(String userId,String password,String topicId);
+
+    public Set<String> getAllReadUserId(String topicId);
+
+    public Set<String> getAllWriteUserId(String topicId);
+
+    public String getAllOwnerUserId(String topicId);
+
+    public boolean canRead(String topicId,String userId);
+
+    public boolean canWrite(String topicId,String userId);
+
+    public boolean isOwner(String topicId,String userId);
+
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/acl/serviceImpl/AclServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/acl/serviceImpl/AclServiceImpl.java
new file mode 100644
index 000000000..da290782e
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/acl/serviceImpl/AclServiceImpl.java
@@ -0,0 +1,137 @@
+package org.apache.rocketmq.broker.acl.serviceImpl;
+
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.broker.acl.dao.AclDao;
+import org.apache.rocketmq.broker.acl.dao.UserDao;
+import org.apache.rocketmq.broker.acl.domain.AclDomain;
+import org.apache.rocketmq.broker.acl.domain.OpsEnum;
+import org.apache.rocketmq.broker.acl.domain.UserDomain;
+import org.apache.rocketmq.broker.acl.service.AclService;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ *
+ *@author ycc
+ *@date 2018/08/13
+ */
+public class AclServiceImpl implements AclService {
+
+    private UserDao userDao = new UserDao();
+
+    @Override
+    public boolean createAcl(String topicId, String groupId, String userId, String password) {
+        if(!userDao.isValidate(new UserDomain(Long.parseLong(userId),password)))return false;
+        AclDomain aclDomain = new AclDomain(topicId,groupId,new HashSet<String>(), OpsEnum.AUTH_OWNER,0);
+
+        return AclDao.add(aclDomain);
+    }
+
+    @Override
+    public boolean addReadAcl(String userId,String password,String topicId,String targetUserId) {
+        if(!userDao.isValidate(new UserDomain(Long.parseLong(userId),password)))return false;
+        if(!isOwner(topicId,userId))return false;
+        String key = OpsEnum.AUTH_READ.getCode()+","+topicId;
+        AclDomain tmp = TopicConfig.aclMap.get(key);
+        tmp.getUserIds().add(userId);
+        return AclDao.update(tmp);
+    }
+
+    @Override
+    public boolean addWriteAcl(String userId,String password,String topicId,String targetUserId) {
+        if(!userDao.isValidate(new UserDomain(Long.parseLong(userId),password)))return false;
+        if(!isOwner(topicId,userId))return false;
+        String key = OpsEnum.AUTH_WRITE.getCode()+","+topicId;
+        AclDomain tmp = TopicConfig.aclMap.get(key);
+        tmp.getUserIds().add(userId);
+        return AclDao.update(tmp);
+    }
+
+    @Override
+    public boolean delReadAcl(String userId,String password,String topicId, String targetUserId) {
+        if(!userDao.isValidate(new UserDomain(Long.parseLong(userId),password)))return false;
+        if(!isOwner(topicId,userId))return false;
+        String key = OpsEnum.AUTH_READ.getCode()+","+topicId;
+        AclDomain tmp = TopicConfig.aclMap.get(key);
+        tmp.getUserIds().remove(userId);
+        return AclDao.update(tmp);
+    }
+
+    @Override
+    public boolean delWriteAcl(String userId,String password,String topicId, String targetUserId) {
+        if(!userDao.isValidate(new UserDomain(Long.parseLong(userId),password)))return false;
+        if(!isOwner(topicId,userId))return false;
+        String key = OpsEnum.AUTH_WRITE.getCode()+","+topicId;
+        AclDomain tmp = TopicConfig.aclMap.get(key);
+        tmp.getUserIds().remove(userId);
+        return AclDao.update(tmp);
+    }
+
+
+    @Override
+    public boolean delTopic(String userId,String password,String topicId) {
+        if(!userDao.isValidate(new UserDomain(Long.parseLong(userId),password)))return false;
+        if(!isOwner(topicId,userId))return false;
+        String rKey = OpsEnum.AUTH_READ.getCode()+","+topicId;
+        AclDomain rTmp = TopicConfig.aclMap.get(rKey);
+        rTmp.setStatus(-1);
+        String wKey = OpsEnum.AUTH_WRITE.getCode()+","+topicId;
+        AclDomain wTmp = TopicConfig.aclMap.get(wKey);
+        wTmp.setStatus(-1);
+        String oKey = OpsEnum.AUTH_OWNER.getCode()+","+topicId;
+        AclDomain oTmp = TopicConfig.aclMap.get(oKey);
+        oTmp.setStatus(-1);
+        return AclDao.update(rTmp)&AclDao.update(wTmp)&AclDao.update(oTmp);
+    }
+
+    @Override
+    public Set<String> getAllReadUserId(String topicId) {
+        String rKey = OpsEnum.AUTH_READ.getCode()+","+topicId;
+        AclDomain rTmp = TopicConfig.aclMap.get(rKey);
+        return rTmp.getUserIds();
+    }
+
+    @Override
+    public Set<String> getAllWriteUserId(String topicId) {
+        String wKey = OpsEnum.AUTH_WRITE.getCode()+","+topicId;
+        AclDomain wTmp = TopicConfig.aclMap.get(wKey);
+        return wTmp.getUserIds();
+    }
+
+    @Override
+    public String getAllOwnerUserId(String topicId) {
+        String oKey = OpsEnum.AUTH_OWNER.getCode()+","+topicId;
+        AclDomain oTmp = TopicConfig.aclMap.get(oKey);
+        Set<String> res = oTmp.getUserIds();
+        if(res.isEmpty())return null;
+        return res.iterator().next();
+    }
+
+    @Override
+    public boolean canRead(String topicId, String userId) {
+        String rKey = OpsEnum.AUTH_READ.getCode()+","+topicId;
+        AclDomain rTmp = TopicConfig.aclMap.get(rKey);
+        Set<String> res = rTmp.getUserIds();
+        if(res.isEmpty())return false;
+        return res.contains(userId);
+    }
+
+    @Override
+    public boolean canWrite(String topicId, String userId) {
+        String wKey = OpsEnum.AUTH_WRITE.getCode()+","+topicId;
+        AclDomain wTmp = TopicConfig.aclMap.get(wKey);
+        Set<String> res = wTmp.getUserIds();
+        if(res.isEmpty())return false;
+        return res.contains(userId);
+    }
+
+    @Override
+    public boolean isOwner(String topicId, String userId) {
+        String oKey = OpsEnum.AUTH_OWNER.getCode()+","+topicId;
+        AclDomain oTmp = TopicConfig.aclMap.get(oKey);
+        Set<String> res = oTmp.getUserIds();
+        if(res.isEmpty())return false;
+        return res.contains(userId);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
old mode 100644
new mode 100755
index 5d6acc0f8..62b74cd1b
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client;
 
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -33,6 +34,8 @@
     public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
     public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
     public static final int CHARACTER_MAX_LENGTH = 255;
+    public static final String USERID = "userId";
+    public static final String PASSWD = "passwd";
 
     /**
      * @return The resulting {@code String}
@@ -98,6 +101,16 @@ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer
             throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                 "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
         }
+
+        //validate userID
+        if (msg.getUserProperty(USERID) == null || msg.getUserProperty(USERID).equals("")){
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the userId can not be null or empty");
+        }
+        //validate passwd
+        if (msg.getUserProperty(PASSWD) == null || msg.getUserProperty(PASSWD).equals("")){
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the userId can not be null or empty");
+        }
+
     }
 
     /**
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
old mode 100644
new mode 100755
index d51030a15..178534205
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -566,8 +566,8 @@ public void registerMessageListener(MessageListenerOrderly messageListener) {
      * @throws MQClientException if there is any client error.
      */
     @Override
-    public void subscribe(String topic, String subExpression) throws MQClientException {
-        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
+    public void subscribe(String topic, String userId, String passwd, String subExpression) throws MQClientException {
+        this.defaultMQPushConsumerImpl.subscribe(topic, userId, passwd, subExpression);
     }
 
     /**
@@ -578,8 +578,8 @@ public void subscribe(String topic, String subExpression) throws MQClientExcepti
      * @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
      */
     @Override
-    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
-        this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
+    public void subscribe(String topic, String userId, String passwd, String fullClassName, String filterClassSource) throws MQClientException {
+        this.defaultMQPushConsumerImpl.subscribe(topic,userId, passwd, fullClassName, filterClassSource);
     }
 
     /**
@@ -591,8 +591,8 @@ public void subscribe(String topic, String fullClassName, String filterClassSour
      * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
      */
     @Override
-    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
-        this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector);
+    public void subscribe(final String topic, final  String userId, final String passwd, final MessageSelector messageSelector) throws MQClientException {
+        this.defaultMQPushConsumerImpl.subscribe(topic,userId, passwd, messageSelector);
     }
 
     /**
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
old mode 100644
new mode 100755
index d56075c60..5a3f65c96
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
@@ -52,7 +52,7 @@
      * null or * expression,meaning subscribe
      * all
      */
-    void subscribe(final String topic, final String subExpression) throws MQClientException;
+    void subscribe(final String topic, final  String userId, final String passwd, final String subExpression) throws MQClientException;
 
     /**
      * Subscribe some topic
@@ -60,13 +60,13 @@
      * @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
      * @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
      */
-    void subscribe(final String topic, final String fullClassName,
+    void subscribe(final String topic, final String userId, final String passwd, final String fullClassName,
         final String filterClassSource) throws MQClientException;
 
     /**
      * Subscribe some topic with selector.
      * <p>
-     * This interface also has the ability of {@link #subscribe(String, String)},
+     * This interface also has the ability of {@link #subscribe(String,String,String, String)},
      * and, support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
      * </p>
      * <p/>
@@ -80,7 +80,7 @@ void subscribe(final String topic, final String fullClassName,
      *
      * @param selector message selector({@link MessageSelector}), can be null.
      */
-    void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
+    void subscribe(final String topic, final  String userId, final  String passwd, final MessageSelector selector) throws MQClientException;
 
     /**
      * Unsubscribe consumption some topic
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
old mode 100644
new mode 100755
index 393ef92c9..ca6a05bf8
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -79,6 +79,7 @@
 public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     /**
      * Delay some time when exception occur
+     *
      */
     private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
     /**
@@ -857,7 +858,7 @@ private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
         return this.rebalanceImpl.getSubscriptionInner();
     }
 
-    public void subscribe(String topic, String subExpression) throws MQClientException {
+    public void subscribe(String topic, String userId, String passwd, String subExpression) throws MQClientException {
         try {
             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                 topic, subExpression);
@@ -870,7 +871,7 @@ public void subscribe(String topic, String subExpression) throws MQClientExcepti
         }
     }
 
-    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
+    public void subscribe(String topic, String userId, String passwd, String fullClassName, String filterClassSource) throws MQClientException {
         try {
             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                 topic, "*");
@@ -887,10 +888,10 @@ public void subscribe(String topic, String fullClassName, String filterClassSour
         }
     }
 
-    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
+    public void subscribe(final String topic, String userId, String passwd, final MessageSelector messageSelector) throws MQClientException {
         try {
             if (messageSelector == null) {
-                subscribe(topic, SubscriptionData.SUB_ALL);
+                subscribe(topic,userId, passwd, SubscriptionData.SUB_ALL);
                 return;
             }
 
diff --git a/common/pom.xml b/common/pom.xml
index 62c6d7eb5..142f92e7b 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -37,5 +37,9 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-remoting</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
index 4795cced6..b47a3972f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -16,8 +16,13 @@
  */
 package org.apache.rocketmq.common;
 
+import org.apache.rocketmq.broker.acl.dao.AclDao;
+import org.apache.rocketmq.broker.acl.domain.AclDomain;
 import org.apache.rocketmq.common.constant.PermName;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class TopicConfig {
     private static final String SEPARATOR = " ";
     public static int defaultReadQueueNums = 16;
@@ -29,7 +34,10 @@
     private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
     private int topicSysFlag = 0;
     private boolean order = false;
-
+    public static Map<String, AclDomain> aclMap= new HashMap<String,AclDomain>();
+    static {
+        AclDao.recoverFromLog();
+    }
     public TopicConfig() {
     }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
old mode 100644
new mode 100755
index d431d3ecc..91c4ec3e4
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -106,16 +106,16 @@ public void run() {
         consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
 
         if (filterType == null || expression == null) {
-            consumer.subscribe(topic, "*");
+            consumer.subscribe(topic,"","", "*");
         } else {
             if (ExpressionType.TAG.equals(filterType)) {
                 String expr = MixAll.file2String(expression);
                 System.out.printf("Expression: %s%n", expr);
-                consumer.subscribe(topic, MessageSelector.byTag(expr));
+                consumer.subscribe(topic,"","", MessageSelector.byTag(expr));
             } else if (ExpressionType.SQL92.equals(filterType)) {
                 String expr = MixAll.file2String(expression);
                 System.out.printf("Expression: %s%n", expr);
-                consumer.subscribe(topic, MessageSelector.bySql(expr));
+                consumer.subscribe(topic,"","", MessageSelector.bySql(expr));
             } else {
                 throw new IllegalArgumentException("Not support filter type! " + filterType);
             }
diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
old mode 100644
new mode 100755
index fb1f9bbde..31d99cce2
--- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
@@ -35,7 +35,7 @@ public static void main(String[] args) throws InterruptedException, MQClientExce
 
         consumer.setMessageModel(MessageModel.BROADCASTING);
 
-        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+        consumer.subscribe("TopicTest","","", "TagA || TagC || TagD");
 
         consumer.registerMessageListener(new MessageListenerConcurrently() {
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
old mode 100644
new mode 100755
index bb491ac40..e0f22b3e3
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
@@ -36,7 +36,7 @@ public static void main(String[] args) throws InterruptedException, MQClientExce
         File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());
 
         String filterCode = MixAll.file2String(classFile);
-        consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl",
+        consumer.subscribe("TopicTest", "","","org.apache.rocketmq.example.filter.MessageFilterImpl",
             filterCode);
 
         consumer.registerMessageListener(new MessageListenerConcurrently() {
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
old mode 100644
new mode 100755
index c41c9c14c..a2308bb9d
--- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
@@ -32,7 +32,7 @@
     public static void main(String[] args) {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
         try {
-            consumer.subscribe("TopicTest",
+            consumer.subscribe("TopicTest","","",
                 MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                     "and (a is not null and a between 0  3)"));
         } catch (MQClientException e) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
old mode 100644
new mode 100755
index 6936f1dce..ebeb978ea
--- a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
@@ -44,7 +44,7 @@ public static void main(String[] args) throws InterruptedException, MQClientExce
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
             consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
 
-            consumer.subscribe(topic, subscription);
+            consumer.subscribe(topic,"","", subscription);
 
             consumer.registerMessageListener(new MessageListenerConcurrently() {
                 AtomicLong consumeTimes = new AtomicLong(0);
diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
old mode 100644
new mode 100755
index abb274d61..2605f508f
--- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
@@ -33,7 +33,7 @@ public static void main(String[] args) throws MQClientException {
 
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
-        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+        consumer.subscribe("TopicTest","","", "TagA || TagC || TagD");
 
         consumer.registerMessageListener(new MessageListenerOrderly() {
             AtomicLong consumeTimes = new AtomicLong(0);
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
old mode 100644
new mode 100755
index 6d3b93650..b49f1b6cc
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -57,7 +57,7 @@ public static void main(String[] args) throws InterruptedException, MQClientExce
         /*
          * Subscribe one more more topics to consume.
          */
-        consumer.subscribe("TopicTest", "*");
+        consumer.subscribe("TopicTest","","", "*");
 
         /*
          *  Register callback to execute on arrival of messages fetched from brokers.
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
old mode 100644
new mode 100755
index c6c7e39d1..a2bc64eb0
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
@@ -29,7 +29,7 @@
 
     public static void main(String[] args) throws InterruptedException, MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
-        consumer.subscribe("Jodie_topic_1023", "*");
+        consumer.subscribe("Jodie_topic_1023", "","","*");
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         //wrong time format 2017_0422_221800
         consumer.setConsumeTimestamp("20170422221800");
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/acl/UpdateAclSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/acl/UpdateAclSubCommand.java
new file mode 100644
index 000000000..d9002f97b
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/acl/UpdateAclSubCommand.java
@@ -0,0 +1,123 @@
+package org.apache.rocketmq.tools.acl;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.broker.acl.service.AclService;
+import org.apache.rocketmq.broker.acl.serviceImpl.AclServiceImpl;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class UpdateAclSubCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "updateTopicAcl";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "add or del topic acl";
+
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "create topic to which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("U", "userId", true, "user name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("P", "password", true, "password");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("T", "targetUserId", true, "target user name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("A", "add", false, "add acl");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("D", "delete", false, "delete acl");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("O", "operate type", true, "R|W");
+        opt.setRequired(true);
+        options.addOption(opt);
+        return null;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        String topicId = "";
+        String clusterName = "";
+        String userId = "";
+        String pass = "";
+        String targetUserId = "";
+        Character type ='R';
+        AclService aclService = new AclServiceImpl();
+        try{
+            if (commandLine.hasOption('t')) {
+                topicId = commandLine.getOptionValue("t").trim();
+            }else {
+                return;
+            }
+            if (commandLine.hasOption('c')) {
+                clusterName = commandLine.getOptionValue("c").trim();
+            }
+            if(commandLine.hasOption("U")){
+                userId = commandLine.getOptionValue("U").trim();
+            }else{
+                return ;
+            }
+            if(commandLine.hasOption("P")){
+                pass = commandLine.getOptionValue("P").trim();
+            }else{
+                return;
+            }
+            if(commandLine.hasOption("T")){
+                targetUserId = commandLine.getOptionValue("T").trim();
+            }else{
+                return;
+            }
+            if(commandLine.hasOption("O")){
+                type = commandLine.getOptionValue("O").trim().charAt(0);
+            }
+            if(commandLine.hasOption("A")){
+                if(type.equals('R')){
+                    aclService.addReadAcl(userId,pass,topicId,targetUserId);
+                }else if(type.equals("W")){
+                    aclService.addWriteAcl(userId,pass,topicId,targetUserId);
+                }else {
+                    return ;
+                }
+            }else if(commandLine.hasOption("D")){
+                if(type.equals('R')){
+                    aclService.delReadAcl(userId,pass,topicId,targetUserId);
+                }else if(type.equals("W")){
+                    aclService.delWriteAcl(userId,pass,topicId,targetUserId);
+                }else {
+                    return ;
+                }
+            }else{
+                return ;
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 6a51b7b4b..b2aa74d85 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -31,6 +31,7 @@
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.acl.UpdateAclSubCommand;
 import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
 import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
@@ -197,6 +198,8 @@ public static void initCommand() {
         initCommand(new QueryConsumeQueueCommand());
         initCommand(new SendMessageCommand());
         initCommand(new ConsumeMessageCommand());
+
+        initCommand(new UpdateAclSubCommand());
     }
 
     private static void initLogback() throws JoranException {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services