You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:53 UTC
[36/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
new file mode 100644
index 0000000..d954a46
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.subscription;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
+import com.alibaba.rocketmq.common.ConfigManager;
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SubscriptionGroupManager extends ConfigManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+ new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+ private final DataVersion dataVersion = new DataVersion();
+ private transient BrokerController brokerController;
+
+
+ public SubscriptionGroupManager() {
+ this.init();
+ }
+
+ private void init() {
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
+ this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
+ this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
+ this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
+ }
+ }
+
+
+ public SubscriptionGroupManager(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ this.init();
+ }
+
+
+ public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
+ SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
+ if (old != null) {
+ log.info("update subscription group config, old: " + old + " new: " + config);
+ } else {
+ log.info("create new subscription group, " + config);
+ }
+
+ this.dataVersion.nextVersion();
+
+ this.persist();
+ }
+
+ public void disableConsume(final String groupName) {
+ SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
+ if (old != null) {
+ old.setConsumeEnable(false);
+ this.dataVersion.nextVersion();
+ }
+ }
+
+
+ public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
+ SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
+ if (null == subscriptionGroupConfig) {
+ if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
+ subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(group);
+ SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
+ if (null == preConfig) {
+ log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
+ }
+ this.dataVersion.nextVersion();
+ this.persist();
+ }
+ }
+
+ return subscriptionGroupConfig;
+ }
+
+
+ @Override
+ public String encode() {
+ return this.encode(false);
+ }
+
+ @Override
+ public String configFilePath() {
+ //return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+ return BrokerPathConfigHelper.getSubscriptionGroupPath(System.getProperty("user.home") + File.separator + "store");
+ }
+
+ @Override
+ public void decode(String jsonString) {
+ if (jsonString != null) {
+ SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
+ if (obj != null) {
+ this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
+ this.dataVersion.assignNewOne(obj.dataVersion);
+ this.printLoadDataWhenFirstBoot(obj);
+ }
+ }
+ }
+
+ public String encode(final boolean prettyFormat) {
+ return RemotingSerializable.toJson(this, prettyFormat);
+ }
+
+ private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
+ Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, SubscriptionGroupConfig> next = it.next();
+ log.info("load exist subscription group, {}", next.getValue().toString());
+ }
+ }
+
+ public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+ return subscriptionGroupTable;
+ }
+
+
+ public DataVersion getDataVersion() {
+ return dataVersion;
+ }
+
+
+ public void deleteSubscriptionGroupConfig(final String groupName) {
+ SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
+ if (old != null) {
+ log.info("delete subscription group OK, subscription group: " + old);
+ this.dataVersion.nextVersion();
+ this.persist();
+ } else {
+ log.warn("delete subscription group failed, subscription group: " + old + " not exist");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java
new file mode 100644
index 0000000..94d7e9f
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.topic;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
+import com.alibaba.rocketmq.common.ConfigManager;
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicConfigManager extends ConfigManager {
+ private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
+ private transient final Lock lockTopicConfigTable = new ReentrantLock();
+
+ private final ConcurrentHashMap<String, TopicConfig> topicConfigTable =
+ new ConcurrentHashMap<String, TopicConfig>(1024);
+ private final DataVersion dataVersion = new DataVersion();
+ private final Set<String> systemTopicList = new HashSet<String>();
+ private transient BrokerController brokerController;
+
+
+ public TopicConfigManager() {
+ }
+
+
+ public TopicConfigManager(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ {
+ // MixAll.SELF_TEST_TOPIC
+ String topic = MixAll.SELF_TEST_TOPIC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+ // MixAll.DEFAULT_TOPIC
+ if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
+ String topic = MixAll.DEFAULT_TOPIC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
+ .getDefaultTopicQueueNums());
+ topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
+ .getDefaultTopicQueueNums());
+ int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
+ topicConfig.setPerm(perm);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ }
+ {
+ // MixAll.BENCHMARK_TOPIC
+ String topic = MixAll.BENCHMARK_TOPIC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(1024);
+ topicConfig.setWriteQueueNums(1024);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+
+ String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ int perm = PermName.PERM_INHERIT;
+ if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
+ perm |= PermName.PERM_READ | PermName.PERM_WRITE;
+ }
+ topicConfig.setPerm(perm);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+
+ String topic = this.brokerController.getBrokerConfig().getBrokerName();
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ int perm = PermName.PERM_INHERIT;
+ if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
+ perm |= PermName.PERM_READ | PermName.PERM_WRITE;
+ }
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ topicConfig.setPerm(perm);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+ // MixAll.OFFSET_MOVED_EVENT
+ String topic = MixAll.OFFSET_MOVED_EVENT;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ }
+
+
+ public boolean isSystemTopic(final String topic) {
+ return this.systemTopicList.contains(topic);
+ }
+
+
+ public Set<String> getSystemTopic() {
+ return this.systemTopicList;
+ }
+
+
+ public boolean isTopicCanSendMessage(final String topic) {
+ return !topic.equals(MixAll.DEFAULT_TOPIC);
+ }
+
+
+ public TopicConfig selectTopicConfig(final String topic) {
+ return this.topicConfigTable.get(topic);
+ }
+
+
+ public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
+ final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
+ TopicConfig topicConfig = null;
+ boolean createNew = false;
+
+ try {
+ if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ topicConfig = this.topicConfigTable.get(topic);
+ if (topicConfig != null)
+ return topicConfig;
+
+ TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
+ if (defaultTopicConfig != null) {
+ if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)) {
+ if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
+ defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+ }
+ }
+
+ if (PermName.isInherited(defaultTopicConfig.getPerm())) {
+ topicConfig = new TopicConfig(topic);
+
+ int queueNums =
+ clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
+ .getWriteQueueNums() : clientDefaultTopicQueueNums;
+
+ if (queueNums < 0) {
+ queueNums = 0;
+ }
+
+ topicConfig.setReadQueueNums(queueNums);
+ topicConfig.setWriteQueueNums(queueNums);
+ int perm = defaultTopicConfig.getPerm();
+ perm &= ~PermName.PERM_INHERIT;
+ topicConfig.setPerm(perm);
+ topicConfig.setTopicSysFlag(topicSysFlag);
+ topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
+ } else {
+ LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+ + "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
+ + remoteAddress);
+ }
+ } else {
+ LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+ + "] not exist." + " producer: " + remoteAddress);
+ }
+
+ if (topicConfig != null) {
+ LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
+ + " producer: " + remoteAddress);
+
+ this.topicConfigTable.put(topic, topicConfig);
+
+ this.dataVersion.nextVersion();
+
+ createNew = true;
+
+ this.persist();
+ }
+ } finally {
+ this.lockTopicConfigTable.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("createTopicInSendMessageMethod exception", e);
+ }
+
+ if (createNew) {
+ this.brokerController.registerBrokerAll(false, true);
+ }
+
+ return topicConfig;
+ }
+
+ public TopicConfig createTopicInSendMessageBackMethod(
+ final String topic,
+ final int clientDefaultTopicQueueNums,
+ final int perm,
+ final int topicSysFlag) {
+ TopicConfig topicConfig = this.topicConfigTable.get(topic);
+ if (topicConfig != null)
+ return topicConfig;
+
+ boolean createNew = false;
+
+ try {
+ if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ topicConfig = this.topicConfigTable.get(topic);
+ if (topicConfig != null)
+ return topicConfig;
+
+ topicConfig = new TopicConfig(topic);
+ topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
+ topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
+ topicConfig.setPerm(perm);
+ topicConfig.setTopicSysFlag(topicSysFlag);
+
+ LOG.info("create new topic {}", topicConfig);
+ this.topicConfigTable.put(topic, topicConfig);
+ createNew = true;
+ this.dataVersion.nextVersion();
+ this.persist();
+ } finally {
+ this.lockTopicConfigTable.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("createTopicInSendMessageBackMethod exception", e);
+ }
+
+ if (createNew) {
+ this.brokerController.registerBrokerAll(false, true);
+ }
+
+ return topicConfig;
+ }
+
+ public void updateTopicUnitFlag(final String topic, final boolean unit) {
+
+ TopicConfig topicConfig = this.topicConfigTable.get(topic);
+ if (topicConfig != null) {
+ int oldTopicSysFlag = topicConfig.getTopicSysFlag();
+ if (unit) {
+ topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
+ } else {
+ topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
+ }
+
+ LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
+ topicConfig.getTopicSysFlag());
+
+ this.topicConfigTable.put(topic, topicConfig);
+
+ this.dataVersion.nextVersion();
+
+ this.persist();
+ this.brokerController.registerBrokerAll(false, true);
+ }
+ }
+
+ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
+ TopicConfig topicConfig = this.topicConfigTable.get(topic);
+ if (topicConfig != null) {
+ int oldTopicSysFlag = topicConfig.getTopicSysFlag();
+ if (hasUnitSub) {
+ topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
+ }
+
+ LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
+ topicConfig.getTopicSysFlag());
+
+ this.topicConfigTable.put(topic, topicConfig);
+
+ this.dataVersion.nextVersion();
+
+ this.persist();
+ this.brokerController.registerBrokerAll(false, true);
+ }
+ }
+
+ public void updateTopicConfig(final TopicConfig topicConfig) {
+ TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ if (old != null) {
+ LOG.info("update topic config, old: " + old + " new: " + topicConfig);
+ } else {
+ LOG.info("create new topic, " + topicConfig);
+ }
+
+ this.dataVersion.nextVersion();
+
+ this.persist();
+ }
+
+
+ public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
+
+ if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {
+ boolean isChange = false;
+ Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();
+ for (String topic : orderTopics) {
+ TopicConfig topicConfig = this.topicConfigTable.get(topic);
+ if (topicConfig != null && !topicConfig.isOrder()) {
+ topicConfig.setOrder(true);
+ isChange = true;
+ LOG.info("update order topic config, topic={}, order={}", topic, true);
+ }
+ }
+
+ for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
+ String topic = entry.getKey();
+ if (!orderTopics.contains(topic)) {
+ TopicConfig topicConfig = entry.getValue();
+ if (topicConfig.isOrder()) {
+ topicConfig.setOrder(false);
+ isChange = true;
+ LOG.info("update order topic config, topic={}, order={}", topic, false);
+ }
+ }
+ }
+
+ if (isChange) {
+ this.dataVersion.nextVersion();
+ this.persist();
+ }
+ }
+ }
+
+ public boolean isOrderTopic(final String topic) {
+ TopicConfig topicConfig = this.topicConfigTable.get(topic);
+ if (topicConfig == null) {
+ return false;
+ } else {
+ return topicConfig.isOrder();
+ }
+ }
+
+ public void deleteTopicConfig(final String topic) {
+ TopicConfig old = this.topicConfigTable.remove(topic);
+ if (old != null) {
+ LOG.info("delete topic config OK, topic: " + old);
+ this.dataVersion.nextVersion();
+ this.persist();
+ } else {
+ LOG.warn("delete topic config failed, topic: " + topic + " not exist");
+ }
+ }
+
+ public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+ topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
+ topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
+ return topicConfigSerializeWrapper;
+ }
+
+ @Override
+ public String encode() {
+ return encode(false);
+ }
+
+ @Override
+ public String configFilePath() {
+// return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig()
+// .getStorePathRootDir());
+ return BrokerPathConfigHelper.getTopicConfigPath(System.getProperty("user.home") + File.separator + "store");
+ }
+
+ @Override
+ public void decode(String jsonString) {
+ if (jsonString != null) {
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+ TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
+ if (topicConfigSerializeWrapper != null) {
+ this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
+ this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
+ this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);
+ }
+ }
+ }
+
+ public String encode(final boolean prettyFormat) {
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+ topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
+ topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
+ return topicConfigSerializeWrapper.toJson(prettyFormat);
+ }
+
+ private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) {
+ Iterator<Entry<String, TopicConfig>> it = tcs.getTopicConfigTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, TopicConfig> next = it.next();
+ LOG.info("load exist local topic, {}", next.getValue().toString());
+ }
+ }
+
+ public DataVersion getDataVersion() {
+ return dataVersion;
+ }
+
+ public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
+ return topicConfigTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java
new file mode 100644
index 0000000..4328cf8
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.transaction;
+
+public class TransactionRecord {
+ // Commit Log Offset
+ private long offset;
+ private String producerGroup;
+
+
+ public long getOffset() {
+ return offset;
+ }
+
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java
new file mode 100644
index 0000000..9d977ab
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.transaction;
+
+import java.util.List;
+
+
+public interface TransactionStore {
+ public boolean open();
+
+
+ public void close();
+
+
+ public boolean put(final List<TransactionRecord> trs);
+
+
+ public void remove(final List<Long> pks);
+
+
+ public List<TransactionRecord> traverse(final long pk, final int nums);
+
+
+ public long totalRecords();
+
+
+ public long minPK();
+
+
+ public long maxPK();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
new file mode 100644
index 0000000..47de33b
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.transaction.jdbc;
+
+import com.alibaba.rocketmq.broker.transaction.TransactionRecord;
+import com.alibaba.rocketmq.broker.transaction.TransactionStore;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.sql.*;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class JDBCTransactionStore implements TransactionStore {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+ private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig;
+ private Connection connection;
+ private AtomicLong totalRecordsValue = new AtomicLong(0);
+
+ public JDBCTransactionStore(JDBCTransactionStoreConfig jdbcTransactionStoreConfig) {
+ this.jdbcTransactionStoreConfig = jdbcTransactionStoreConfig;
+ }
+
+ @Override
+ public boolean open() {
+ if (this.loadDriver()) {
+ Properties props = new Properties();
+ props.put("user", jdbcTransactionStoreConfig.getJdbcUser());
+ props.put("password", jdbcTransactionStoreConfig.getJdbcPassword());
+
+ try {
+ this.connection =
+ DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
+
+ this.connection.setAutoCommit(false);
+
+
+ if (!this.computeTotalRecords()) {
+ return this.createDB();
+ }
+
+ return true;
+ } catch (SQLException e) {
+ log.info("Create JDBC Connection Exeption", e);
+ }
+ }
+
+ return false;
+ }
+
+ private boolean loadDriver() {
+ try {
+ Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance();
+ log.info("Loaded the appropriate driver, {}",
+ this.jdbcTransactionStoreConfig.getJdbcDriverClass());
+ return true;
+ } catch (Exception e) {
+ log.info("Loaded the appropriate driver Exception", e);
+ }
+
+ return false;
+ }
+
+ private boolean computeTotalRecords() {
+ Statement statement = null;
+ ResultSet resultSet = null;
+ try {
+ statement = this.connection.createStatement();
+
+ resultSet = statement.executeQuery("select count(offset) as total from t_transaction");
+ if (!resultSet.next()) {
+ log.warn("computeTotalRecords ResultSet is empty");
+ return false;
+ }
+
+ this.totalRecordsValue.set(resultSet.getLong(1));
+ } catch (Exception e) {
+ log.warn("computeTotalRecords Exception", e);
+ return false;
+ } finally {
+ if (null != statement) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ }
+ }
+
+ if (null != resultSet) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ }
+ }
+ }
+
+ return true;
+ }
+
+ private boolean createDB() {
+ Statement statement = null;
+ try {
+ statement = this.connection.createStatement();
+
+ String sql = this.createTableSql();
+ log.info("createDB SQL:\n {}", sql);
+ statement.execute(sql);
+ this.connection.commit();
+ return true;
+ } catch (Exception e) {
+ log.warn("createDB Exception", e);
+ return false;
+ } finally {
+ if (null != statement) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ log.warn("Close statement exception", e);
+ }
+ }
+ }
+ }
+
+ private String createTableSql() {
+ URL resource = JDBCTransactionStore.class.getClassLoader().getResource("transaction.sql");
+ String fileContent = MixAll.file2String(resource);
+ return fileContent;
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ } catch (SQLException e) {
+ }
+ }
+
+ @Override
+ public boolean put(List<TransactionRecord> trs) {
+ PreparedStatement statement = null;
+ try {
+ this.connection.setAutoCommit(false);
+ statement = this.connection.prepareStatement("insert into t_transaction values (?, ?)");
+ for (TransactionRecord tr : trs) {
+ statement.setLong(1, tr.getOffset());
+ statement.setString(2, tr.getProducerGroup());
+ statement.addBatch();
+ }
+ int[] executeBatch = statement.executeBatch();
+ this.connection.commit();
+ this.totalRecordsValue.addAndGet(updatedRows(executeBatch));
+ return true;
+ } catch (Exception e) {
+ log.warn("createDB Exception", e);
+ return false;
+ } finally {
+ if (null != statement) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ log.warn("Close statement exception", e);
+ }
+ }
+ }
+ }
+
+ private long updatedRows(int[] rows) {
+ long res = 0;
+ for (int i : rows) {
+ res += i;
+ }
+
+ return res;
+ }
+
+ @Override
+ public void remove(List<Long> pks) {
+ PreparedStatement statement = null;
+ try {
+ this.connection.setAutoCommit(false);
+ statement = this.connection.prepareStatement("DELETE FROM t_transaction WHERE offset = ?");
+ for (long pk : pks) {
+ statement.setLong(1, pk);
+ statement.addBatch();
+ }
+ int[] executeBatch = statement.executeBatch();
+ this.connection.commit();
+ } catch (Exception e) {
+ log.warn("createDB Exception", e);
+ } finally {
+ if (null != statement) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<TransactionRecord> traverse(long pk, int nums) {
+ return null;
+ }
+
+ @Override
+ public long totalRecords() {
+ return this.totalRecordsValue.get();
+ }
+
+ @Override
+ public long minPK() {
+ return 0;
+ }
+
+ @Override
+ public long maxPK() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
new file mode 100644
index 0000000..1244cfc
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.transaction.jdbc;
+
+public class JDBCTransactionStoreConfig {
+ private String jdbcDriverClass = "com.mysql.jdbc.Driver";
+ private String jdbcURL = "jdbc:mysql://xxx.xxx.xxx.xxx:1000/xxx?useUnicode=true&characterEncoding=UTF-8";
+ private String jdbcUser = "xxx";
+ private String jdbcPassword = "xxx";
+
+
+ public String getJdbcDriverClass() {
+ return jdbcDriverClass;
+ }
+
+
+ public void setJdbcDriverClass(String jdbcDriverClass) {
+ this.jdbcDriverClass = jdbcDriverClass;
+ }
+
+
+ public String getJdbcURL() {
+ return jdbcURL;
+ }
+
+
+ public void setJdbcURL(String jdbcURL) {
+ this.jdbcURL = jdbcURL;
+ }
+
+
+ public String getJdbcUser() {
+ return jdbcUser;
+ }
+
+
+ public void setJdbcUser(String jdbcUser) {
+ this.jdbcUser = jdbcUser;
+ }
+
+
+ public String getJdbcPassword() {
+ return jdbcPassword;
+ }
+
+
+ public void setJdbcPassword(String jdbcPassword) {
+ this.jdbcPassword = jdbcPassword;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/resources/transaction.sql
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/resources/transaction.sql b/rocketmq-broker/src/main/resources/transaction.sql
new file mode 100644
index 0000000..aaefe43
--- /dev/null
+++ b/rocketmq-broker/src/main/resources/transaction.sql
@@ -0,0 +1,4 @@
+CREATE TABLE t_transaction(
+ offset NUMERIC(20) PRIMARY KEY,
+ producerGroup VARCHAR(64)
+)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
new file mode 100644
index 0000000..34ebfa5
--- /dev/null
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.broker.api;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.client.hook.SendMessageContext;
+import com.alibaba.rocketmq.client.impl.CommunicationMode;
+import com.alibaba.rocketmq.client.impl.MQClientAPIImpl;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.BrokerConfig;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SendMessageTest {
+ @Test
+ public void test_sendMessage() throws Exception {
+ BrokerController brokerController = new BrokerController(//
+ new BrokerConfig(), //
+ new NettyServerConfig(), //
+ new NettyClientConfig(), //
+ new MessageStoreConfig());
+ boolean initResult = brokerController.initialize();
+ System.out.println("initialize " + initResult);
+
+ brokerController.start();
+
+ MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, null);
+ client.start();
+
+ for (int i = 0; i < 100; i++) {
+ String topic = "UnitTestTopic_" + i % 3;
+ Message msg = new Message(topic, "TAG1 TAG2", "100200300", ("Hello, Nice world\t" + i).getBytes());
+ msg.setDelayTimeLevel(i % 3 + 1);
+
+ try {
+ SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+ requestHeader.setProducerGroup("abc");
+ requestHeader.setTopic(msg.getTopic());
+ requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
+ requestHeader.setDefaultTopicQueueNums(4);
+ requestHeader.setQueueId(i % 4);
+ requestHeader.setSysFlag(0);
+ requestHeader.setBornTimestamp(System.currentTimeMillis());
+ requestHeader.setFlag(msg.getFlag());
+ requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+ SendResult result = client.sendMessage("127.0.0.1:10911", "brokerName", msg, requestHeader, 1000 * 5,
+ CommunicationMode.SYNC, new SendMessageContext(), null);
+ System.out.println(i + "\t" + result);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ client.shutdown();
+
+ brokerController.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
new file mode 100644
index 0000000..55844eb
--- /dev/null
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.broker.offset;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.common.BrokerConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+
+import java.util.Random;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerOffsetManagerTest {
+ @Test
+ public void test_flushConsumerOffset() throws Exception {
+ BrokerController brokerController = new BrokerController(//
+ new BrokerConfig(), //
+ new NettyServerConfig(), //
+ new NettyClientConfig(), //
+ new MessageStoreConfig());
+ boolean initResult = brokerController.initialize();
+ System.out.println("initialize " + initResult);
+ brokerController.start();
+
+ ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
+
+ Random random = new Random();
+
+ for (int i = 0; i < 100; i++) {
+ String group = "DIANPU_GROUP_" + i;
+ for (int id = 0; id < 16; id++) {
+ consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id,
+ random.nextLong() % 1024 * 1024 * 1024);
+ consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id,
+ random.nextLong() % 1024 * 1024 * 1024);
+ consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id,
+ random.nextLong() % 1024 * 1024 * 1024);
+ }
+ }
+
+ consumerOffsetManager.persist();
+
+ brokerController.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
new file mode 100644
index 0000000..9edd02e
--- /dev/null
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.broker.topic;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.common.BrokerConfig;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicConfigManagerTest {
+ @Test
+ public void test_flushTopicConfig() throws Exception {
+ BrokerController brokerController = new BrokerController(//
+ new BrokerConfig(), //
+ new NettyServerConfig(), //
+ new NettyClientConfig(), //
+ new MessageStoreConfig());
+ boolean initResult = brokerController.initialize();
+ System.out.println("initialize " + initResult);
+ brokerController.start();
+
+ TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
+
+ TopicConfig topicConfig =
+ topicConfigManager.createTopicInSendMessageMethod("TestTopic_SEND", MixAll.DEFAULT_TOPIC,
+ null, 4, 0);
+ assertTrue(topicConfig != null);
+
+ System.out.println(topicConfig);
+
+ for (int i = 0; i < 10; i++) {
+ String topic = "UNITTEST-" + i;
+ topicConfig =
+ topicConfigManager
+ .createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0);
+ assertTrue(topicConfig != null);
+ }
+
+ topicConfigManager.persist();
+
+ brokerController.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/deploy.bat
----------------------------------------------------------------------
diff --git a/rocketmq-client/deploy.bat b/rocketmq-client/deploy.bat
new file mode 100644
index 0000000..f778070
--- /dev/null
+++ b/rocketmq-client/deploy.bat
@@ -0,0 +1 @@
+mvn -Dmaven.test.skip=true deploy -Pclient-shade
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/install.bat
----------------------------------------------------------------------
diff --git a/rocketmq-client/install.bat b/rocketmq-client/install.bat
new file mode 100644
index 0000000..87bf456
--- /dev/null
+++ b/rocketmq-client/install.bat
@@ -0,0 +1,2 @@
+mvn -Dmaven.test.skip=true clean package install -Pclient-shade -U
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-client/pom.xml b/rocketmq-client/pom.xml
new file mode 100644
index 0000000..63a6114
--- /dev/null
+++ b/rocketmq-client/pom.xml
@@ -0,0 +1,97 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain producerGroup copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-client</artifactId>
+ <name>rocketmq-client ${project.version}</name>
+
+ <profiles>
+ <profile>
+ <id>client-shade</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
+ <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <minimizeJar>false</minimizeJar>
+ <createSourcesJar>true</createSourcesJar>
+ <artifactSet>
+ <includes>
+ <include>com.alibaba:fastjson</include>
+ <include>io.netty:netty-all</include>
+ <include>com.alibaba.rocketmq:rocketmq-client</include>
+ <include>com.alibaba.rocketmq:rocketmq-common</include>
+ <include>com.alibaba.rocketmq:rocketmq-remoting</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>com.alibaba.rocketmq.shade.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.alibaba.fastjson</pattern>
+ <shadedPattern>com.alibaba.rocketmq.shade.com.alibaba.fastjson</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
new file mode 100644
index 0000000..4d80564
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+
+
+/**
+ * Client Common configuration
+ *
+ * @author shijia.wxr
+ * @author vongosling
+ */
+public class ClientConfig {
+ public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
+ private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ private String clientIP = RemotingUtil.getLocalAddress();
+ private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
+ private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
+ /**
+ * Pulling topic information interval from the named server
+ */
+ private int pollNameServerInteval = 1000 * 30;
+ /**
+ * Heartbeat interval in microseconds with message broker
+ */
+ private int heartbeatBrokerInterval = 1000 * 30;
+ /**
+ * Offset persistent interval for consumer
+ */
+ private int persistConsumerOffsetInterval = 1000 * 5;
+ private boolean unitMode = false;
+ private String unitName;
+ private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+
+ public String buildMQClientId() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.getClientIP());
+
+ sb.append("@");
+ sb.append(this.getInstanceName());
+ if (!UtilAll.isBlank(this.unitName)) {
+ sb.append("@");
+ sb.append(this.unitName);
+ }
+
+ return sb.toString();
+ }
+
+ public String getClientIP() {
+ return clientIP;
+ }
+
+ public void setClientIP(String clientIP) {
+ this.clientIP = clientIP;
+ }
+
+ public String getInstanceName() {
+ return instanceName;
+ }
+
+ public void setInstanceName(String instanceName) {
+ this.instanceName = instanceName;
+ }
+
+ public void changeInstanceNameToPID() {
+ if (this.instanceName.equals("DEFAULT")) {
+ this.instanceName = String.valueOf(UtilAll.getPid());
+ }
+ }
+
+ public void resetClientConfig(final ClientConfig cc) {
+ this.namesrvAddr = cc.namesrvAddr;
+ this.clientIP = cc.clientIP;
+ this.instanceName = cc.instanceName;
+ this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads;
+ this.pollNameServerInteval = cc.pollNameServerInteval;
+ this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
+ this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
+ this.unitMode = cc.unitMode;
+ this.unitName = cc.unitName;
+ this.vipChannelEnabled = cc.vipChannelEnabled;
+ }
+
+ public ClientConfig cloneClientConfig() {
+ ClientConfig cc = new ClientConfig();
+ cc.namesrvAddr = namesrvAddr;
+ cc.clientIP = clientIP;
+ cc.instanceName = instanceName;
+ cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+ cc.pollNameServerInteval = pollNameServerInteval;
+ cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
+ cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+ cc.unitMode = unitMode;
+ cc.unitName = unitName;
+ cc.vipChannelEnabled = vipChannelEnabled;
+ return cc;
+ }
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ public int getClientCallbackExecutorThreads() {
+ return clientCallbackExecutorThreads;
+ }
+
+
+ public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
+ this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+ }
+
+
+ public int getPollNameServerInteval() {
+ return pollNameServerInteval;
+ }
+
+
+ public void setPollNameServerInteval(int pollNameServerInteval) {
+ this.pollNameServerInteval = pollNameServerInteval;
+ }
+
+
+ public int getHeartbeatBrokerInterval() {
+ return heartbeatBrokerInterval;
+ }
+
+
+ public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
+ this.heartbeatBrokerInterval = heartbeatBrokerInterval;
+ }
+
+
+ public int getPersistConsumerOffsetInterval() {
+ return persistConsumerOffsetInterval;
+ }
+
+
+ public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
+ this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+ }
+
+
+ public String getUnitName() {
+ return unitName;
+ }
+
+
+ public void setUnitName(String unitName) {
+ this.unitName = unitName;
+ }
+
+
+ public boolean isUnitMode() {
+ return unitMode;
+ }
+
+
+ public void setUnitMode(boolean unitMode) {
+ this.unitMode = unitMode;
+ }
+
+
+ public boolean isVipChannelEnabled() {
+ return vipChannelEnabled;
+ }
+
+
+ public void setVipChannelEnabled(final boolean vipChannelEnabled) {
+ this.vipChannelEnabled = vipChannelEnabled;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval
+ + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ + vipChannelEnabled + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
new file mode 100644
index 0000000..4e202e9
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+
+/**
+ * Base interface for MQ management
+ *
+ * @author shijia.wxr
+ */
+public interface MQAdmin {
+ /**
+ * Creates an topic
+ *
+ * @param key
+ * accesskey
+ * @param newTopic
+ * topic name
+ * @param queueNum
+ * topic's queue number
+ *
+ * @throws MQClientException
+ */
+ void createTopic(final String key, final String newTopic, final int queueNum)
+ throws MQClientException;
+
+
+ /**
+ * Creates an topic
+ *
+ * @param key
+ * accesskey
+ * @param newTopic
+ * topic name
+ * @param queueNum
+ * topic's queue number
+ * @param topicSysFlag
+ * topic system flag
+ *
+ * @throws MQClientException
+ */
+ void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
+ throws MQClientException;
+
+
+ /**
+ * Gets the message queue offset according to some time in milliseconds<br>
+ * be cautious to call because of more IO overhead
+ *
+ * @param mq
+ * Instance of MessageQueue
+ * @param timestamp
+ * from when in milliseconds.
+ *
+ * @return offset
+ *
+ * @throws MQClientException
+ */
+ long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
+
+
+ /**
+ * Gets the max offset
+ *
+ * @param mq
+ * Instance of MessageQueue
+ *
+ * @return the max offset
+ *
+ * @throws MQClientException
+ */
+ long maxOffset(final MessageQueue mq) throws MQClientException;
+
+
+ /**
+ * Gets the minimum offset
+ *
+ * @param mq
+ * Instance of MessageQueue
+ *
+ * @return the minimum offset
+ *
+ * @throws MQClientException
+ */
+ long minOffset(final MessageQueue mq) throws MQClientException;
+
+
+ /**
+ * Gets the earliest stored message time
+ *
+ * @param mq
+ * Instance of MessageQueue
+ *
+ * @return the time in microseconds
+ *
+ * @throws MQClientException
+ */
+ long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
+
+
+ /**
+ * Query message according tto message id
+ *
+ * @param offsetMsgId
+ * message id
+ *
+ * @return message
+ *
+ * @throws InterruptedException
+ * @throws MQBrokerException
+ * @throws RemotingException
+ * @throws MQClientException
+ */
+ MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+
+ /**
+ * Query messages
+ *
+ * @param topic
+ * message topic
+ * @param key
+ * message key index word
+ * @param maxNum
+ * max message number
+ * @param begin
+ * from when
+ * @param end
+ * to when
+ *
+ * @return Instance of QueryResult
+ *
+ * @throws MQClientException
+ * @throws InterruptedException
+ */
+ QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
+ final long end) throws MQClientException, InterruptedException;
+
+ /**
+
+ * @param topic
+ * @param msgId
+ * @return The {@code MessageExt} of given msgId
+ * @throws RemotingException
+ * @throws MQBrokerException
+ * @throws InterruptedException
+ * @throws MQClientException
+ */
+ MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
new file mode 100644
index 0000000..5934b49
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQHelper {
+ public static void resetOffsetByTimestamp(
+ final MessageModel messageModel,
+ final String consumerGroup,
+ final String topic,
+ final long timestamp) throws Exception {
+ resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp);
+ }
+
+ /**
+ * Reset consumer topic offset according to time
+ *
+ * @param messageModel
+ * which model
+ * @param instanceName
+ * which instance
+ * @param consumerGroup
+ * consumer group
+ * @param topic
+ * topic
+ * @param timestamp
+ * time
+ *
+ * @throws Exception
+ */
+ public static void resetOffsetByTimestamp(
+ final MessageModel messageModel,
+ final String instanceName,
+ final String consumerGroup,
+ final String topic,
+ final long timestamp) throws Exception {
+ final Logger log = ClientLogger.getLog();
+
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
+ consumer.setInstanceName(instanceName);
+ consumer.setMessageModel(messageModel);
+ consumer.start();
+
+ Set<MessageQueue> mqs = null;
+ try {
+ mqs = consumer.fetchSubscribeMessageQueues(topic);
+ if (mqs != null && !mqs.isEmpty()) {
+ TreeSet<MessageQueue> mqsNew = new TreeSet<MessageQueue>(mqs);
+ for (MessageQueue mq : mqsNew) {
+ long offset = consumer.searchOffset(mq, timestamp);
+ if (offset >= 0) {
+ consumer.updateConsumeOffset(mq, offset);
+ log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}",
+ consumerGroup, offset, mq);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.warn("resetOffsetByTimestamp Exception", e);
+ throw e;
+ } finally {
+ if (mqs != null) {
+ consumer.getDefaultMQPullConsumerImpl().getOffsetStore().persistAll(mqs);
+ }
+ consumer.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
new file mode 100644
index 0000000..43c8106
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class QueryResult {
+ private final long indexLastUpdateTimestamp;
+ private final List<MessageExt> messageList;
+
+
+ public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) {
+ this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+ this.messageList = messageList;
+ }
+
+
+ public long getIndexLastUpdateTimestamp() {
+ return indexLastUpdateTimestamp;
+ }
+
+
+ public List<MessageExt> getMessageList() {
+ return messageList;
+ }
+
+
+ @Override
+ public String toString() {
+ return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList="
+ + messageList + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java
new file mode 100644
index 0000000..203aae0
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Common Validator
+ *
+ * @author manhong.yqd
+ */
+public class Validators {
+ public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
+ public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
+ public static final int CHARACTER_MAX_LENGTH = 255;
+
+ /**
+ * @param origin
+ * @param patternStr
+ *
+ * @return The resulting {@code String}
+ */
+ public static String getGroupWithRegularExpression(String origin, String patternStr) {
+ Pattern pattern = Pattern.compile(patternStr);
+ Matcher matcher = pattern.matcher(origin);
+ while (matcher.find()) {
+ return matcher.group(0);
+ }
+ return null;
+ }
+
+ /**
+ * Validate group
+ *
+ * @param group
+ *
+ * @throws com.alibaba.rocketmq.client.exception.MQClientException
+ */
+ public static void checkGroup(String group) throws MQClientException {
+ if (UtilAll.isBlank(group)) {
+ throw new MQClientException("the specified group is blank", null);
+ }
+ if (!regularExpressionMatcher(group, PATTERN)) {
+ throw new MQClientException(String.format(
+ "the specified group[%s] contains illegal characters, allowing only %s", group,
+ VALID_PATTERN_STR), null);
+ }
+ if (group.length() > CHARACTER_MAX_LENGTH) {
+ throw new MQClientException("the specified group is longer than group max length 255.", null);
+ }
+ }
+
+ /**
+ * @param origin
+ * @param pattern
+ *
+ * @return <tt>true</tt> if, and only if, the entire origin sequence
+ * matches this matcher's pattern
+ */
+ public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
+ if (pattern == null) {
+ return true;
+ }
+ Matcher matcher = pattern.matcher(origin);
+ return matcher.matches();
+ }
+
+ /**
+ * Validate message
+ *
+ * @param msg
+ * @param defaultMQProducer
+ *
+ * @throws com.alibaba.rocketmq.client.exception.MQClientException
+ */
+ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
+ throws MQClientException {
+ if (null == msg) {
+ throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
+ }
+ // topic
+ Validators.checkTopic(msg.getTopic());
+ // body
+ if (null == msg.getBody()) {
+ throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
+ }
+
+ if (0 == msg.getBody().length) {
+ throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
+ }
+
+ if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
+ throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
+ "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
+ }
+ }
+
+ /**
+ * Validate topic
+ *
+ * @param topic
+ *
+ * @throws com.alibaba.rocketmq.client.exception.MQClientException
+ */
+ public static void checkTopic(String topic) throws MQClientException {
+ if (UtilAll.isBlank(topic)) {
+ throw new MQClientException("the specified topic is blank", null);
+ }
+
+ if (!regularExpressionMatcher(topic, PATTERN)) {
+ throw new MQClientException(String.format(
+ "the specified topic[%s] contains illegal characters, allowing only %s", topic,
+ VALID_PATTERN_STR), null);
+ }
+
+ if (topic.length() > CHARACTER_MAX_LENGTH) {
+ throw new MQClientException("the specified topic is longer than topic max length 255.", null);
+ }
+
+ //whether the same with system reserved keyword
+ if (topic.equals(MixAll.DEFAULT_TOPIC)) {
+ throw new MQClientException(
+ String.format("the topic[%s] is conflict with default topic.", topic), null);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
new file mode 100644
index 0000000..071a872
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.admin;
+
+/**
+ * @author shijia.wxr
+ */
+public interface MQAdminExtInner {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
new file mode 100644
index 0000000..88d0eea
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.common;
+
+public class ClientErrorCode {
+ public static final int CONNECT_BROKER_EXCEPTION = 10001;
+ public static final int ACCESS_BROKER_TIMEOUT = 10002;
+ public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
+ public static final int NO_NAME_SERVER_EXCEPTION = 10004;
+ public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
+}
\ No newline at end of file