You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/01/05 08:04:00 UTC
[rocketmq] branch 5.0.0-alpha updated: [ISSUE #3679] Support topic attributes (#3698)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha by this push:
new d5701ae [ISSUE #3679] Support topic attributes (#3698)
d5701ae is described below
commit d5701ae569fcd50967b5cd14ac8992d176285c1d
Author: Hongjian Fei <er...@163.com>
AuthorDate: Wed Jan 5 16:02:53 2022 +0800
[ISSUE #3679] Support topic attributes (#3698)
* [Feature] Support topic attributes.
* [Feature] Topic attributes unit-test.
---
.../broker/processor/AdminBrokerProcessor.java | 18 +-
.../rocketmq/broker/topic/TopicConfigManager.java | 143 +++++++++
.../broker/topic/TopicConfigManagerTest.java | 318 +++++++++++++++++++++
.../rocketmq/client/impl/MQClientAPIImpl.java | 3 +
common/pom.xml | 13 -
.../apache/rocketmq/common/TopicAttributes.java | 33 ++-
.../org/apache/rocketmq/common/TopicConfig.java | 50 ++--
.../rocketmq/common/attribute/Attribute.java | 38 ++-
.../rocketmq/common/attribute/AttributeParser.java | 79 +++++
.../common/attribute/BooleanAttribute.java | 32 ++-
.../rocketmq/common/attribute/EnumAttribute.java | 32 ++-
.../common/attribute/LongRangeAttribute.java | 35 ++-
.../protocol/header/CreateTopicRequestHeader.java | 11 +
.../common/attribute/AttributeParserTest.java | 70 +++++
.../rocketmq/common/attribute/AttributeTest.java | 70 +++++
.../apache/rocketmq/store/util/QueueTypeUtils.java | 20 ++
.../tools/command/topic/UpdateTopicSubCommand.java | 15 +
17 files changed, 873 insertions(+), 107 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index f30953d..6505263 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -33,12 +33,14 @@ import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
@@ -294,12 +296,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
+ String attributesModification = requestHeader.getAttributes();
+ topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
- this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-
- this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
-
- response.setCode(ResponseCode.SUCCESS);
+ try {
+ this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
+ this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+ response.setCode(ResponseCode.SUCCESS);
+ } catch (Exception e) {
+ log.error("Update / create topic failed for [{}]", request, e);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(e.getMessage());
+ }
return response;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 4af230d..ba40538 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.broker.topic;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -25,11 +27,16 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.attribute.Attribute;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
@@ -40,6 +47,8 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public class TopicConfigManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
@@ -353,6 +362,18 @@ public class TopicConfigManager extends ConfigManager {
}
public void updateTopicConfig(final TopicConfig topicConfig) {
+ checkNotNull(topicConfig, "topicConfig shouldn't be null");
+
+ Map<String, String> newAttributes = request(topicConfig);
+ Map<String, String> currentAttributes = current(topicConfig.getTopicName());
+
+ Map<String, String> finalAttributes = alterCurrentAttributes(
+ this.topicConfigTable.get(topicConfig.getTopicName()) == null,
+ ImmutableMap.copyOf(currentAttributes),
+ ImmutableMap.copyOf(newAttributes));
+
+ topicConfig.setAttributes(finalAttributes);
+
TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
if (old != null) {
log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
@@ -398,6 +419,11 @@ public class TopicConfigManager extends ConfigManager {
}
}
+    /**
+     * Returns the registry of topic attributes this manager validates against.
+     * <p>
+     * Exposed as an instance method (instead of reading {@link TopicAttributes#ALL}
+     * inline) so that tests can stub the supported attribute set.
+     *
+     * @return {@link TopicAttributes#ALL}
+     */
+    public Map<String, Attribute> allAttributes() {
+        final Map<String, Attribute> supported = TopicAttributes.ALL;
+        return supported;
+    }
+
public boolean isOrderTopic(final String topic) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig == null) {
@@ -471,4 +497,121 @@ public class TopicConfigManager extends ConfigManager {
public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
return topicConfigTable;
}
+
+    /**
+     * Returns the attribute modifications carried by the submitted config.
+     *
+     * @param topicConfig the config passed to the create/update call
+     * @return the config's own attribute map, or a new empty map when it has none
+     */
+    private Map<String, String> request(TopicConfig topicConfig) {
+        if (topicConfig.getAttributes() != null) {
+            return topicConfig.getAttributes();
+        }
+        return new HashMap<>();
+    }
+
+    /**
+     * Looks up the attributes currently stored for a topic.
+     *
+     * @param topic name of the topic to look up in {@code topicConfigTable}
+     * @return the stored attribute map, or a new empty map when the topic is unknown
+     *         or has no attribute map
+     */
+    private Map<String, String> current(String topic) {
+        TopicConfig existing = this.topicConfigTable.get(topic);
+        if (existing == null) {
+            return new HashMap<>();
+        }
+
+        Map<String, String> stored = existing.getAttributes();
+        return stored == null ? new HashMap<>() : stored;
+    }
+
+    /**
+     * Applies the requested attribute modifications on top of the topic's current attributes.
+     * <p>
+     * Every requested key carries a one-character operation prefix: {@code +} adds the
+     * attribute (or updates it when it is already set) and {@code -} deletes it. While a
+     * topic is being created only {@code +} is accepted.
+     *
+     * @param create            {@code true} when the topic is not yet present in the config table
+     * @param currentAttributes attributes currently stored for the topic, keyed without prefix
+     * @param newAttributes     requested modifications, keyed by prefixed attribute name
+     * @return a new mutable map holding the attributes after all modifications were applied
+     * @throws RuntimeException if a key is malformed, altered twice, unsupported or unchangeable,
+     *                          if a deletion targets an attribute that is not set, or if
+     *                          {@link Attribute#verify(String)} rejects a value
+     */
+    private Map<String, String> alterCurrentAttributes(boolean create, ImmutableMap<String, String> currentAttributes, ImmutableMap<String, String> newAttributes) {
+        Map<String, String> init = new HashMap<>();
+        Map<String, String> add = new HashMap<>();
+        Map<String, String> update = new HashMap<>();
+        Map<String, String> delete = new HashMap<>();
+        Set<String> keys = new HashSet<>();
+
+        for (Entry<String, String> attribute : newAttributes.entrySet()) {
+            String key = attribute.getKey();
+            String realKey = realKey(key);
+            String value = attribute.getValue();
+
+            validate(realKey);
+            duplicationCheck(keys, realKey);
+
+            if (create) {
+                if (key.startsWith("+")) {
+                    init.put(realKey, value);
+                } else {
+                    throw new RuntimeException("only add attribute is supported while creating topic. key: " + realKey);
+                }
+            } else {
+                if (key.startsWith("+")) {
+                    if (!currentAttributes.containsKey(realKey)) {
+                        add.put(realKey, value);
+                    } else {
+                        update.put(realKey, value);
+                    }
+                } else if (key.startsWith("-")) {
+                    if (!currentAttributes.containsKey(realKey)) {
+                        throw new RuntimeException("attempt to delete a nonexistent key: " + realKey);
+                    }
+                    delete.put(realKey, value);
+                } else {
+                    // Report the key exactly as submitted: realKey has already lost its first
+                    // character, which for an un-prefixed key is part of the name itself.
+                    throw new RuntimeException("wrong format key: " + key);
+                }
+            }
+        }
+
+        // init entries may set unchangeable attributes; deletions carry no value to verify.
+        validateAlter(init, true, false);
+        validateAlter(add, false, false);
+        validateAlter(update, false, false);
+        validateAlter(delete, false, true);
+
+        log.info("add: {}, update: {}, delete: {}", add, update, delete);
+        Map<String, String> finalAttributes = new HashMap<>(currentAttributes);
+        finalAttributes.putAll(init);
+        finalAttributes.putAll(add);
+        finalAttributes.putAll(update);
+        finalAttributes.keySet().removeAll(delete.keySet());
+        return finalAttributes;
+    }
+
+    /**
+     * Records {@code key} as seen and fails when the same key was already recorded.
+     *
+     * @param keys attribute names already altered by the current request; updated in place
+     * @param key  prefix-stripped attribute name being altered
+     * @throws RuntimeException if {@code key} is already contained in {@code keys}
+     */
+    private void duplicationCheck(Set<String> keys, String key) {
+        if (keys.add(key)) {
+            return;
+        }
+        throw new RuntimeException("alter duplication key. key: " + key);
+    }
+
+    /**
+     * Checks the shape of an attribute name after its operation prefix was removed.
+     *
+     * @param kvAttribute prefix-stripped attribute name
+     * @throws RuntimeException if the name is null or empty, or still contains a
+     *                          {@code +} or {@code -} character
+     */
+    private void validate(String kvAttribute) {
+        boolean malformed = Strings.isNullOrEmpty(kvAttribute)
+            || kvAttribute.contains("+")
+            || kvAttribute.contains("-");
+
+        if (malformed) {
+            throw new RuntimeException("kv string format wrong.");
+        }
+    }
+
+    /**
+     * Validates one group of attribute modifications against the supported attributes.
+     *
+     * @param alter  modifications to check, keyed by prefix-stripped attribute name
+     * @param init   {@code true} for modifications made while creating the topic; these may
+     *               set attributes that are not changeable
+     * @param delete {@code true} for deletions; their values are not verified
+     * @throws RuntimeException if a key is not supported, or is not changeable outside of
+     *                          topic creation; {@link Attribute#verify(String)} may throw as well
+     */
+    private void validateAlter(Map<String, String> alter, boolean init, boolean delete) {
+        for (Entry<String, String> change : alter.entrySet()) {
+            final String name = change.getKey();
+            final String requested = change.getValue();
+
+            final Attribute definition = allAttributes().get(name);
+            if (definition == null) {
+                throw new RuntimeException("unsupported key: " + name);
+            }
+
+            if (!init && !definition.isChangeable()) {
+                throw new RuntimeException("attempt to update an unchangeable attribute. key: " + name);
+            }
+
+            if (delete) {
+                continue;
+            }
+            definition.verify(requested);
+        }
+    }
+
+    /**
+     * Strips the one-character operation prefix from a requested attribute key.
+     *
+     * @param key requested key, expected to start with an operation prefix character
+     * @return {@code key} without its first character
+     * @throws RuntimeException if {@code key} is null or empty, so there is no prefix to strip
+     */
+    private String realKey(String key) {
+        // substring(1) on an empty key would fail with an opaque StringIndexOutOfBoundsException;
+        // surface it as a format error instead.
+        if (Strings.isNullOrEmpty(key)) {
+            throw new RuntimeException("kv string format wrong.");
+        }
+        return key.substring(1);
+    }
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
new file mode 100644
index 0000000..9853c36
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.attribute.Attribute;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.BooleanAttribute;
+import org.apache.rocketmq.common.attribute.EnumAttribute;
+import org.apache.rocketmq.common.attribute.LongRangeAttribute;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.store.util.QueueTypeUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.Arrays.asList;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TopicConfigManagerTest {
+ private TopicConfigManager topicConfigManager;
+ @Mock
+ private BrokerController brokerController;
+
+    /** Builds a fresh {@link TopicConfigManager} on top of the mocked broker controller. */
+    @Before
+    public void init() {
+        when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
+        when(brokerController.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
+
+        topicConfigManager = new TopicConfigManager(brokerController);
+    }
+
+    /** Creating a topic with an attribute that is not in the supported set is rejected. */
+    @Test
+    public void testAddUnsupportedKeyOnCreating() {
+        final String unknownName = "key4";
+
+        supportAttributes(asList(
+            new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+            new BooleanAttribute("bool.key", false, false),
+            new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> changes = new HashMap<>();
+        changes.put("+enum.key", "enum-2");
+        changes.put("+" + unknownName, "value1");
+
+        TopicConfig request = new TopicConfig();
+        request.setTopicName("new-topic");
+        request.setAttributes(changes);
+
+        RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(request));
+        Assert.assertEquals("unsupported key: " + unknownName, thrown.getMessage());
+    }
+
+    /** A key that still contains a {@code +} after its prefix is stripped is a format error. */
+    @Test
+    public void testAddWrongFormatKeyOnCreating() {
+        supportAttributes(asList(
+            new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+            new BooleanAttribute("bool.key", false, false),
+            new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> changes = new HashMap<>();
+        changes.put("++enum.key", "value1");
+
+        TopicConfig request = new TopicConfig();
+        request.setTopicName("new-topic");
+        request.setAttributes(changes);
+
+        RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(request));
+        Assert.assertEquals("kv string format wrong.", thrown.getMessage());
+    }
+
+    /** A {@code -} (delete) modification is not accepted while the topic is being created. */
+    @Test
+    public void testDeleteKeyOnCreating() {
+        final String name = "enum.key";
+
+        supportAttributes(asList(
+            new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+            new BooleanAttribute("bool.key", false, false),
+            new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> changes = new HashMap<>();
+        changes.put("-" + name, "");
+
+        TopicConfig request = new TopicConfig();
+        request.setTopicName("new-topic");
+        request.setAttributes(changes);
+
+        RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(request));
+        Assert.assertEquals("only add attribute is supported while creating topic. key: " + name, thrown.getMessage());
+    }
+
+    /**
+     * A value outside the allowed set of {@link TopicAttributes#queueType} is rejected.
+     * This test does not stub the supported attributes; it runs against the real registry.
+     */
+    @Test
+    public void testAddWrongValueOnCreating() {
+        TopicConfig request = new TopicConfig();
+        request.setTopicName("new-topic");
+
+        Map<String, String> changes = new HashMap<>();
+        changes.put("+" + TopicAttributes.queueType.getName(), "wrong-value");
+        request.setAttributes(changes);
+
+        RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(request));
+        Assert.assertEquals("value is not in set: [SimpleCQ, BatchCQ]", thrown.getMessage());
+    }
+
+    /** Valid {@code +} modifications on a new topic are stored under their un-prefixed names. */
+    @Test
+    public void testNormalAddKeyOnCreating() {
+        final String topicName = "new-topic";
+
+        supportAttributes(asList(
+            new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+            new BooleanAttribute("bool.key", false, false),
+            new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> changes = new HashMap<>();
+        changes.put("+enum.key", "enum-2");
+        changes.put("+long.range.key", "16");
+
+        TopicConfig request = new TopicConfig();
+        request.setTopicName(topicName);
+        request.setAttributes(changes);
+        topicConfigManager.updateTopicConfig(request);
+
+        Map<String, String> stored = topicConfigManager.getTopicConfigTable().get(topicName).getAttributes();
+        Assert.assertEquals("enum-2", stored.get("enum.key"));
+        Assert.assertEquals("16", stored.get("long.range.key"));
+        // NOTE(review): the original left an "assert file" reminder here - presumably the
+        // persisted config should be asserted as well; confirm and implement or drop.
+    }
+
+    /** Adding and deleting the same attribute within one update request is rejected. */
+    @Test
+    public void testAddDuplicatedKeyOnUpdating() {
+        final String repeatedName = "long.range.key";
+
+        supportAttributes(asList(
+            new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+            new BooleanAttribute("bool.key", false, false),
+            new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        // The topic has to exist first so the request below is handled as an update.
+        createTopic();
+
+        Map<String, String> changes = new HashMap<>();
+        changes.put("+" + repeatedName, "11");
+        changes.put("-" + repeatedName, "");
+
+        TopicConfig request = new TopicConfig();
+        request.setTopicName("new-topic");
+        request.setAttributes(changes);
+
+        RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(request));
+        Assert.assertEquals("alter duplication key. key: " + repeatedName, thrown.getMessage());
+    }
+
+    /** Creates "new-topic" with all three stubbed attributes set. */
+    private void createTopic() {
+        TopicConfig request = new TopicConfig();
+        request.setTopicName("new-topic");
+
+        Map<String, String> initial = new HashMap<>();
+        initial.put("+enum.key", "enum-3");
+        initial.put("+bool.key", "true");
+        initial.put("+long.range.key", "12");
+        request.setAttributes(initial);
+
+        topicConfigManager.updateTopicConfig(request);
+    }
+
+    /** Deleting an attribute that is not set on an existing topic is rejected. */
+    @Test
+    public void testDeleteNonexistentKeyOnUpdating() {
+        String key = "nonexisting.key";
+
+        supportAttributes(asList(
+            new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+            new BooleanAttribute("bool.key", false, false),
+            new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> initialAttributes = new HashMap<>();
+        initialAttributes.put("+enum.key", "enum-2");
+        initialAttributes.put("+bool.key", "true");
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName("new-topic");
+        topicConfig.setAttributes(initialAttributes);
+
+        // First call creates the topic, so the second call is handled as an update.
+        topicConfigManager.updateTopicConfig(topicConfig);
+
+        Map<String, String> deletion = new HashMap<>();
+        deletion.put("-" + key, "");
+        topicConfig.setAttributes(deletion);
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig));
+        Assert.assertEquals("attempt to delete a nonexistent key: " + key, runtimeException.getMessage());
+    }
+
+    /** Updating a topic with a config that carries no attributes keeps the stored attributes. */
+    @Test
+    public void testAlterTopicWithoutChangingAttributes() {
+        final String topicName = "new-topic";
+
+        supportAttributes(asList(
+            new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+            new BooleanAttribute("bool.key", false, false),
+            new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        Map<String, String> initial = new HashMap<>();
+        initial.put("+enum.key", "enum-2");
+        initial.put("+bool.key", "true");
+
+        TopicConfig creation = new TopicConfig();
+        creation.setTopicName(topicName);
+        creation.setAttributes(initial);
+        topicConfigManager.updateTopicConfig(creation);
+
+        Map<String, String> afterCreate = topicConfigManager.getTopicConfigTable().get(topicName).getAttributes();
+        Assert.assertEquals("enum-2", afterCreate.get("enum.key"));
+        Assert.assertEquals("true", afterCreate.get("bool.key"));
+
+        // Only queue numbers change here; no attribute map is set on this config.
+        TopicConfig alteration = new TopicConfig();
+        alteration.setTopicName(topicName);
+        alteration.setReadQueueNums(10);
+        alteration.setWriteQueueNums(10);
+        topicConfigManager.updateTopicConfig(alteration);
+
+        Map<String, String> afterAlter = topicConfigManager.getTopicConfigTable().get(topicName).getAttributes();
+        Assert.assertEquals("enum-2", afterAlter.get("enum.key"));
+        Assert.assertEquals("true", afterAlter.get("bool.key"));
+    }
+
+    /** An unchangeable attribute may be set on creation but not altered by a later update. */
+    @Test
+    public void testNormalUpdateUnchangeableKeyOnUpdating() {
+        final String topicName = "exist-topic";
+
+        supportAttributes(asList(
+            new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+            new BooleanAttribute("bool.key", true, false),
+            new LongRangeAttribute("long.range.key", false, 10, 20, 15)
+        ));
+
+        Map<String, String> changes = new HashMap<>();
+        changes.put("+long.range.key", "14");
+
+        TopicConfig request = new TopicConfig();
+        request.setTopicName(topicName);
+        request.setAttributes(changes);
+
+        // Creation: setting the unchangeable attribute is allowed.
+        topicConfigManager.updateTopicConfig(request);
+
+        // Update: the same key with a different value must now be refused.
+        changes.put("+long.range.key", "16");
+        request.setAttributes(changes);
+        RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(request));
+        Assert.assertEquals("attempt to update an unchangeable attribute. key: long.range.key", thrown.getMessage());
+    }
+
+    /**
+     * After creating a topic with one attribute, the stored config exposes that attribute
+     * and {@link QueueTypeUtils#getCQType} reports {@link CQType#SimpleCQ} for it.
+     */
+    @Test
+    public void testNormalQueryKeyOnGetting() {
+        final String topicName = "exist-topic";
+        final String fixedName = "bool.key";
+
+        supportAttributes(asList(
+            new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"),
+            new BooleanAttribute("bool.key", false, false),
+            new LongRangeAttribute("long.range.key", true, 10, 20, 15)
+        ));
+
+        TopicConfig request = new TopicConfig();
+        request.setTopicName(topicName);
+
+        Map<String, String> changes = new HashMap<>();
+        changes.put("+" + fixedName, "true");
+        request.setAttributes(changes);
+
+        topicConfigManager.updateTopicConfig(request);
+
+        TopicConfig stored = topicConfigManager.getTopicConfigTable().get(topicName);
+        Assert.assertEquals(CQType.SimpleCQ, QueueTypeUtils.getCQType(stored));
+
+        Assert.assertEquals("true", stored.getAttributes().get(fixedName));
+    }
+
+    /**
+     * Replaces the manager under test with a spy whose {@code allAttributes()} returns
+     * exactly the given attributes, keyed by their names.
+     *
+     * @param supportAttributes attributes the manager should treat as supported
+     */
+    private void supportAttributes(List<Attribute> supportAttributes) {
+        Map<String, Attribute> byName = new HashMap<>();
+        supportAttributes.forEach(attribute -> byName.put(attribute.getName(), attribute));
+
+        topicConfigManager = spy(topicConfigManager);
+        when(topicConfigManager.allAttributes()).thenReturn(byName);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 20b8f41..cd0d05b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -56,10 +56,12 @@ import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -332,6 +334,7 @@ public class MQClientAPIImpl {
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
+ requestHeader.setAttributes(AttributeParser.parseToString(topicConfig.getAttributes()));
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
diff --git a/common/pom.xml b/common/pom.xml
index dd8ea6a..bbbc713 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -27,19 +27,6 @@
<artifactId>rocketmq-common</artifactId>
<name>rocketmq-common ${project.version}</name>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>6</source>
- <target>6</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
similarity index 56%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
index 18e7f5a..9c1e96f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
@@ -14,22 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.Attribute;
+import org.apache.rocketmq.common.attribute.EnumAttribute;
-public class QueueTypeUtils {
+import java.util.HashMap;
+import java.util.Map;
- public static CQType getCQType(MessageStore messageStore) {
- if (messageStore instanceof DefaultMessageStore) {
- return CQType.SimpleCQ;
- } else if (messageStore instanceof StreamMessageStore) {
- return CQType.BatchCQ;
- } else {
- throw new RuntimeException("new cq type is not supported now.");
- }
+import static com.google.common.collect.Sets.newHashSet;
+
+public class TopicAttributes {
+ public static final EnumAttribute queueType = new EnumAttribute(
+ "queue.type",
+ false,
+ newHashSet("BatchCQ", "SimpleCQ"),
+ "SimpleCQ"
+ );
+ public static final Map<String, Attribute> ALL;
+
+ static {
+ ALL = new HashMap<>();
+ ALL.put(queueType.getName(), queueType);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
index ec4d54b..610c3e2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.common;
import org.apache.rocketmq.common.constant.PermName;
+import java.util.Map;
+
public class TopicConfig {
private static final String SEPARATOR = " ";
public static int defaultReadQueueNums = 16;
@@ -29,6 +31,7 @@ public class TopicConfig {
private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
private int topicSysFlag = 0;
private boolean order = false;
+ private Map<String, String> attributes;
public TopicConfig() {
}
@@ -72,6 +75,8 @@ public class TopicConfig {
sb.append(SEPARATOR);
sb.append(this.topicFilterType);
+ // The [attributes] map is intentionally left out of encode/decode for now
+
return sb.toString();
}
@@ -150,29 +155,29 @@ public class TopicConfig {
this.order = isOrder;
}
- @Override
- public boolean equals(final Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+
+ public void setAttributes(Map<String, String> attributes) {
+ this.attributes = attributes;
+ }
- final TopicConfig that = (TopicConfig) o;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
- if (readQueueNums != that.readQueueNums)
- return false;
- if (writeQueueNums != that.writeQueueNums)
- return false;
- if (perm != that.perm)
- return false;
- if (topicSysFlag != that.topicSysFlag)
- return false;
- if (order != that.order)
- return false;
- if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null)
- return false;
- return topicFilterType == that.topicFilterType;
+ TopicConfig that = (TopicConfig) o;
+ if (readQueueNums != that.readQueueNums) return false;
+ if (writeQueueNums != that.writeQueueNums) return false;
+ if (perm != that.perm) return false;
+ if (topicSysFlag != that.topicSysFlag) return false;
+ if (order != that.order) return false;
+ if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) return false;
+ if (topicFilterType != that.topicFilterType) return false;
+ return attributes != null ? attributes.equals(that.attributes) : that.attributes == null;
}
@Override
@@ -184,6 +189,7 @@ public class TopicConfig {
result = 31 * result + (topicFilterType != null ? topicFilterType.hashCode() : 0);
result = 31 * result + topicSysFlag;
result = 31 * result + (order ? 1 : 0);
+ result = 31 * result + (attributes != null ? attributes.hashCode() : 0);
return result;
}
@@ -191,7 +197,7 @@ public class TopicConfig {
public String toString() {
return "TopicConfig [topicName=" + topicName + ", readQueueNums=" + readQueueNums
+ ", writeQueueNums=" + writeQueueNums + ", perm=" + PermName.perm2String(perm)
- + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order="
- + order + "]";
+ + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" + order
+ + ", attributes=" + attributes + "]";
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/Attribute.java
similarity index 56%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/attribute/Attribute.java
index 18e7f5a..ba9be3b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/Attribute.java
@@ -14,22 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common.attribute;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+public abstract class Attribute { // base type for a named attribute; each subclass supplies its own value check
+ protected String name; // attribute name; read through getName()
+ protected boolean changeable; // presumably "value may be altered after creation" - not enforced in this class, TODO confirm with callers
-public class QueueTypeUtils {
+ public abstract void verify(String value); // validates a candidate value; the subclasses in this package throw RuntimeException on rejection
- public static CQType getCQType(MessageStore messageStore) {
- if (messageStore instanceof DefaultMessageStore) {
- return CQType.SimpleCQ;
- } else if (messageStore instanceof StreamMessageStore) {
- return CQType.BatchCQ;
- } else {
- throw new RuntimeException("new cq type is not supported now.");
- }
+ public Attribute(String name, boolean changeable) { // stores both arguments as given, without validation
+ this.name = name;
+ this.changeable = changeable;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) { // name is mutable after construction
+ this.name = name;
+ }
+
+ public boolean isChangeable() {
+ return changeable;
+ }
+
+ public void setChangeable(boolean changeable) { // flag is mutable after construction
+ this.changeable = changeable;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeParser.java b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeParser.java
new file mode 100644
index 0000000..7ee7afc
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeParser.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.attribute;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AttributeParser {
+    /**
+     * Parses an attribute-modification expression into a map keyed by the signed attribute name.
+     *
+     * <p>Expected format: {@code +key1=value1,+key2=value2,-key3}. Entries are separated by
+     * {@code ,}. An add/alter entry starts with {@code +} and carries a non-empty value after
+     * its first {@code =}; a delete entry starts with {@code -} and contains no {@code =}.
+     * The sign stays part of the returned key, and delete entries map to the empty string.
+     *
+     * @param attributesModification the expression to parse; may be null or empty
+     * @return a new mutable map from signed key to value; empty when the input is null or empty
+     * @throws RuntimeException if an entry is malformed or the same signed key occurs twice
+     */
+    public static Map<String, String> parseToMap(String attributesModification) {
+        Map<String, String> attributes = new HashMap<>();
+        if (Strings.isNullOrEmpty(attributesModification)) {
+            return attributes;
+        }
+
+        // format: +key1=value1,+key2=value2,-key3,+key4=value4
+        String arraySeparator = ",";
+        String kvSeparator = "=";
+        for (String kv : attributesModification.split(arraySeparator)) {
+            String key;
+            String value;
+            int separatorIndex = kv.indexOf(kvSeparator);
+            if (separatorIndex >= 0) {
+                // Cut at the first '=' only: String.split drops trailing empty parts (so "+k=" had
+                // no element [1]) and would also truncate a value that itself contains '='.
+                key = kv.substring(0, separatorIndex);
+                value = kv.substring(separatorIndex + 1);
+                // The sign must lead the key; a '+' elsewhere in the text does not make an add entry.
+                if (!key.startsWith("+")) {
+                    throw new RuntimeException("add/alter attribute format is wrong: " + key);
+                }
+                if (value.isEmpty()) {
+                    // An empty value would be written back by parseToString without '=', i.e. as
+                    // text that no longer parses as an add entry, so reject it here.
+                    throw new RuntimeException("add/alter attribute value is empty: " + key);
+                }
+            } else {
+                key = kv;
+                value = "";
+                // Without the leading-sign check "+my-key" (value forgotten) would pass as a delete.
+                if (!key.startsWith("-")) {
+                    throw new RuntimeException("delete attribute format is wrong: " + key);
+                }
+            }
+            if (attributes.put(key, value) != null) {
+                throw new RuntimeException("key duplication: " + key);
+            }
+        }
+        return attributes;
+    }
+
+    /**
+     * Renders a map of signed keys back into the comma-separated expression form.
+     *
+     * <p>An entry whose value is null or empty is written as the bare key; any other entry is
+     * written as {@code key=value}. Entry order follows the iteration order of the map.
+     *
+     * @param attributes the entries to render; may be null
+     * @return the joined expression, or the empty string when the map is null or empty
+     */
+    public static String parseToString(Map<String, String> attributes) {
+        if (attributes == null || attributes.isEmpty()) {
+            return "";
+        }
+
+        List<String> kvs = new ArrayList<>();
+        for (Map.Entry<String, String> entry : attributes.entrySet()) {
+            String value = entry.getValue();
+            if (Strings.isNullOrEmpty(value)) {
+                kvs.add(entry.getKey());
+            } else {
+                kvs.add(entry.getKey() + "=" + value);
+            }
+        }
+        return Joiner.on(",").join(kvs);
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/BooleanAttribute.java
similarity index 54%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/attribute/BooleanAttribute.java
index 18e7f5a..41ad748 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/BooleanAttribute.java
@@ -14,22 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common.attribute;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+import static com.google.common.base.Preconditions.checkNotNull;
-public class QueueTypeUtils {
+public class BooleanAttribute extends Attribute { // attribute whose value must be the text "true" or "false"
+ private final boolean defaultValue; // returned by getDefaultValue(); fixed at construction
- public static CQType getCQType(MessageStore messageStore) {
- if (messageStore instanceof DefaultMessageStore) {
- return CQType.SimpleCQ;
- } else if (messageStore instanceof StreamMessageStore) {
- return CQType.BatchCQ;
- } else {
- throw new RuntimeException("new cq type is not supported now.");
+ public BooleanAttribute(String name, boolean changeable, boolean defaultValue) {
+ super(name, changeable);
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public void verify(String value) {
+ checkNotNull(value); // Guava precondition: a null value fails here with NullPointerException
+
+ if (!"false".equalsIgnoreCase(value) && !"true".equalsIgnoreCase(value)) { // case-insensitive; "1", "0" and "" are rejected
+ throw new RuntimeException("boolean attribute format is wrong.");
}
}
+
+ public boolean getDefaultValue() {
+ return defaultValue;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/EnumAttribute.java
similarity index 55%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/attribute/EnumAttribute.java
index 18e7f5a..5353b8a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/EnumAttribute.java
@@ -14,22 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common.attribute;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+import java.util.Set;
-public class QueueTypeUtils {
+public class EnumAttribute extends Attribute { // attribute whose value must be a member of a fixed set of strings
+ private final Set<String> universe; // accepted values; the caller's set is kept by reference, not copied
+ private final String defaultValue; // returned by getDefaultValue(); the constructor does not check it against universe
- public static CQType getCQType(MessageStore messageStore) {
- if (messageStore instanceof DefaultMessageStore) {
- return CQType.SimpleCQ;
- } else if (messageStore instanceof StreamMessageStore) {
- return CQType.BatchCQ;
- } else {
- throw new RuntimeException("new cq type is not supported now.");
+ public EnumAttribute(String name, boolean changeable, Set<String> universe, String defaultValue) {
+ super(name, changeable);
+ this.universe = universe;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public void verify(String value) {
+ if (!this.universe.contains(value)) { // exact membership test, so matching is case-sensitive
+ throw new RuntimeException("value is not in set: " + this.universe);
}
}
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/attribute/LongRangeAttribute.java
similarity index 52%
copy from store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
copy to common/src/main/java/org/apache/rocketmq/common/attribute/LongRangeAttribute.java
index 18e7f5a..eeeda72 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/LongRangeAttribute.java
@@ -14,22 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.store.util;
+package org.apache.rocketmq.common.attribute;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
+import static java.lang.String.format;
-public class QueueTypeUtils {
+public class LongRangeAttribute extends Attribute { // attribute whose value must parse as a long between min and max
+ private final long min; // lower bound; itself an accepted value (see verify)
+ private final long max; // upper bound; itself an accepted value (see verify)
+ private final long defaultValue; // returned by getDefaultValue(); the constructor does not range-check it
- public static CQType getCQType(MessageStore messageStore) {
- if (messageStore instanceof DefaultMessageStore) {
- return CQType.SimpleCQ;
- } else if (messageStore instanceof StreamMessageStore) {
- return CQType.BatchCQ;
- } else {
- throw new RuntimeException("new cq type is not supported now.");
+ public LongRangeAttribute(String name, boolean changeable, long min, long max, long defaultValue) {
+ super(name, changeable);
+ this.min = min;
+ this.max = max;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public void verify(String value) {
+ long l = Long.parseLong(value); // null or non-numeric text fails here with NumberFormatException
+ if (l < min || l > max) { // only values strictly outside the bounds are rejected
+ throw new RuntimeException(format("value is not in range(%d, %d)", min, max));
}
}
+
+ public long getDefaultValue() {
+ return defaultValue;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
index 290ec4c..2e381b3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -26,6 +26,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import java.util.Map;
+
public class CreateTopicRequestHeader implements CommandCustomHeader {
@CFNotNull
private String topic;
@@ -42,6 +44,7 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
private Integer topicSysFlag;
@CFNotNull
private Boolean order = false;
+ private String attributes;
@CFNullable
private Boolean force = false;
@@ -130,4 +133,12 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
public void setForce(Boolean force) {
this.force = force;
}
+
+ public String getAttributes() { // attributes carried as one string; this class assigns no default, so null until set
+ return attributes;
+ }
+
+ public void setAttributes(String attributes) { // stores the string as given; no parsing or validation happens here
+ this.attributes = attributes;
+ }
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeParserTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeParserTest.java
new file mode 100644
index 0000000..1239810
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeParserTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.attribute;
+
+import com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Maps.newHashMap;
+import static org.junit.Assert.assertTrue;
+
+public class AttributeParserTest {
+    /** Checks accepted and rejected expressions, including the content of successful parses. */
+    @Test
+    public void testParseToMap() {
+        Assert.assertEquals(0, AttributeParser.parseToMap(null).size());
+        Assert.assertEquals(0, AttributeParser.parseToMap("").size());
+
+        // The sign is kept as part of the key, so "++=++" is key "++" with value "++".
+        Map<String, String> added = AttributeParser.parseToMap("++=++");
+        Assert.assertEquals(1, added.size());
+        Assert.assertEquals("++", added.get("++"));
+
+        // A delete entry is stored under its signed key with an empty value.
+        Map<String, String> deleted = AttributeParser.parseToMap("--");
+        Assert.assertEquals(1, deleted.size());
+        Assert.assertEquals("", deleted.get("--"));
+
+        Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("x"));
+        Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("+"));
+        Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("++"));
+        // The same signed key may not appear twice in one expression.
+        Assert.assertThrows(RuntimeException.class, () -> AttributeParser.parseToMap("+a=b,+a=c"));
+    }
+
+    /** Checks rendering of null, empty and populated maps, and that rendering can be parsed back. */
+    @Test
+    public void testParseToString() {
+        Assert.assertEquals("", AttributeParser.parseToString(null));
+        Assert.assertEquals("", AttributeParser.parseToString(newHashMap()));
+        HashMap<String, String> map = new HashMap<>();
+        int addSize = 10;
+        for (int i = 0; i < addSize; i++) {
+            map.put("+add.key" + i, "value" + i);
+        }
+        int deleteSize = 10;
+        for (int i = 0; i < deleteSize; i++) {
+            map.put("-delete.key" + i, "");
+        }
+        String rendered = AttributeParser.parseToString(map);
+        Assert.assertEquals(addSize + deleteSize, rendered.split(",").length);
+        // Counting separators alone would not notice a mangled entry; parse the text back as well.
+        Assert.assertEquals(map, AttributeParser.parseToMap(rendered));
+    }
+
+    /** Checks that string -> map -> string -> map yields the same map as a single parse. */
+    @Test
+    public void testParseBetweenStringAndMapWithoutDistortion() {
+        List<String> testCases = Arrays.asList("-a", "+a=b,+c=d,+z=z,+e=e", "+a=b,-d", "+a=b", "-a,-b");
+        for (String testCase : testCases) {
+            assertTrue(Maps.difference(AttributeParser.parseToMap(testCase), AttributeParser.parseToMap(parse(testCase))).areEqual());
+        }
+    }
+
+    /** Parses the expression and renders the resulting map back to text. */
+    private String parse(String original) {
+        Map<String, String> stringStringMap = AttributeParser.parseToMap(original);
+        return AttributeParser.parseToString(stringStringMap);
+    }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeTest.java
new file mode 100644
index 0000000..39a12b9
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/attribute/AttributeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.attribute;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static com.google.common.collect.Sets.newHashSet;
+
+public class AttributeTest {
+
+    /** Values outside the configured set are rejected; every member of the set is accepted. */
+    @Test
+    public void testEnumAttribute() {
+        EnumAttribute enumAttribute = new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1");
+
+        expectRejected(enumAttribute, "", "x", "enum-4");
+        expectAccepted(enumAttribute, "enum-1", "enum-2", "enum-3");
+    }
+
+    /** Non-numeric and out-of-range text is rejected; both bounds and an inner value are accepted. */
+    @Test
+    public void testLongRangeAttribute() {
+        LongRangeAttribute longRangeAttribute = new LongRangeAttribute("long.range.key", true, 10, 20, 15);
+
+        expectRejected(longRangeAttribute, "", ",", "a", "-1", "21");
+        expectAccepted(longRangeAttribute, "11", "10", "20");
+    }
+
+    /** Only "true" and "false", in any letter case, are accepted. */
+    @Test
+    public void testBooleanAttribute() {
+        BooleanAttribute booleanAttribute = new BooleanAttribute("bool.key", false, false);
+
+        expectRejected(booleanAttribute, "", "a", ",", "checked", "1", "0", "-1");
+        expectAccepted(booleanAttribute, "true", "tRue", "false", "falSe");
+    }
+
+    /** Asserts, in the given order, that verifying each candidate throws a RuntimeException. */
+    private static void expectRejected(Attribute attribute, String... candidates) {
+        for (String candidate : candidates) {
+            Assert.assertThrows(RuntimeException.class, () -> attribute.verify(candidate));
+        }
+    }
+
+    /** Verifies each candidate in the given order; any exception fails the calling test. */
+    private static void expectAccepted(Attribute attribute, String... candidates) {
+        for (String candidate : candidates) {
+            attribute.verify(candidate);
+        }
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
index 18e7f5a..612bf7e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ b/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
@@ -16,13 +16,18 @@
*/
package org.apache.rocketmq.store.util;
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.StreamMessageStore;
import org.apache.rocketmq.store.queue.CQType;
+import java.util.Map;
+
public class QueueTypeUtils {
+ @Deprecated
public static CQType getCQType(MessageStore messageStore) {
if (messageStore instanceof DefaultMessageStore) {
return CQType.SimpleCQ;
@@ -32,4 +37,19 @@ public class QueueTypeUtils {
throw new RuntimeException("new cq type is not supported now.");
}
}
+
+    /**
+     * Resolves the consume-queue type of a topic from its attributes.
+     *
+     * <p>The value stored under the name of {@code TopicAttributes.queueType} is used when that
+     * key is present; otherwise (no attribute map, an empty map, or no such key) the default
+     * value of {@code TopicAttributes.queueType} applies. The chosen text is converted with
+     * {@code CQType.valueOf}.
+     *
+     * @param topicConfig the topic configuration whose attributes are read
+     * @return the consume-queue type named by the attribute, or by its default
+     */
+    public static CQType getCQType(TopicConfig topicConfig) {
+        String attributeName = TopicAttributes.queueType.getName();
+        Map<String, String> attributes = topicConfig.getAttributes();
+
+        boolean configured = attributes != null && attributes.containsKey(attributeName);
+        String typeName = configured
+            ? attributes.get(attributeName)
+            : TopicAttributes.queueType.getDefaultValue();
+        return CQType.valueOf(typeName);
+    }
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
index c33ae52..3caa477 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
@@ -16,12 +16,16 @@
*/
package org.apache.rocketmq.tools.command.topic;
+import java.util.Map;
import java.util.Set;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -55,6 +59,10 @@ public class UpdateTopicSubCommand implements SubCommand {
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
+ opt = new Option("a", "attributes", true, "attribute(+a=b,+c=d,-e)");
+ opt.setRequired(false);
+ options.addOption(opt);
+
opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
@@ -98,6 +106,12 @@ public class UpdateTopicSubCommand implements SubCommand {
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
+ if (commandLine.hasOption('a')) {
+ String attributesModification = commandLine.getOptionValue('a').trim();
+ Map<String, String> attributes = AttributeParser.parseToMap(attributesModification);
+ topicConfig.setAttributes(attributes);
+ }
+
// readQueueNums
if (commandLine.hasOption('r')) {
topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
@@ -187,4 +201,5 @@ public class UpdateTopicSubCommand implements SubCommand {
defaultMQAdminExt.shutdown();
}
}
+
}