You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/01/05 08:04:00 UTC

[rocketmq] branch 5.0.0-alpha updated: [ISSUE #3679] Support topic attributes (#3698)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha by this push:
     new d5701ae  [ISSUE #3679] Support topic attributes (#3698)
d5701ae is described below

commit d5701ae569fcd50967b5cd14ac8992d176285c1d
Author: Hongjian Fei <er...@163.com>
AuthorDate: Wed Jan 5 16:02:53 2022 +0800

    [ISSUE #3679] Support topic attributes (#3698)
    
    * [Feature] Support topic attributes.
    
    * [Feature] Topic attributes unit-test.
---
 .../broker/processor/AdminBrokerProcessor.java     |  18 +-
 .../rocketmq/broker/topic/TopicConfigManager.java  | 143 +++++++++
 .../broker/topic/TopicConfigManagerTest.java       | 318 +++++++++++++++++++++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   3 +
 common/pom.xml                                     |  13 -
 .../apache/rocketmq/common/TopicAttributes.java    |  33 ++-
 .../org/apache/rocketmq/common/TopicConfig.java    |  50 ++--
 .../rocketmq/common/attribute/Attribute.java       |  38 ++-
 .../rocketmq/common/attribute/AttributeParser.java |  79 +++++
 .../common/attribute/BooleanAttribute.java         |  32 ++-
 .../rocketmq/common/attribute/EnumAttribute.java   |  32 ++-
 .../common/attribute/LongRangeAttribute.java       |  35 ++-
 .../protocol/header/CreateTopicRequestHeader.java  |  11 +
 .../common/attribute/AttributeParserTest.java      |  70 +++++
 .../rocketmq/common/attribute/AttributeTest.java   |  70 +++++
 .../apache/rocketmq/store/util/QueueTypeUtils.java |  20 ++
 .../tools/command/topic/UpdateTopicSubCommand.java |  15 +
 17 files changed, 873 insertions(+), 107 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index f30953d..6505263 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -33,12 +33,14 @@ import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.attribute.AttributeParser;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -294,12 +296,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
         topicConfig.setPerm(requestHeader.getPerm());
         topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
+        String attributesModification = requestHeader.getAttributes();
+        topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
 
-        this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-
-        this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
-
-        response.setCode(ResponseCode.SUCCESS);
+        try {
+            this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
+            this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+            response.setCode(ResponseCode.SUCCESS);
+        }  catch (Exception e) {
+            log.error("Update / create topic failed for [{}]", request, e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(e.getMessage());
+        }
         return response;
     }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 4af230d..ba40538 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.broker.topic;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -25,11 +27,16 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.attribute.Attribute;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
@@ -40,6 +47,8 @@ import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public class TopicConfigManager extends ConfigManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
@@ -353,6 +362,18 @@ public class TopicConfigManager extends ConfigManager {
     }
 
     public void updateTopicConfig(final TopicConfig topicConfig) {
+        checkNotNull(topicConfig, "topicConfig shouldn't be null");
+
+        Map<String, String> newAttributes = request(topicConfig);
+        Map<String, String> currentAttributes = current(topicConfig.getTopicName());
+
+        Map<String, String> finalAttributes = alterCurrentAttributes(
+                this.topicConfigTable.get(topicConfig.getTopicName()) == null,
+                ImmutableMap.copyOf(currentAttributes),
+                ImmutableMap.copyOf(newAttributes));
+
+        topicConfig.setAttributes(finalAttributes);
+
         TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         if (old != null) {
             log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
@@ -398,6 +419,11 @@ public class TopicConfigManager extends ConfigManager {
         }
     }
 
+    // make it testable
+    public Map<String, Attribute> allAttributes() {
+        return TopicAttributes.ALL;
+    }
+
     public boolean isOrderTopic(final String topic) {
         TopicConfig topicConfig = this.topicConfigTable.get(topic);
         if (topicConfig == null) {
@@ -471,4 +497,121 @@ public class TopicConfigManager extends ConfigManager {
     public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
         return topicConfigTable;
     }
+
+    private Map<String, String> request(TopicConfig topicConfig) {
+        return topicConfig.getAttributes() == null ? new HashMap<>() : topicConfig.getAttributes();
+    }
+
+    private Map<String, String> current(String topic) {
+        TopicConfig topicConfig = this.topicConfigTable.get(topic);
+        if (topicConfig == null) {
+            return new HashMap<>();
+        } else {
+            Map<String, String> attributes = topicConfig.getAttributes();
+            if (attributes == null) {
+                return new HashMap<>();
+            } else {
+                return attributes;
+            }
+        }
+    }
+
+    private Map<String, String> alterCurrentAttributes(boolean create, ImmutableMap<String, String> currentAttributes, ImmutableMap<String, String> newAttributes) {
+        Map<String, String> init = new HashMap<>();
+        Map<String, String> add = new HashMap<>();
+        Map<String, String> update = new HashMap<>();
+        Map<String, String> delete = new HashMap<>();
+        Set<String> keys = new HashSet<>();
+
+        for (Entry<String, String> attribute : newAttributes.entrySet()) {
+            String key = attribute.getKey();
+            String realKey = realKey(key);
+            String value = attribute.getValue();
+
+            validate(realKey);
+            duplicationCheck(keys, realKey);
+
+            if (create) {
+                if (key.startsWith("+")) {
+                    init.put(realKey, value);
+                } else {
+                    throw new RuntimeException("only add attribute is supported while creating topic. key: " + realKey);
+                }
+            } else {
+                if (key.startsWith("+")) {
+                    if (!currentAttributes.containsKey(realKey)) {
+                        add.put(realKey, value);
+                    } else {
+                        update.put(realKey, value);
+                    }
+                } else if (key.startsWith("-")) {
+                    if (!currentAttributes.containsKey(realKey)) {
+                        throw new RuntimeException("attempt to delete a nonexistent key: " + realKey);
+                    }
+                    delete.put(realKey, value);
+                } else {
+                    throw new RuntimeException("wrong format key: " + realKey);
+                }
+            }
+        }
+
+        validateAlter(init, true, false);
+        validateAlter(add, false, false);
+        validateAlter(update, false, false);
+        validateAlter(delete, false, true);
+
+        log.info("add: {}, update: {}, delete: {}", add, update, delete);
+        HashMap<String, String> finalAttributes = new HashMap<>(currentAttributes);
+        finalAttributes.putAll(init);
+        finalAttributes.putAll(add);
+        finalAttributes.putAll(update);
+        for (String s : delete.keySet()) {
+            finalAttributes.remove(s);
+        }
+        return finalAttributes;
+    }
+
+    private void duplicationCheck(Set<String> keys, String key) {
+        boolean notExist = keys.add(key);
+        if (!notExist) {
+            throw new RuntimeException("alter duplication key. key: " + key);
+        }
+    }
+
+    private void validate(String kvAttribute) {
+        if (Strings.isNullOrEmpty(kvAttribute)) {
+            throw new RuntimeException("kv string format wrong.");
+        }
+
+        if (kvAttribute.contains("+")) {
+            throw new RuntimeException("kv string format wrong.");
+        }
+
+        if (kvAttribute.contains("-")) {
+            throw new RuntimeException("kv string format wrong.");
+        }
+    }
+
+    private void validateAlter(Map<String, String> alter, boolean init, boolean delete) {
+        for (Entry<String, String> entry : alter.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+
+            Attribute attribute = allAttributes().get(key);
+            if (attribute == null) {
+                throw new RuntimeException("unsupported key: " + key);
+            }
+            if (!init && !attribute.isChangeable()) {
+                throw new RuntimeException("attempt to update an unchangeable attribute. key: " + key);
+            }
+
+            if (!delete) {
+                attribute.verify(value);
+            }
+        }
+    }
+
+    private String realKey(String key) {
+        return key.substring(1);
+    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
new file mode 100644
index 0000000..9853c36
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.attribute.Attribute;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.BooleanAttribute;
+import org.apache.rocketmq.common.attribute.EnumAttribute;
+import org.apache.rocketmq.common.attribute.LongRangeAttribute;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.store.util.QueueTypeUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.Arrays.asList;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TopicConfigManagerTest {
+    private TopicConfigManager topicConfigManager;
+    @Mock
+    private BrokerController brokerController;
+
+    @Before
+    public void init() {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
+        topicConfigManager = new TopicConfigManager(brokerController);
+    }
+
+    @Test
+    public void testAddUnsupportedKeyOnCreating() {
+        String unsupportedKey = "key4";
+
+        supportAttributes(asList(
+                new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+                new BooleanAttribute("bool.key", false, false),
+                new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+enum.key", "enum-2");
+        attributes.put("+" + unsupportedKey, "value1");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName("new-topic");
+        topicConfig.setAttributes(attributes);
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig));
+        Assert.assertEquals("unsupported key: " + unsupportedKey, runtimeException.getMessage());
+    }
+
+    @Test
+    public void testAddWrongFormatKeyOnCreating() {
+        supportAttributes(asList(
+                new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+                new BooleanAttribute("bool.key", false, false),
+                new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("++enum.key", "value1");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName("new-topic");
+        topicConfig.setAttributes(attributes);
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig));
+        Assert.assertEquals("kv string format wrong.", runtimeException.getMessage());
+    }
+
+    @Test
+    public void testDeleteKeyOnCreating() {
+        String key = "enum.key";
+        supportAttributes(asList(
+                new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+                new BooleanAttribute("bool.key", false, false),
+                new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("-" + key, "");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName("new-topic");
+        topicConfig.setAttributes(attributes);
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig));
+        Assert.assertEquals("only add attribute is supported while creating topic. key: " + key, runtimeException.getMessage());
+    }
+
+    @Test
+    public void testAddWrongValueOnCreating() {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+" + TopicAttributes.queueType.getName(), "wrong-value");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName("new-topic");
+        topicConfig.setAttributes(attributes);
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig));
+        Assert.assertEquals("value is not in set: [SimpleCQ, BatchCQ]", runtimeException.getMessage());
+    }
+
+    @Test
+    public void testNormalAddKeyOnCreating() {
+        String topic = "new-topic";
+
+        supportAttributes(asList(
+                new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+                new BooleanAttribute("bool.key", false, false),
+                new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+enum.key", "enum-2");
+        attributes.put("+long.range.key", "16");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setAttributes(attributes);
+        topicConfigManager.updateTopicConfig(topicConfig);
+
+        TopicConfig existingTopicConfig = topicConfigManager.getTopicConfigTable().get(topic);
+        Assert.assertEquals("enum-2", existingTopicConfig.getAttributes().get("enum.key"));
+        Assert.assertEquals("16", existingTopicConfig.getAttributes().get("long.range.key"));
+//        assert file
+    }
+
+    @Test
+    public void testAddDuplicatedKeyOnUpdating() {
+        String duplicatedKey = "long.range.key";
+
+        supportAttributes(asList(
+                new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+                new BooleanAttribute("bool.key", false, false),
+                new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        createTopic();
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+" + duplicatedKey, "11");
+        attributes.put("-" + duplicatedKey, "");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName("new-topic");
+        topicConfig.setAttributes(attributes);
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig));
+        Assert.assertEquals("alter duplication key. key: " + duplicatedKey, runtimeException.getMessage());
+    }
+
+    private void createTopic() {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+enum.key", "enum-3");
+        attributes.put("+bool.key", "true");
+        attributes.put("+long.range.key", "12");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName("new-topic");
+        topicConfig.setAttributes(attributes);
+
+        topicConfigManager.updateTopicConfig(topicConfig);
+    }
+
+    @Test
+    public void testDeleteNonexistentKeyOnUpdating() {
+        String key = "nonexisting.key";
+
+        supportAttributes(asList(
+                new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+                new BooleanAttribute("bool.key", false, false),
+                new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+enum.key", "enum-2");
+        attributes.put("+bool.key", "true");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName("new-topic");
+        topicConfig.setAttributes(attributes);
+
+        topicConfigManager.updateTopicConfig(topicConfig);
+
+        attributes = new HashMap<>();
+        attributes.clear();
+        attributes.put("-" + key, "");
+        topicConfig.setAttributes(attributes);
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig));
+        Assert.assertEquals("attempt to delete a nonexistent key: " + key, runtimeException.getMessage());
+    }
+
+    @Test
+    public void testAlterTopicWithoutChangingAttributes() {
+        String topic = "new-topic";
+
+        supportAttributes(asList(
+                new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+                new BooleanAttribute("bool.key", false, false),
+                new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+enum.key", "enum-2");
+        attributes.put("+bool.key", "true");
+
+        TopicConfig topicConfigInit = new TopicConfig();
+        topicConfigInit.setTopicName(topic);
+        topicConfigInit.setAttributes(attributes);
+
+        topicConfigManager.updateTopicConfig(topicConfigInit);
+        Assert.assertEquals("enum-2", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("enum.key"));
+        Assert.assertEquals("true", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("bool.key"));
+
+        TopicConfig topicConfigAlter = new TopicConfig();
+        topicConfigAlter.setTopicName(topic);
+        topicConfigAlter.setReadQueueNums(10);
+        topicConfigAlter.setWriteQueueNums(10);
+        topicConfigManager.updateTopicConfig(topicConfigAlter);
+        Assert.assertEquals("enum-2", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("enum.key"));
+        Assert.assertEquals("true", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("bool.key"));
+    }
+
+    @Test
+    public void testNormalUpdateUnchangeableKeyOnUpdating() {
+        String topic = "exist-topic";
+
+        supportAttributes(asList(
+                new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+                new BooleanAttribute("bool.key", true, false),
+                new LongRangeAttribute("long.range.key", false, 10, 20, 15)
+        ));
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+long.range.key", "14");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setAttributes(attributes);
+
+        topicConfigManager.updateTopicConfig(topicConfig);
+
+        attributes.put("+long.range.key", "16");
+        topicConfig.setAttributes(attributes);
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig));
+        Assert.assertEquals("attempt to update an unchangeable attribute. key: long.range.key", runtimeException.getMessage());
+    }
+
+    @Test
+    public void testNormalQueryKeyOnGetting() {
+        String topic = "exist-topic";
+        String unchangeable = "bool.key";
+
+        supportAttributes(asList(
+                new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+                new BooleanAttribute("bool.key", false, false),
+                new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("+" + unchangeable, "true");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setAttributes(attributes);
+
+        topicConfigManager.updateTopicConfig(topicConfig);
+
+        TopicConfig topicConfigUpdated = topicConfigManager.getTopicConfigTable().get(topic);
+        Assert.assertEquals(CQType.SimpleCQ, QueueTypeUtils.getCQType(topicConfigUpdated));
+
+        Assert.assertEquals("true", topicConfigUpdated.getAttributes().get(unchangeable));
+    }
+
+    private void supportAttributes(List<Attribute> supportAttributes) {
+        Map<String, Attribute> supportedAttributes = new HashMap<>();
+
+        for (Attribute supportAttribute : supportAttributes) {
+            supportedAttributes.put(supportAttribute.getName(), supportAttribute);
+        }
+
+        topicConfigManager = spy(topicConfigManager);
+        when(topicConfigManager.allAttributes()).thenReturn(supportedAttributes);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 20b8f41..cd0d05b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -56,10 +56,12 @@ import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.attribute.AttributeParser;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -332,6 +334,7 @@ public class MQClientAPIImpl {
         requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
         requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
         requestHeader.setOrder(topicConfig.isOrder());
+        requestHeader.setAttributes(AttributeParser.parseToString(topicConfig.getAttributes()));
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
 
diff --git a/common/pom.xml b/common/pom.xml
index dd8ea6a..bbbc713 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -27,19 +27,6 @@
     <artifactId>rocketmq-common</artifactId>
     <name>rocketmq-common ${project.version}</name>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>6</source>
-                    <target>6</target>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
     <dependencies>
         <dependency>
             <groupId>${project.groupId}</groupId>
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
similarity index 56%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
index 18e7f5a..9c1e96f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
@@ -14,22 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common;
 
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.Attribute;
+import org.apache.rocketmq.common.attribute.EnumAttribute;
 
-public class QueueTypeUtils {
+import java.util.HashMap;
+import java.util.Map;
 
-    public static CQType getCQType(MessageStore messageStore) {
-        if (messageStore instanceof DefaultMessageStore) {
-            return CQType.SimpleCQ;
-        } else if (messageStore instanceof StreamMessageStore) {
-            return CQType.BatchCQ;
-        } else {
-            throw new RuntimeException("new cq type is not supported now.");
-        }
+import static com.google.common.collect.Sets.newHashSet;
+
+public class TopicAttributes {
+    public static final EnumAttribute queueType = new EnumAttribute(
+            "queue.type",
+            false,
+            newHashSet("BatchCQ", "SimpleCQ"),
+            "SimpleCQ"
+    );
+    public static final Map<String, Attribute> ALL;
+
+    static {
+        ALL = new HashMap<>();
+        ALL.put(queueType.getName(), queueType);
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
index ec4d54b..610c3e2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.common;
 
 import org.apache.rocketmq.common.constant.PermName;
 
+import java.util.Map;
+
 public class TopicConfig {
     private static final String SEPARATOR = " ";
     public static int defaultReadQueueNums = 16;
@@ -29,6 +31,7 @@ public class TopicConfig {
     private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
     private int topicSysFlag = 0;
     private boolean order = false;
+    private Map<String, String> attributes;
 
     public TopicConfig() {
     }
@@ -72,6 +75,8 @@ public class TopicConfig {
         sb.append(SEPARATOR);
         sb.append(this.topicFilterType);
 
+        // Leave the encode/decode [attributes] out for now
+
         return sb.toString();
     }
 
@@ -150,29 +155,29 @@ public class TopicConfig {
         this.order = isOrder;
     }
 
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    public void setAttributes(Map<String, String> attributes) {
+        this.attributes = attributes;
+    }
 
-        final TopicConfig that = (TopicConfig) o;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
 
-        if (readQueueNums != that.readQueueNums)
-            return false;
-        if (writeQueueNums != that.writeQueueNums)
-            return false;
-        if (perm != that.perm)
-            return false;
-        if (topicSysFlag != that.topicSysFlag)
-            return false;
-        if (order != that.order)
-            return false;
-        if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null)
-            return false;
-        return topicFilterType == that.topicFilterType;
+        TopicConfig that = (TopicConfig) o;
 
+        if (readQueueNums != that.readQueueNums) return false;
+        if (writeQueueNums != that.writeQueueNums) return false;
+        if (perm != that.perm) return false;
+        if (topicSysFlag != that.topicSysFlag) return false;
+        if (order != that.order) return false;
+        if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) return false;
+        if (topicFilterType != that.topicFilterType) return false;
+        return attributes != null ? attributes.equals(that.attributes) : that.attributes == null;
     }
 
     @Override
@@ -184,6 +189,7 @@ public class TopicConfig {
         result = 31 * result + (topicFilterType != null ? topicFilterType.hashCode() : 0);
         result = 31 * result + topicSysFlag;
         result = 31 * result + (order ? 1 : 0);
+        result = 31 * result + (attributes != null ? attributes.hashCode() : 0);
         return result;
     }
 
@@ -191,7 +197,7 @@ public class TopicConfig {
     public String toString() {
         return "TopicConfig [topicName=" + topicName + ", readQueueNums=" + readQueueNums
             + ", writeQueueNums=" + writeQueueNums + ", perm=" + PermName.perm2String(perm)
-            + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order="
-            + order + "]";
+            + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" + order
+            + ", attributes=" + attributes + "]";
     }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/Attribute.java
similarity index 56%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/attribute/Attribute.java
index 18e7f5a..ba9be3b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/Attribute.java
@@ -14,22 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common.attribute;
 
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+public abstract class Attribute {
+    protected String name;
+    protected boolean changeable;
 
-public class QueueTypeUtils {
+    public abstract void verify(String value);
 
-    public static CQType getCQType(MessageStore messageStore) {
-        if (messageStore instanceof DefaultMessageStore) {
-            return CQType.SimpleCQ;
-        } else if (messageStore instanceof StreamMessageStore) {
-            return CQType.BatchCQ;
-        } else {
-            throw new RuntimeException("new cq type is not supported now.");
-        }
+    public Attribute(String name, boolean changeable) {
+        this.name = name;
+        this.changeable = changeable;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public boolean isChangeable() {
+        return changeable;
+    }
+
+    public void setChangeable(boolean changeable) {
+        this.changeable = changeable;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeParser.java b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeParser.java
new file mode 100644
index 0000000..7ee7afc
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeParser.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.attribute;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AttributeParser {
+    public static Map<String, String> parseToMap(String attributesModification) {
+        if (Strings.isNullOrEmpty(attributesModification)) {
+            return new HashMap<>();
+        }
+
+        // format: +key1=value1,+key2=value2,-key3,+key4=value4
+        Map<String, String> attributes = new HashMap<>();
+        String arraySeparator = ",";
+        String kvSeparator = "=";
+        String[] kvs = attributesModification.split(arraySeparator);
+        for (String kv : kvs) {
+            String key;
+            String value;
+            if (kv.contains(kvSeparator)) {
+                key = kv.split(kvSeparator)[0];
+                value = kv.split(kvSeparator)[1];
+                if (!key.contains("+")) {
+                    throw new RuntimeException("add/alter attribute format is wrong: " + key);
+                }
+            } else {
+                key = kv;
+                value = "";
+                if (!key.contains("-")) {
+                    throw new RuntimeException("delete attribute format is wrong: " + key);
+                }
+            }
+            String old = attributes.put(key, value);
+            if (old != null) {
+                throw new RuntimeException("key duplication: " + key);
+            }
+        }
+        return attributes;
+    }
+
+    public static String parseToString(Map<String, String> attributes) {
+        if (attributes == null || attributes.size() == 0) {
+            return "";
+        }
+
+        List<String> kvs = new ArrayList<>();
+        for (Map.Entry<String, String> entry : attributes.entrySet()) {
+
+            String value = entry.getValue();
+            if (Strings.isNullOrEmpty(value)) {
+                kvs.add(entry.getKey());
+            } else {
+                kvs.add(entry.getKey() + "=" + entry.getValue());
+            }
+        }
+        return Joiner.on(",").join(kvs);
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/BooleanAttribute.java
similarity index 54%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/attribute/BooleanAttribute.java
index 18e7f5a..41ad748 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/BooleanAttribute.java
@@ -14,22 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common.attribute;
 
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+import static com.google.common.base.Preconditions.checkNotNull;
 
-public class QueueTypeUtils {
+public class BooleanAttribute extends Attribute {
+    private final boolean defaultValue;
 
-    public static CQType getCQType(MessageStore messageStore) {
-        if (messageStore instanceof DefaultMessageStore) {
-            return CQType.SimpleCQ;
-        } else if (messageStore instanceof StreamMessageStore) {
-            return CQType.BatchCQ;
-        } else {
-            throw new RuntimeException("new cq type is not supported now.");
+    public BooleanAttribute(String name, boolean changeable, boolean defaultValue) {
+        super(name, changeable);
+        this.defaultValue = defaultValue;
+    }
+
+    @Override
+    public void verify(String value) {
+        checkNotNull(value);
+
+        if (!"false".equalsIgnoreCase(value) && !"true".equalsIgnoreCase(value)) {
+            throw new RuntimeException("boolean attribute format is wrong.");
         }
     }
+
+    public boolean getDefaultValue() {
+        return defaultValue;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/EnumAttribute.java
similarity index 55%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/attribute/EnumAttribute.java
index 18e7f5a..5353b8a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/EnumAttribute.java
@@ -14,22 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common.attribute;
 
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+import java.util.Set;
 
-public class QueueTypeUtils {
+public class EnumAttribute extends Attribute {
+    private final Set<String> universe;
+    private final String defaultValue;
 
-    public static CQType getCQType(MessageStore messageStore) {
-        if (messageStore instanceof DefaultMessageStore) {
-            return CQType.SimpleCQ;
-        } else if (messageStore instanceof StreamMessageStore) {
-            return CQType.BatchCQ;
-        } else {
-            throw new RuntimeException("new cq type is not supported now.");
+    public EnumAttribute(String name, boolean changeable, Set<String> universe, String defaultValue) {
+        super(name, changeable);
+        this.universe = universe;
+        this.defaultValue = defaultValue;
+    }
+
+    @Override
+    public void verify(String value) {
+        if (!this.universe.contains(value)) {
+            throw new RuntimeException("value is not in set: " + this.universe);
         }
     }
+
+    public String getDefaultValue() {
+        return defaultValue;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/LongRangeAttribute.java
similarity index 52%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/attribute/LongRangeAttribute.java
index 18e7f5a..eeeda72 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/LongRangeAttribute.java
@@ -14,22 +14,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common.attribute;
 
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+import static java.lang.String.format;
 
-public class QueueTypeUtils {
+public class LongRangeAttribute extends Attribute {
+    private final long min;
+    private final long max;
+    private final long defaultValue;
 
-    public static CQType getCQType(MessageStore messageStore) {
-        if (messageStore instanceof DefaultMessageStore) {
-            return CQType.SimpleCQ;
-        } else if (messageStore instanceof StreamMessageStore) {
-            return CQType.BatchCQ;
-        } else {
-            throw new RuntimeException("new cq type is not supported now.");
+    public LongRangeAttribute(String name, boolean changeable, long min, long max, long defaultValue) {
+        super(name, changeable);
+        this.min = min;
+        this.max = max;
+        this.defaultValue = defaultValue;
+    }
+
+    @Override
+    public void verify(String value) {
+        long l = Long.parseLong(value);
+        if (l < min || l > max) {
+            throw new RuntimeException(format("value is not in range(%d, %d)", min, max));
         }
     }
+
+    public long getDefaultValue() {
+        return defaultValue;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
index 290ec4c..2e381b3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -26,6 +26,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
+import java.util.Map;
+
 public class CreateTopicRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String topic;
@@ -42,6 +44,7 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
     private Integer topicSysFlag;
     @CFNotNull
     private Boolean order = false;
+    private String attributes;
 
     @CFNullable
     private Boolean force = false;
@@ -130,4 +133,12 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
     public void setForce(Boolean force) {
         this.force = force;
     }
+
+    public String getAttributes() {
+        return attributes;
+    }
+
+    public void setAttributes(String attributes) {
+        this.attributes = attributes;
+    }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeParserTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeParserTest.java
new file mode 100644
index 0000000..1239810
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeParserTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.attribute;
+
+import com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Maps.newHashMap;
+import static org.junit.Assert.assertTrue;
+
+public class AttributeParserTest {
+    @Test
+    public void testParseToMap() {
+        Assert.assertEquals(0, AttributeParser.parseToMap(null).size());
+        AttributeParser.parseToMap("++=++");
+        AttributeParser.parseToMap("--");
+        Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("x"));
+        Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("+"));
+        Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("++"));
+    }
+
+    @Test
+    public void testParseToString() {
+        Assert.assertEquals("", AttributeParser.parseToString(null));
+        Assert.assertEquals("", AttributeParser.parseToString(newHashMap()));
+        HashMap<String, String> map = new HashMap<>();
+        int addSize = 10;
+        for (int i = 0; i < addSize; i++) {
+            map.put("+add.key" + i, "value" + i);
+        }
+        int deleteSize = 10;
+        for (int i = 0; i < deleteSize; i++) {
+            map.put("-delete.key" + i, "");
+        }
+        Assert.assertEquals(addSize + deleteSize, AttributeParser.parseToString(map).split(",").length);
+    }
+
+    @Test
+    public void testParseBetweenStringAndMapWithoutDistortion() {
+        List<String> testCases = Arrays.asList("-a", "+a=b,+c=d,+z=z,+e=e", "+a=b,-d", "+a=b", "-a,-b");
+        for (String testCase : testCases) {
+            assertTrue(Maps.difference(AttributeParser.parseToMap(testCase), AttributeParser.parseToMap(parse(testCase))).areEqual());
+        }
+    }
+
+    private String parse(String original) {
+        Map<String, String> stringStringMap = AttributeParser.parseToMap(original);
+        return AttributeParser.parseToString(stringStringMap);
+    }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeTest.java
new file mode 100644
index 0000000..39a12b9
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.attribute;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static com.google.common.collect.Sets.newHashSet;
+
+public class AttributeTest {
+
+    @Test
+    public void testEnumAttribute() {
+        EnumAttribute enumAttribute = new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1");
+
+        Assert.assertThrows(RuntimeException.class, () -> enumAttribute.verify(""));
+        Assert.assertThrows(RuntimeException.class, () -> enumAttribute.verify("x"));
+        Assert.assertThrows(RuntimeException.class, () -> enumAttribute.verify("enum-4"));
+
+        enumAttribute.verify("enum-1");
+        enumAttribute.verify("enum-2");
+        enumAttribute.verify("enum-3");
+    }
+
+    @Test
+    public void testLongRangeAttribute() {
+        LongRangeAttribute longRangeAttribute = new LongRangeAttribute("long.range.key", true, 10, 20, 15);
+        Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify(""));
+        Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify(","));
+        Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify("a"));
+        Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify("-1"));
+        Assert.assertThrows(RuntimeException.class, () -> longRangeAttribute.verify("21"));
+
+        longRangeAttribute.verify("11");
+        longRangeAttribute.verify("10");
+        longRangeAttribute.verify("20");
+    }
+
+    @Test
+    public void testBooleanAttribute() {
+        BooleanAttribute booleanAttribute = new BooleanAttribute("bool.key", false, false);
+
+        Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify(""));
+        Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("a"));
+        Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify(","));
+        Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("checked"));
+        Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("1"));
+        Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("0"));
+        Assert.assertThrows(RuntimeException.class, () -> booleanAttribute.verify("-1"));
+
+        booleanAttribute.verify("true");
+        booleanAttribute.verify("tRue");
+        booleanAttribute.verify("false");
+        booleanAttribute.verify("falSe");
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
index 18e7f5a..612bf7e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
@@ -16,13 +16,18 @@
  */
 package org.apache.rocketmq.store.util;
 
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.StreamMessageStore;
 import org.apache.rocketmq.store.queue.CQType;
 
+import java.util.Map;
+
 public class QueueTypeUtils {
 
+    @Deprecated
     public static CQType getCQType(MessageStore messageStore) {
         if (messageStore instanceof DefaultMessageStore) {
             return CQType.SimpleCQ;
@@ -32,4 +37,19 @@ public class QueueTypeUtils {
             throw new RuntimeException("new cq type is not supported now.");
         }
     }
+
+    public static CQType getCQType(TopicConfig topicConfig) {
+        String attributeName = TopicAttributes.queueType.getName();
+
+        Map<String, String> attributes = topicConfig.getAttributes();
+        if (attributes == null || attributes.size() == 0) {
+            return CQType.valueOf(TopicAttributes.queueType.getDefaultValue());
+        }
+
+        if (attributes.containsKey(attributeName)) {
+            return CQType.valueOf(attributes.get(attributeName));
+        } else {
+            return CQType.valueOf(TopicAttributes.queueType.getDefaultValue());
+        }
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
index c33ae52..3caa477 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
@@ -16,12 +16,16 @@
  */
 package org.apache.rocketmq.tools.command.topic;
 
+import java.util.Map;
 import java.util.Set;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.AttributeParser;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
@@ -55,6 +59,10 @@ public class UpdateTopicSubCommand implements SubCommand {
         optionGroup.setRequired(true);
         options.addOptionGroup(optionGroup);
 
+        opt = new Option("a", "attributes", true, "attribute(+a=b,+c=d,-e)");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         opt = new Option("t", "topic", true, "topic name");
         opt.setRequired(true);
         options.addOption(opt);
@@ -98,6 +106,12 @@ public class UpdateTopicSubCommand implements SubCommand {
             topicConfig.setWriteQueueNums(8);
             topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
 
+            if (commandLine.hasOption('a')) {
+                String attributesModification = commandLine.getOptionValue('a').trim();
+                Map<String, String> attributes = AttributeParser.parseToMap(attributesModification);
+                topicConfig.setAttributes(attributes);
+            }
+
             // readQueueNums
             if (commandLine.hasOption('r')) {
                 topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
@@ -187,4 +201,5 @@ public class UpdateTopicSubCommand implements SubCommand {
             defaultMQAdminExt.shutdown();
         }
     }
+
 }