You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:31 UTC
[40/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
new file mode 100644
index 0000000..7a0ddae
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
+import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+
+/**
+ * Broker-side Netty request processor for message sends.
+ * <p>
+ * {@code CONSUMER_SEND_MSG_BACK} requests are re-stored on the consumer group's retry topic (or on
+ * its dead-letter topic); every other request code is handled as a producer send.
+ *
+ * @author shijia.wxr
+ */
+public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
+
+    /** Hooks notified by {@code executeConsumeMessageHookAfter}; {@code null} until registered. */
+    private List<ConsumeMessageHook> consumeMessageHookList;
+
+    public SendMessageProcessor(final BrokerController brokerController) {
+        super(brokerController);
+    }
+
+    /**
+     * Dispatches an incoming request by its request code.
+     * <p>
+     * {@code CONSUMER_SEND_MSG_BACK} is delegated to {@code consumerSendMsgBack}. Any other code is
+     * treated as a message send: the header is parsed, then the send hooks are run before and
+     * after {@code sendMessage}.
+     *
+     * @param ctx channel context of the requesting connection
+     * @param request the remoting command to process
+     * @return the response command, or {@code null} when {@code parseRequestHeader} yields no
+     *         header or when {@code sendMessage} returned {@code null} (it does so after passing
+     *         a successful response to {@code doResponse} itself)
+     * @throws RemotingCommandException propagated from the delegated handlers
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        SendMessageContext mqtraceContext;
+        switch (request.getCode()) {
+            case RequestCode.CONSUMER_SEND_MSG_BACK:
+                return this.consumerSendMsgBack(ctx, request);
+            default:
+                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
+                if (requestHeader == null) {
+                    return null;
+                }
+
+                mqtraceContext = buildMsgContext(ctx, requestHeader);
+                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
+                final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
+
+                // response is null here when sendMessage already passed it to doResponse.
+                this.executeSendMessageHookAfter(response, mqtraceContext);
+                return response;
+        }
+    }
+
+    /**
+     * @return {@code true} when the message store reports a busy OS page cache or a deficient
+     *         transient store pool
+     */
+    @Override
+    public boolean rejectRequest() {
+        return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
+            this.brokerController.getMessageStore().isTransientStorePoolDeficient();
+    }
+
+    /**
+     * Picks a pseudo-random queue id using the inherited {@code random}.
+     * <p>
+     * The first modulo keeps the value away from {@code Integer.MIN_VALUE}, so {@code Math.abs}
+     * cannot return a negative number; for a positive {@code queueNums} the result is in
+     * {@code [0, queueNums)}.
+     *
+     * @param queueNums number of queues to choose from
+     * @return the chosen queue id
+     */
+    private int randomQueueId(final int queueNums) {
+        return Math.abs(this.random.nextInt() % 99999999) % queueNums;
+    }
+
+    /**
+     * Handles a consumer's request to send a message back to the broker for later re-consumption.
+     * <p>
+     * The original message is looked up by the offset in the header and stored again on the
+     * group's retry topic with a delay level, or on the group's DLQ topic once the re-consume
+     * limit is reached or the requested delay level is negative.
+     *
+     * @param ctx channel context of the requesting connection (not used in this method)
+     * @param request the send-back request
+     * @return a response whose code is {@code SUCCESS} or describes why the message was not stored
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final ConsumerSendMsgBackRequestHeader requestHeader =
+            (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+
+        // Notify the consume hooks about the send-back, but only when an origin message id is supplied.
+        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
+
+            ConsumeMessageContext context = new ConsumeMessageContext();
+            context.setConsumerGroup(requestHeader.getGroup());
+            context.setTopic(requestHeader.getOriginTopic());
+            context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
+            context.setCommercialRcvTimes(1);
+            context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
+
+            this.executeConsumeMessageHookAfter(context);
+        }
+
+        // The consumer group must be known to the broker.
+        SubscriptionGroupConfig subscriptionGroupConfig =
+            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
+        if (null == subscriptionGroupConfig) {
+            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+            return response;
+        }
+
+        // The broker itself must be writeable.
+        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
+            return response;
+        }
+
+        // A group without retry queues gets SUCCESS back without anything being stored.
+        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+
+        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
+        int queueIdInt = randomQueueId(subscriptionGroupConfig.getRetryQueueNums());
+
+        int topicSysFlag = 0;
+        if (requestHeader.isUnitMode()) {
+            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
+        }
+
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
+            newTopic, //
+            subscriptionGroupConfig.getRetryQueueNums(), //
+            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
+        if (null == topicConfig) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("topic[" + newTopic + "] not exist");
+            return response;
+        }
+
+        if (!PermName.isWriteable(topicConfig.getPerm())) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
+            return response;
+        }
+
+        // Load the message being sent back from the store, using the offset in the header.
+        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
+        if (null == msgExt) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
+            return response;
+        }
+
+        // Record the message's topic as PROPERTY_RETRY_TOPIC unless an earlier retry already did.
+        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
+        if (null == retryTopic) {
+            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
+        }
+        msgExt.setWaitStoreMsgOK(false);
+
+        int delayLevel = requestHeader.getDelayLevel();
+
+        // Requests whose version is at least V3_4_9 carry their own re-consume limit;
+        // otherwise the subscription group's limit applies.
+        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
+        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
+            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
+        }
+
+        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
+            || delayLevel < 0) {
+            // Limit reached (or negative delay level requested): route to the group's DLQ topic.
+            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
+            queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
+
+            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
+                DLQ_NUMS_PER_GROUP, //
+                PermName.PERM_WRITE, 0
+            );
+            if (null == topicConfig) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("topic[" + newTopic + "] not exist");
+                return response;
+            }
+        } else {
+            // Delay level 0 means "not specified": derive it from the number of retries so far.
+            if (0 == delayLevel) {
+                delayLevel = 3 + msgExt.getReconsumeTimes();
+            }
+
+            msgExt.setDelayTimeLevel(delayLevel);
+        }
+
+        // Build the message that is actually stored, copying body, flags and properties.
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setTopic(newTopic);
+        msgInner.setBody(msgExt.getBody());
+        msgInner.setFlag(msgExt.getFlag());
+        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
+
+        msgInner.setQueueId(queueIdInt);
+        msgInner.setSysFlag(msgExt.getSysFlag());
+        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
+        msgInner.setBornHost(msgExt.getBornHost());
+        msgInner.setStoreHost(this.getStoreHost());
+        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
+
+        // Keep an origin message id already on the message; otherwise use this message's own id.
+        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
+        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
+
+        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+        if (putMessageResult != null) {
+            switch (putMessageResult.getPutMessageStatus()) {
+                case PUT_OK:
+                    // Count the send-back against the recorded retry topic when there is one.
+                    String backTopic = msgExt.getTopic();
+                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
+                    if (correctTopic != null) {
+                        backTopic = correctTopic;
+                    }
+
+                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
+
+                    response.setCode(ResponseCode.SUCCESS);
+                    response.setRemark(null);
+
+                    return response;
+                default:
+                    break;
+            }
+
+            // Any status other than PUT_OK is reported as a system error carrying the status name.
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(putMessageResult.getPutMessageStatus().name());
+            return response;
+        }
+
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark("putMessageResult is null");
+        return response;
+    }
+
+    /**
+     * Stores a message sent by a producer and builds the response.
+     * <p>
+     * When the store accepts the message, the response is passed to {@code doResponse} inside
+     * this method and {@code null} is returned. In every other case the populated response is
+     * returned to the caller.
+     *
+     * @param ctx channel context of the requesting connection
+     * @param request the send request; its body is the message body
+     * @param sendMessageContext trace context filled with commercial statistics when send hooks exist
+     * @param requestHeader the already parsed request header
+     * @return {@code null} after a successful store, otherwise the error response
+     * @throws RemotingCommandException declared for callers; not thrown directly in this method
+     */
+    private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
+        final RemotingCommand request, //
+        final SendMessageContext sendMessageContext, //
+        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+
+        response.setOpaque(request.getOpaque());
+
+        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
+        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+
+        if (log.isDebugEnabled()) {
+            log.debug("receive SendMessage request command, " + request);
+        }
+
+        // Refuse sends until the configured "start accepting" timestamp has passed.
+        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
+        if (this.brokerController.getMessageStore().now() < startTimstamp) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
+            return response;
+        }
+
+        // -1 is a sentinel: if msgCheck changes the code, it has rejected the request.
+        response.setCode(-1);
+        super.msgCheck(ctx, requestHeader, response);
+        if (response.getCode() != -1) {
+            return response;
+        }
+
+        final byte[] body = request.getBody();
+
+        int queueIdInt = requestHeader.getQueueId();
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+
+        // A negative queue id means the sender left the choice to the broker.
+        if (queueIdInt < 0) {
+            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
+        }
+
+        int sysFlag = requestHeader.getSysFlag();
+
+        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
+            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+        }
+
+        String newTopic = requestHeader.getTopic();
+        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            // Sends addressed to a retry topic are subject to the group's re-consume limit.
+            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+            SubscriptionGroupConfig subscriptionGroupConfig =
+                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
+            if (null == subscriptionGroupConfig) {
+                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+                response.setRemark(
+                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+                return response;
+            }
+
+            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
+            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
+                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
+            }
+            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
+            if (reconsumeTimes >= maxReconsumeTimes) {
+                // Limit reached: redirect the message to the group's DLQ topic.
+                newTopic = MixAll.getDLQTopic(groupName);
+                queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
+                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
+                    DLQ_NUMS_PER_GROUP, //
+                    PermName.PERM_WRITE, 0
+                );
+                if (null == topicConfig) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("topic[" + newTopic + "] not exist");
+                    return response;
+                }
+            }
+        }
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setTopic(newTopic);
+        msgInner.setBody(body);
+        msgInner.setFlag(requestHeader.getFlag());
+        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+        msgInner.setPropertiesString(requestHeader.getProperties());
+        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
+
+        msgInner.setQueueId(queueIdInt);
+        msgInner.setSysFlag(sysFlag);
+        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
+        msgInner.setBornHost(ctx.channel().remoteAddress());
+        msgInner.setStoreHost(this.getStoreHost());
+        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+
+        // Optionally refuse messages that carry the transaction-prepared property.
+        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
+            String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+            if (traFlag != null) {
+                response.setCode(ResponseCode.NO_PERMISSION);
+                response.setRemark(
+                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
+                return response;
+            }
+        }
+
+        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+        if (putMessageResult != null) {
+            boolean sendOK = false;
+
+            switch (putMessageResult.getPutMessageStatus()) {
+                // Success: the flush/slave timeout statuses are still treated as a completed send.
+                case PUT_OK:
+                    sendOK = true;
+                    response.setCode(ResponseCode.SUCCESS);
+                    break;
+                case FLUSH_DISK_TIMEOUT:
+                    response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
+                    sendOK = true;
+                    break;
+                case FLUSH_SLAVE_TIMEOUT:
+                    response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
+                    sendOK = true;
+                    break;
+                case SLAVE_NOT_AVAILABLE:
+                    response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
+                    sendOK = true;
+                    break;
+
+                // Failed
+                case CREATE_MAPEDFILE_FAILED:
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("create mapped file failed, server is busy or broken.");
+                    break;
+                case MESSAGE_ILLEGAL:
+                case PROPERTIES_SIZE_EXCEEDED:
+                    response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+                    response.setRemark(
+                        "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
+                    break;
+                case SERVICE_NOT_AVAILABLE:
+                    response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
+                    response.setRemark(
+                        "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
+                    break;
+                case OS_PAGECACHE_BUSY:
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
+                    break;
+                case UNKNOWN_ERROR:
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("UNKNOWN_ERROR");
+                    break;
+                default:
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("UNKNOWN_ERROR DEFAULT");
+                    break;
+            }
+
+            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+            if (sendOK) {
+
+                this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
+                this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
+                    putMessageResult.getAppendMessageResult().getWroteBytes());
+                this.brokerController.getBrokerStatsManager().incBrokerPutNums();
+
+                response.setRemark(null);
+
+                responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
+                responseHeader.setQueueId(queueIdInt);
+                responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+
+                // The response is handed to the inherited doResponse here; null is returned below,
+                // presumably so the caller does not send it a second time.
+                doResponse(ctx, request, response);
+
+                if (hasSendMessageHook()) {
+                    sendMessageContext.setMsgId(responseHeader.getMsgId());
+                    sendMessageContext.setQueueId(responseHeader.getQueueId());
+                    sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+
+                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+                    int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
+                    // NOTE(review): if SIZE_PER_COUNT is an integral constant this division
+                    // truncates before Math.ceil is applied - confirm its declared type.
+                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+
+                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
+                    sendMessageContext.setCommercialSendTimes(incValue);
+                    sendMessageContext.setCommercialSendSize(wroteSize);
+                    sendMessageContext.setCommercialOwner(owner);
+                }
+                return null;
+            } else {
+                if (hasSendMessageHook()) {
+                    // Use the body captured above and tolerate a request without one, so a failed
+                    // send cannot turn into a NullPointerException while filling the hook stats.
+                    int wroteSize = body == null ? 0 : body.length;
+                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+
+                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
+                    sendMessageContext.setCommercialSendTimes(incValue);
+                    sendMessageContext.setCommercialSendSize(wroteSize);
+                    sendMessageContext.setCommercialOwner(owner);
+                }
+            }
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("store putMessage return null");
+        }
+
+        return response;
+    }
+
+    /**
+     * @return {@code true} when at least one consume-message hook has been registered
+     */
+    public boolean hasConsumeMessageHook() {
+        return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
+    }
+
+    /**
+     * Calls {@code consumeMessageAfter} on every registered consume-message hook.
+     * <p>
+     * A hook that throws does not stop the remaining hooks and does not fail the request; the
+     * failure is logged instead of being silently discarded.
+     *
+     * @param context the context passed to each hook
+     */
+    public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {
+        if (hasConsumeMessageHook()) {
+            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+                try {
+                    hook.consumeMessageAfter(context);
+                } catch (Throwable e) {
+                    // Best effort: keep going, but leave a trace of the failing hook.
+                    log.warn("consumeMessageAfter hook threw an exception", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return the inherited {@code storeHost} address, used as the store host of stored messages
+     */
+    public SocketAddress getStoreHost() {
+        return storeHost;
+    }
+
+    /**
+     * Builds a short text with the used-space ratio of the partitions holding the commit log,
+     * the consume queue and the index, for inclusion in the "service not available" remark.
+     *
+     * @return a string of the form {@code "CL: x CQ: y INDEX: z"}
+     */
+    private String diskUtil() {
+        String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
+        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
+
+        String storePathLogis =
+            StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+        double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);
+
+        String storePathIndex =
+            StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+        double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);
+
+        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
+    }
+
+    /**
+     * Replaces the list of consume-message hooks; the given list is stored as-is, not copied.
+     *
+     * @param consumeMessageHookList the hooks to notify, may be {@code null} to disable them
+     */
+    public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
+        this.consumeMessageHookList = consumeMessageHookList;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
new file mode 100644
index 0000000..45914d7
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.slave;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+/**
+ * Copies topic config, consumer offsets, delay offsets and subscription group config from the
+ * master broker at {@code masterAddr} into this broker.
+ * <p>
+ * Every sync step is a no-op while no master address is set, and logs (rather than propagates)
+ * any exception it runs into.
+ *
+ * @author shijia.wxr
+ * @author manhong.yqd
+ */
+public class SlaveSynchronize {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+    /** Address of the master to pull from; {@code null} disables synchronisation. */
+    private volatile String masterAddr = null;
+
+
+    public SlaveSynchronize(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+
+    public String getMasterAddr() {
+        return masterAddr;
+    }
+
+
+    public void setMasterAddr(String masterAddr) {
+        this.masterAddr = masterAddr;
+    }
+
+
+    /**
+     * Runs all four synchronisation steps in sequence.
+     */
+    public void syncAll() {
+        this.syncTopicConfig();
+        this.syncConsumerOffset();
+        this.syncDelayOffset();
+        this.syncSubscriptionGroupConfig();
+    }
+
+
+    /**
+     * Replaces the local topic config table with the master's when the data versions differ,
+     * then persists it.
+     */
+    private void syncTopicConfig() {
+        // Work on a snapshot so a concurrent setMasterAddr cannot change the target mid-sync.
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak != null) {
+            try {
+                TopicConfigSerializeWrapper topicWrapper =
+                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
+                if (!this.brokerController.getTopicConfigManager().getDataVersion()
+                    .equals(topicWrapper.getDataVersion())) {
+
+                    this.brokerController.getTopicConfigManager().getDataVersion()
+                        .assignNewOne(topicWrapper.getDataVersion());
+                    // Drop only the topics the master no longer has, then overwrite the rest.
+                    // Unlike clear() + putAll(), this never exposes an empty table to lookups
+                    // running concurrently on this broker.
+                    this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()
+                        .retainAll(topicWrapper.getTopicConfigTable().keySet());
+                    this.brokerController.getTopicConfigManager().getTopicConfigTable()
+                        .putAll(topicWrapper.getTopicConfigTable());
+                    this.brokerController.getTopicConfigManager().persist();
+
+                    log.info("update slave topic config from master, {}", masterAddrBak);
+                }
+            } catch (Exception e) {
+                log.error("syncTopicConfig Exception, " + masterAddrBak, e);
+            }
+        }
+    }
+
+
+    /**
+     * Merges the master's consumer offsets into the local offset table and persists it.
+     * Existing local entries that the master does not report are kept.
+     */
+    private void syncConsumerOffset() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak != null) {
+            try {
+                ConsumerOffsetSerializeWrapper offsetWrapper =
+                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
+                this.brokerController.getConsumerOffsetManager().getOffsetTable()
+                    .putAll(offsetWrapper.getOffsetTable());
+                this.brokerController.getConsumerOffsetManager().persist();
+                log.info("update slave consumer offset from master, {}", masterAddrBak);
+            } catch (Exception e) {
+                log.error("syncConsumerOffset Exception, " + masterAddrBak, e);
+            }
+        }
+    }
+
+
+    /**
+     * Fetches the master's delay offsets and writes them verbatim to the local delay offset
+     * file. Success is logged only when the file was actually written.
+     */
+    private void syncDelayOffset() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak != null) {
+            try {
+                String delayOffset =
+                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
+                if (delayOffset != null) {
+
+                    String fileName =
+                        StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
+                            .getMessageStoreConfig().getStorePathRootDir());
+                    try {
+                        MixAll.string2File(delayOffset, fileName);
+                        // Logged here so a missing payload or a failed write is not reported
+                        // as a successful update.
+                        log.info("update slave delay offset from master, {}", masterAddrBak);
+                    } catch (IOException e) {
+                        log.error("persist file Exception, " + fileName, e);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("syncDelayOffset Exception, " + masterAddrBak, e);
+            }
+        }
+    }
+
+
+    /**
+     * Replaces the local subscription group table with the master's when the data versions
+     * differ, then persists it.
+     */
+    private void syncSubscriptionGroupConfig() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak != null) {
+            try {
+                SubscriptionGroupWrapper subscriptionWrapper =
+                    this.brokerController.getBrokerOuterAPI()
+                        .getAllSubscriptionGroupConfig(masterAddrBak);
+
+                if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
+                    .equals(subscriptionWrapper.getDataVersion())) {
+                    SubscriptionGroupManager subscriptionGroupManager =
+                        this.brokerController.getSubscriptionGroupManager();
+                    subscriptionGroupManager.getDataVersion().assignNewOne(
+                        subscriptionWrapper.getDataVersion());
+                    // Remove groups the master no longer has, then overwrite the rest, so the
+                    // table is never observed empty by concurrent lookups.
+                    subscriptionGroupManager.getSubscriptionGroupTable().keySet().retainAll(
+                        subscriptionWrapper.getSubscriptionGroupTable().keySet());
+                    subscriptionGroupManager.getSubscriptionGroupTable().putAll(
+                        subscriptionWrapper.getSubscriptionGroupTable());
+                    subscriptionGroupManager.persist();
+                    log.info("update slave Subscription Group from master, {}", masterAddrBak);
+                }
+            } catch (Exception e) {
+                log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
new file mode 100644
index 0000000..364d5c8
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.subscription;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Holds the broker's subscription group configurations in memory and persists them through
+ * the {@link ConfigManager} base class (JSON encoding via {@link RemotingSerializable}).
+ *
+ * @author shijia.wxr
+ */
+public class SubscriptionGroupManager extends ConfigManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    /** Group name to configuration. */
+    private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+        new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+    /** Version stamp advanced whenever this class changes the table. */
+    private final DataVersion dataVersion = new DataVersion();
+    private transient BrokerController brokerController;
+
+
+    /**
+     * Creates a manager without a broker controller. NOTE(review): presumably needed for the
+     * JSON deserialization done in {@link #decode(String)}; {@code findSubscriptionGroupConfig}
+     * dereferences the controller, so it must not be called on such an instance for an
+     * unknown group.
+     */
+    public SubscriptionGroupManager() {
+        this.init();
+    }
+
+    /**
+     * Pre-registers the built-in consumer groups.
+     */
+    private void init() {
+        registerBuiltinGroup(MixAll.TOOLS_CONSUMER_GROUP, false);
+        registerBuiltinGroup(MixAll.FILTERSRV_CONSUMER_GROUP, false);
+        registerBuiltinGroup(MixAll.SELF_TEST_CONSUMER_GROUP, false);
+        registerBuiltinGroup(MixAll.ONS_HTTP_PROXY_GROUP, true);
+        registerBuiltinGroup(MixAll.CID_ONSAPI_PULL_GROUP, true);
+        registerBuiltinGroup(MixAll.CID_ONSAPI_PERMISSION_GROUP, true);
+        registerBuiltinGroup(MixAll.CID_ONSAPI_OWNER_GROUP, true);
+    }
+
+    /**
+     * Puts a default configuration for {@code groupName} into the table.
+     *
+     * @param groupName name of the built-in group
+     * @param enableBroadcast when {@code true}, broadcast consumption is explicitly enabled;
+     *        when {@code false}, the config's own default is left untouched
+     */
+    private void registerBuiltinGroup(final String groupName, final boolean enableBroadcast) {
+        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(groupName);
+        if (enableBroadcast) {
+            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+        }
+        this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
+    }
+
+
+    public SubscriptionGroupManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.init();
+    }
+
+
+    /**
+     * Creates or replaces the configuration stored under {@code config.getGroupName()},
+     * advances the data version and persists the table.
+     *
+     * @param config the configuration to store
+     */
+    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
+        if (old != null) {
+            log.info("update subscription group config, old: " + old + " new: " + config);
+        } else {
+            log.info("create new subscription group, " + config);
+        }
+
+        this.dataVersion.nextVersion();
+
+        this.persist();
+    }
+
+    /**
+     * Marks an existing group as not consumable and advances the data version. Unknown groups
+     * are ignored. Unlike the other mutators, this method does not call {@code persist()}.
+     *
+     * @param groupName the group to disable
+     */
+    public void disableConsume(final String groupName) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
+        if (old != null) {
+            old.setConsumeEnable(false);
+            this.dataVersion.nextVersion();
+        }
+    }
+
+
+    /**
+     * Looks up a group's configuration, creating a default one on the fly when the broker allows
+     * auto-creation or the group is a system consumer group.
+     * <p>
+     * When another thread registers the same group concurrently, that thread's configuration -
+     * the one actually stored in the table - is returned, and the data version is advanced and
+     * persisted only by the thread that really created the entry.
+     *
+     * @param group the group name
+     * @return the stored configuration, or {@code null} when the group is unknown and may not be
+     *         auto-created
+     */
+    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
+        SubscriptionGroupConfig existing = this.subscriptionGroupTable.get(group);
+        if (null != existing) {
+            return existing;
+        }
+
+        if (!brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() && !MixAll.isSysConsumerGroup(group)) {
+            return null;
+        }
+
+        SubscriptionGroupConfig created = new SubscriptionGroupConfig();
+        created.setGroupName(group);
+        SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, created);
+        if (null != preConfig) {
+            // Lost the race: hand out the instance that lives in the table, not our orphan.
+            return preConfig;
+        }
+
+        log.info("auto create a subscription group, {}", created.toString());
+        this.dataVersion.nextVersion();
+        this.persist();
+        return created;
+    }
+
+
+    @Override
+    public String encode() {
+        return this.encode(false);
+    }
+
+    /**
+     * NOTE(review): the store root is hard-coded to {@code ${user.home}/store} instead of being
+     * read from the broker's message store config (see the commented-out line) - confirm this
+     * is intentional.
+     */
+    @Override
+    public String configFilePath() {
+        //return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+        return BrokerPathConfigHelper.getSubscriptionGroupPath(System.getProperty("user.home") + File.separator + "store");
+    }
+
+    /**
+     * Merges a previously encoded manager into this one: its groups are added to (or overwrite)
+     * the current table and its data version is adopted. A {@code null} input is ignored.
+     *
+     * @param jsonString the JSON produced by {@link #encode()}
+     */
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
+            if (obj != null) {
+                this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
+                this.dataVersion.assignNewOne(obj.dataVersion);
+                this.printLoadDataWhenFirstBoot(obj);
+            }
+        }
+    }
+
+    public String encode(final boolean prettyFormat) {
+        return RemotingSerializable.toJson(this, prettyFormat);
+    }
+
+    /**
+     * Logs every subscription group held by the given (freshly decoded) manager.
+     */
+    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
+        for (SubscriptionGroupConfig loaded : sgm.getSubscriptionGroupTable().values()) {
+            log.info("load exist subscription group, {}", loaded.toString());
+        }
+    }
+
+    /**
+     * @return the live table itself, not a copy
+     */
+    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+        return subscriptionGroupTable;
+    }
+
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+
+    /**
+     * Removes a group's configuration; on success the data version is advanced and the table
+     * persisted. An unknown group only produces a warning.
+     *
+     * @param groupName the group to delete
+     */
+    public void deleteSubscriptionGroupConfig(final String groupName) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
+        if (old != null) {
+            log.info("delete subscription group OK, subscription group: " + old);
+            this.dataVersion.nextVersion();
+            this.persist();
+        } else {
+            // old is null on this path, so report the requested name instead.
+            log.warn("delete subscription group failed, subscription group: " + groupName + " not exist");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
new file mode 100644
index 0000000..40fdd68
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicConfigManager extends ConfigManager {
+ // Logger registered under LoggerName.BROKER_LOGGER_NAME.
+ private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ // Longest time, in milliseconds, the createTopicIn* methods wait for lockTopicConfigTable.
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
+ // Guards the check-then-create sequence in the createTopicIn* methods.
+ private transient final Lock lockTopicConfigTable = new ReentrantLock();
+
+ // Topic name -> topic configuration.
+ private final ConcurrentHashMap<String, TopicConfig> topicConfigTable =
+ new ConcurrentHashMap<String, TopicConfig>(1024);
+ // Version stamp; the mutating methods of this class advance it with nextVersion().
+ private final DataVersion dataVersion = new DataVersion();
+ // Names of the topics registered by the BrokerController constructor; backs isSystemTopic().
+ private final Set<String> systemTopicList = new HashSet<String>();
+ // Owning broker controller; remains null when the no-arg constructor is used.
+ private transient BrokerController brokerController;
+
+
+ /**
+  * Creates a manager with an empty topic table, no system topics and no broker controller.
+  */
+ public TopicConfigManager() {
+     // nothing to initialise beyond the field defaults
+ }
+
+
+ public TopicConfigManager(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ {
+ // MixAll.SELF_TEST_TOPIC
+ String topic = MixAll.SELF_TEST_TOPIC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+ // MixAll.DEFAULT_TOPIC
+ if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
+ String topic = MixAll.DEFAULT_TOPIC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
+ .getDefaultTopicQueueNums());
+ topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
+ .getDefaultTopicQueueNums());
+ int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
+ topicConfig.setPerm(perm);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ }
+ {
+ // MixAll.BENCHMARK_TOPIC
+ String topic = MixAll.BENCHMARK_TOPIC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(1024);
+ topicConfig.setWriteQueueNums(1024);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+
+ String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ int perm = PermName.PERM_INHERIT;
+ if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
+ perm |= PermName.PERM_READ | PermName.PERM_WRITE;
+ }
+ topicConfig.setPerm(perm);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+
+ String topic = this.brokerController.getBrokerConfig().getBrokerName();
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ int perm = PermName.PERM_INHERIT;
+ if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
+ perm |= PermName.PERM_READ | PermName.PERM_WRITE;
+ }
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ topicConfig.setPerm(perm);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+ // MixAll.OFFSET_MOVED_EVENT
+ String topic = MixAll.OFFSET_MOVED_EVENT;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ }
+
+
+ /** Tells whether {@code topic} is one of the topics registered by the constructor. */
+ public boolean isSystemTopic(final String topic) {
+     final boolean registered = this.systemTopicList.contains(topic);
+     return registered;
+ }
+
+
+ /** Returns the internal set of system topic names itself (not a copy). */
+ public Set<String> getSystemTopic() {
+     final Set<String> systemTopics = this.systemTopicList;
+     return systemTopics;
+ }
+
+
+ /**
+  * Tells whether messages may be sent to {@code topic}: every topic qualifies except
+  * {@code MixAll.DEFAULT_TOPIC}.
+  */
+ public boolean isTopicCanSendMessage(final String topic) {
+     final boolean isDefaultTopic = topic.equals(MixAll.DEFAULT_TOPIC);
+     return !isDefaultTopic;
+ }
+
+
+ /** Looks up the configuration stored for {@code topic}; yields {@code null} when there is none. */
+ public TopicConfig selectTopicConfig(final String topic) {
+     final TopicConfig found = this.topicConfigTable.get(topic);
+     return found;
+ }
+
+
+ /**
+  * Returns the configuration of {@code topic}, deriving a new one from {@code defaultTopic}
+  * when the topic is not in the table yet.
+  * <p>
+  * The lookup and creation run under {@code lockTopicConfigTable}, which is waited for at most
+  * {@code LOCK_TIMEOUT_MILLIS}. A new configuration is created only when the default topic
+  * exists and {@code PermName.isInherited} accepts its permission. It then gets the smaller of
+  * the client's and the default topic's write queue count (never below 0), the default topic's
+  * permission without {@code PERM_INHERIT}, the given system flag and the default topic's
+  * filter type. After creation the data version is advanced, the table is persisted and, once
+  * the lock has been released, {@code registerBrokerAll(false, true)} is invoked.
+  *
+  * @param topic topic being written to
+  * @param defaultTopic topic whose configuration serves as the template
+  * @param remoteAddress producer address, used in log messages only
+  * @param clientDefaultTopicQueueNums queue count requested by the client
+  * @param topicSysFlag system flag stored on a newly created topic
+  * @return the existing or newly created configuration, or {@code null} when the lock was not
+  *         obtained in time, the waiting thread was interrupted, or the default topic is
+  *         missing or not inheritable
+  */
+ public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
+     final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
+     TopicConfig topicConfig = null;
+     boolean createNew = false;
+
+     try {
+         if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+             try {
+                 topicConfig = this.topicConfigTable.get(topic);
+                 if (topicConfig != null) {
+                     return topicConfig;
+                 }
+
+                 TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
+                 if (defaultTopicConfig != null) {
+                     // With auto-creation switched off, the built-in default topic is reduced to
+                     // plain read/write; the inherit check below is then expected to reject it.
+                     if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)
+                         && !this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
+                         defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+                     }
+
+                     if (PermName.isInherited(defaultTopicConfig.getPerm())) {
+                         topicConfig = new TopicConfig(topic);
+
+                         // Never hand out more queues than the template has, and never fewer than zero.
+                         int queueNums = Math.max(0,
+                             Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums()));
+
+                         topicConfig.setReadQueueNums(queueNums);
+                         topicConfig.setWriteQueueNums(queueNums);
+                         int perm = defaultTopicConfig.getPerm();
+                         perm &= ~PermName.PERM_INHERIT;
+                         topicConfig.setPerm(perm);
+                         topicConfig.setTopicSysFlag(topicSysFlag);
+                         topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
+                     } else {
+                         LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+                             + "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
+                             + remoteAddress);
+                     }
+                 } else {
+                     LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+                         + "] not exist." + " producer: " + remoteAddress);
+                 }
+
+                 if (topicConfig != null) {
+                     LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
+                         + " producer: " + remoteAddress);
+
+                     this.topicConfigTable.put(topic, topicConfig);
+
+                     this.dataVersion.nextVersion();
+
+                     createNew = true;
+
+                     this.persist();
+                 }
+             } finally {
+                 this.lockTopicConfigTable.unlock();
+             }
+         }
+     } catch (InterruptedException e) {
+         LOG.error("createTopicInSendMessageMethod exception", e);
+         // Keep the interruption visible to the caller instead of silently consuming it.
+         Thread.currentThread().interrupt();
+     }
+
+     // Registration happens outside the lock.
+     if (createNew) {
+         this.brokerController.registerBrokerAll(false, true);
+     }
+
+     return topicConfig;
+ }
+
+ /**
+  * Returns the configuration of {@code topic}, creating it with the given settings when absent.
+  * <p>
+  * A first lookup is done without the lock. If the topic is missing, the lookup is repeated
+  * under {@code lockTopicConfigTable} (waited for at most {@code LOCK_TIMEOUT_MILLIS}) before a
+  * new configuration is stored. After creation the data version is advanced, the table is
+  * persisted and, once the lock has been released, {@code registerBrokerAll(false, true)} is
+  * invoked.
+  *
+  * @param topic topic to look up or create
+  * @param clientDefaultTopicQueueNums read and write queue count for a new topic
+  * @param perm permission for a new topic
+  * @param topicSysFlag system flag for a new topic
+  * @return the existing or newly created configuration, or {@code null} when the lock was not
+  *         obtained in time or the waiting thread was interrupted
+  */
+ public TopicConfig createTopicInSendMessageBackMethod(
+     final String topic,
+     final int clientDefaultTopicQueueNums,
+     final int perm,
+     final int topicSysFlag) {
+     TopicConfig topicConfig = this.topicConfigTable.get(topic);
+     if (topicConfig != null) {
+         return topicConfig;
+     }
+
+     boolean createNew = false;
+
+     try {
+         if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+             try {
+                 // Re-check under the lock: another thread may have created the topic meanwhile.
+                 topicConfig = this.topicConfigTable.get(topic);
+                 if (topicConfig != null) {
+                     return topicConfig;
+                 }
+
+                 topicConfig = new TopicConfig(topic);
+                 topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
+                 topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
+                 topicConfig.setPerm(perm);
+                 topicConfig.setTopicSysFlag(topicSysFlag);
+
+                 LOG.info("create new topic {}", topicConfig);
+                 this.topicConfigTable.put(topic, topicConfig);
+                 createNew = true;
+                 this.dataVersion.nextVersion();
+                 this.persist();
+             } finally {
+                 this.lockTopicConfigTable.unlock();
+             }
+         }
+     } catch (InterruptedException e) {
+         LOG.error("createTopicInSendMessageBackMethod exception", e);
+         // Keep the interruption visible to the caller instead of silently consuming it.
+         Thread.currentThread().interrupt();
+     }
+
+     // Registration happens outside the lock.
+     if (createNew) {
+         this.brokerController.registerBrokerAll(false, true);
+     }
+
+     return topicConfig;
+ }
+
+ /**
+  * Sets or clears the unit flag in the system flag of {@code topic}.
+  * Unknown topics are ignored. For a known topic the change is logged, the data version is
+  * advanced, the table is persisted and {@code registerBrokerAll(false, true)} is invoked.
+  *
+  * @param topic topic whose system flag is updated
+  * @param unit {@code true} to set the unit flag, {@code false} to clear it
+  */
+ public void updateTopicUnitFlag(final String topic, final boolean unit) {
+     TopicConfig topicConfig = this.topicConfigTable.get(topic);
+     if (topicConfig != null) {
+         int oldTopicSysFlag = topicConfig.getTopicSysFlag();
+         if (unit) {
+             topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
+         } else {
+             topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
+         }
+
+         // Both values need a placeholder; without "={}" the new flag was never printed.
+         LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
+             topicConfig.getTopicSysFlag());
+
+         this.topicConfigTable.put(topic, topicConfig);
+
+         this.dataVersion.nextVersion();
+
+         this.persist();
+         this.brokerController.registerBrokerAll(false, true);
+     }
+ }
+
+ /**
+  * Sets the unit-subscription flag in the system flag of {@code topic} when {@code hasUnitSub}
+  * is {@code true}.
+  * Unknown topics are ignored. For a known topic the (possibly unchanged) flag is logged, the
+  * data version is advanced, the table is persisted and {@code registerBrokerAll(false, true)}
+  * is invoked.
+  *
+  * @param topic topic whose system flag is updated
+  * @param hasUnitSub {@code true} to set the flag; {@code false} leaves the flag as it is
+  */
+ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
+     TopicConfig topicConfig = this.topicConfigTable.get(topic);
+     if (topicConfig != null) {
+         int oldTopicSysFlag = topicConfig.getTopicSysFlag();
+         // NOTE(review): unlike updateTopicUnitFlag, passing false does not clear the flag - confirm intended.
+         if (hasUnitSub) {
+             topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
+         }
+
+         // Both values need a placeholder; without "={}" the new flag was never printed.
+         LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
+             topicConfig.getTopicSysFlag());
+
+         this.topicConfigTable.put(topic, topicConfig);
+
+         this.dataVersion.nextVersion();
+
+         this.persist();
+         this.brokerController.registerBrokerAll(false, true);
+     }
+ }
+
+ /**
+  * Stores {@code topicConfig} under its topic name, replacing any previous entry, then
+  * advances the data version and persists the table.
+  */
+ public void updateTopicConfig(final TopicConfig topicConfig) {
+     final TopicConfig previous = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+     if (previous == null) {
+         LOG.info("create new topic, " + topicConfig);
+     } else {
+         LOG.info("update topic config, old: " + previous + " new: " + topicConfig);
+     }
+
+     this.dataVersion.nextVersion();
+     this.persist();
+ }
+
+
+ /**
+  * Aligns the order attribute of every known topic with the key set of {@code orderKVTableFromNs}:
+  * topics named in the table become ordered, all other topics become unordered.
+  * Nothing happens when the argument or its table is {@code null}. The data version is advanced
+  * and the table persisted only if at least one topic actually changed.
+  */
+ public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
+     if (orderKVTableFromNs == null || orderKVTableFromNs.getTable() == null) {
+         return;
+     }
+
+     final Set<String> orderedNames = orderKVTableFromNs.getTable().keySet();
+     boolean changed = false;
+
+     // First pass: switch on ordering for listed topics that are known and not yet ordered.
+     for (String name : orderedNames) {
+         final TopicConfig known = this.topicConfigTable.get(name);
+         if (known == null || known.isOrder()) {
+             continue;
+         }
+         known.setOrder(true);
+         changed = true;
+         LOG.info("update order topic config, topic={}, order={}", name, true);
+     }
+
+     // Second pass: switch off ordering for known topics that are no longer listed.
+     for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
+         final String name = entry.getKey();
+         if (orderedNames.contains(name)) {
+             continue;
+         }
+         final TopicConfig unlisted = entry.getValue();
+         if (!unlisted.isOrder()) {
+             continue;
+         }
+         unlisted.setOrder(false);
+         changed = true;
+         LOG.info("update order topic config, topic={}, order={}", name, false);
+     }
+
+     if (changed) {
+         this.dataVersion.nextVersion();
+         this.persist();
+     }
+ }
+
+ /** Tells whether {@code topic} is known and marked as ordered; unknown topics yield {@code false}. */
+ public boolean isOrderTopic(final String topic) {
+     final TopicConfig known = this.topicConfigTable.get(topic);
+     return known != null && known.isOrder();
+ }
+
+ /**
+  * Removes the configuration of {@code topic}. When an entry was removed the data version is
+  * advanced and the table persisted; otherwise only a warning is logged.
+  */
+ public void deleteTopicConfig(final String topic) {
+     final TopicConfig removed = this.topicConfigTable.remove(topic);
+     if (removed == null) {
+         LOG.warn("delete topic config failed, topic: " + topic + " not exist");
+         return;
+     }
+
+     LOG.info("delete topic config OK, topic: " + removed);
+     this.dataVersion.nextVersion();
+     this.persist();
+ }
+
+ /**
+  * Builds a wrapper that references (does not copy) the current topic table and data version.
+  */
+ public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
+     final TopicConfigSerializeWrapper wrapper = new TopicConfigSerializeWrapper();
+     wrapper.setTopicConfigTable(this.topicConfigTable);
+     wrapper.setDataVersion(this.dataVersion);
+     return wrapper;
+ }
+
+ /** Serializes the manager by delegating to {@link #encode(boolean)} with pretty-printing off. */
+ @Override
+ public String encode() {
+     final boolean prettyFormat = false;
+     return this.encode(prettyFormat);
+ }
+
+ @Override
+ public String configFilePath() {
+// return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig()
+// .getStorePathRootDir());
+ return BrokerPathConfigHelper.getTopicConfigPath(System.getProperty("user.home") + File.separator + "store");
+ }
+
+ /**
+  * Restores the topic table and data version from their JSON form.
+  * A {@code null} string, or a string that deserializes to {@code null}, leaves the manager
+  * untouched. Otherwise the loaded topics are merged into the table, the data version is taken
+  * over and every loaded topic is logged.
+  */
+ @Override
+ public void decode(String jsonString) {
+     if (jsonString == null) {
+         return;
+     }
+
+     final TopicConfigSerializeWrapper loaded =
+         TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
+     if (loaded == null) {
+         return;
+     }
+
+     this.topicConfigTable.putAll(loaded.getTopicConfigTable());
+     this.dataVersion.assignNewOne(loaded.getDataVersion());
+     this.printLoadDataWhenFirstBoot(loaded);
+ }
+
+ /**
+  * Serializes the current topic table and data version to JSON.
+  *
+  * @param prettyFormat passed through unchanged to the wrapper's {@code toJson}
+  * @return the JSON text produced by the wrapper
+  */
+ public String encode(final boolean prettyFormat) {
+     // The wrapper is assembled exactly as in buildTopicConfigSerializeWrapper().
+     return this.buildTopicConfigSerializeWrapper().toJson(prettyFormat);
+ }
+
+ /** Logs, at info level, every topic configuration contained in the given wrapper. */
+ private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) {
+     for (Entry<String, TopicConfig> loaded : tcs.getTopicConfigTable().entrySet()) {
+         LOG.info("load exist local topic, {}", loaded.getValue().toString());
+     }
+ }
+
+ /** Returns the data version object maintained by this manager. */
+ public DataVersion getDataVersion() {
+     return this.dataVersion;
+ }
+
+ /** Returns the internal topic table itself (not a copy). */
+ public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
+     return this.topicConfigTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
new file mode 100644
index 0000000..68256d9
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction;
+
+/**
+ * Plain mutable holder pairing a commit log offset with a producer group name.
+ */
+public class TransactionRecord {
+    /** Commit log offset; 0 until set. */
+    private long offset;
+    /** Producer group name; {@code null} until set. */
+    private String producerGroup;
+
+    /** Returns the commit log offset. */
+    public long getOffset() {
+        return this.offset;
+    }
+
+    /** Replaces the commit log offset. */
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /** Returns the producer group name, possibly {@code null}. */
+    public String getProducerGroup() {
+        return this.producerGroup;
+    }
+
+    /** Replaces the producer group name. */
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
new file mode 100644
index 0000000..758eeed
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction;
+
+import java.util.List;
+
+
+/**
+ * Storage contract for {@link TransactionRecord}s that are addressed by a {@code long} key.
+ */
+public interface TransactionStore {
+    /**
+     * Makes the store ready for use.
+     *
+     * @return a success indicator; the JDBC implementation returns {@code false} when it
+     *         cannot connect
+     */
+    boolean open();
+
+    /** Releases whatever {@link #open()} acquired. */
+    void close();
+
+    /**
+     * Stores the given records.
+     *
+     * @return a success indicator; the JDBC implementation returns {@code false} on any exception
+     */
+    boolean put(final List<TransactionRecord> trs);
+
+    /** Deletes the records identified by the given keys. */
+    void remove(final List<Long> pks);
+
+    /**
+     * Reads records starting from a key.
+     * NOTE(review): presumably returns up to {@code nums} records from {@code pk} onwards -
+     * the only visible implementation is a stub, so confirm with the caller.
+     */
+    List<TransactionRecord> traverse(final long pk, final int nums);
+
+    /** Returns the number of records the store currently accounts for. */
+    long totalRecords();
+
+    /** Returns the smallest key in the store. */
+    long minPK();
+
+    /** Returns the largest key in the store. */
+    long maxPK();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
new file mode 100644
index 0000000..4bf73d2
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction.jdbc;
+
+import org.apache.rocketmq.broker.transaction.TransactionRecord;
+import org.apache.rocketmq.broker.transaction.TransactionStore;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.sql.*;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class JDBCTransactionStore implements TransactionStore {
+ // Logger registered under LoggerName.TRANSACTION_LOGGER_NAME.
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+ // Supplies the JDBC driver class, URL, user and password read by open() and loadDriver().
+ private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig;
+ // Connection obtained in open() and closed in close(); null before open() has connected.
+ private Connection connection;
+ // Record counter: set from a count query in computeTotalRecords(), increased by put().
+ private AtomicLong totalRecordsValue = new AtomicLong(0);
+
+ /**
+  * Creates a store that will connect with the given settings; no connection is opened here.
+  *
+  * @param jdbcTransactionStoreConfig driver class, URL and credentials used later by open()
+  */
+ public JDBCTransactionStore(JDBCTransactionStoreConfig jdbcTransactionStoreConfig) {
+     super();
+     this.jdbcTransactionStoreConfig = jdbcTransactionStoreConfig;
+ }
+
+ /**
+  * Loads the JDBC driver, connects with the configured URL and credentials and switches the
+  * connection to manual commit. It then reads the current record count; if that fails it falls
+  * back to {@code createDB()} and returns that method's result.
+  *
+  * @return {@code true} when the store is usable; {@code false} when the driver cannot be
+  *         loaded, connecting fails, or the fallback table creation fails
+  */
+ @Override
+ public boolean open() {
+     if (!this.loadDriver()) {
+         return false;
+     }
+
+     Properties props = new Properties();
+     props.put("user", jdbcTransactionStoreConfig.getJdbcUser());
+     props.put("password", jdbcTransactionStoreConfig.getJdbcPassword());
+
+     try {
+         this.connection =
+             DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
+
+         this.connection.setAutoCommit(false);
+
+         if (!this.computeTotalRecords()) {
+             return this.createDB();
+         }
+
+         return true;
+     } catch (SQLException e) {
+         // A failed connection is a problem, not routine information: warn, with the typo fixed.
+         log.warn("Create JDBC Connection Exception", e);
+     }
+
+     return false;
+ }
+
+ /**
+  * Loads and instantiates the configured JDBC driver class.
+  *
+  * @return {@code true} on success, {@code false} when any exception occurs (it is logged)
+  */
+ private boolean loadDriver() {
+     boolean loaded;
+     try {
+         Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance();
+         log.info("Loaded the appropriate driver, {}",
+             this.jdbcTransactionStoreConfig.getJdbcDriverClass());
+         loaded = true;
+     } catch (Exception e) {
+         log.info("Loaded the appropriate driver Exception", e);
+         loaded = false;
+     }
+     return loaded;
+ }
+
+ /**
+  * Reads the row count of {@code t_transaction} into {@code totalRecordsValue}.
+  *
+  * @return {@code true} when the count was read; {@code false} when the query returned no row
+  *         or any exception occurred (it is logged)
+  */
+ private boolean computeTotalRecords() {
+     Statement statement = null;
+     ResultSet resultSet = null;
+     try {
+         statement = this.connection.createStatement();
+
+         resultSet = statement.executeQuery("select count(offset) as total from t_transaction");
+         if (!resultSet.next()) {
+             log.warn("computeTotalRecords ResultSet is empty");
+             return false;
+         }
+
+         this.totalRecordsValue.set(resultSet.getLong(1));
+         return true;
+     } catch (Exception e) {
+         log.warn("computeTotalRecords Exception", e);
+         return false;
+     } finally {
+         // Release in reverse order of acquisition and report, rather than hide, close failures.
+         if (null != resultSet) {
+             try {
+                 resultSet.close();
+             } catch (SQLException e) {
+                 log.warn("Close result set exception", e);
+             }
+         }
+
+         if (null != statement) {
+             try {
+                 statement.close();
+             } catch (SQLException e) {
+                 log.warn("Close statement exception", e);
+             }
+         }
+     }
+ }
+
+ private boolean createDB() {
+ Statement statement = null;
+ try {
+ statement = this.connection.createStatement();
+
+ String sql = this.createTableSql();
+ log.info("createDB SQL:\n {}", sql);
+ statement.execute(sql);
+ this.connection.commit();
+ return true;
+ } catch (Exception e) {
+ log.warn("createDB Exception", e);
+ return false;
+ } finally {
+ if (null != statement) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ log.warn("Close statement exception", e);
+ }
+ }
+ }
+ }
+
+ /** Reads the classpath resource {@code transaction.sql} through {@code MixAll.file2String}. */
+ private String createTableSql() {
+     return MixAll.file2String(
+         JDBCTransactionStore.class.getClassLoader().getResource("transaction.sql"));
+ }
+
+ /**
+  * Closes the JDBC connection if one was opened. A failure to close is logged and otherwise
+  * ignored, so this method never throws {@code SQLException}.
+  */
+ @Override
+ public void close() {
+     try {
+         if (this.connection != null) {
+             this.connection.close();
+         }
+     } catch (SQLException e) {
+         // Best effort: nothing more can be done, but do not hide the failure.
+         log.warn("Close connection exception", e);
+     }
+ }
+
+ /**
+  * Inserts the given records into {@code t_transaction} as one batch and commits.
+  * On success the record counter grows by the number of rows reported by the batch. On failure
+  * the pending transaction is rolled back so that a later commit cannot persist a partial batch.
+  *
+  * @param trs records to insert
+  * @return {@code true} when the batch was committed, {@code false} when any exception occurred
+  */
+ @Override
+ public boolean put(List<TransactionRecord> trs) {
+     PreparedStatement statement = null;
+     try {
+         this.connection.setAutoCommit(false);
+         statement = this.connection.prepareStatement("insert into t_transaction values (?, ?)");
+         for (TransactionRecord tr : trs) {
+             statement.setLong(1, tr.getOffset());
+             statement.setString(2, tr.getProducerGroup());
+             statement.addBatch();
+         }
+         int[] executeBatch = statement.executeBatch();
+         this.connection.commit();
+         this.totalRecordsValue.addAndGet(updatedRows(executeBatch));
+         return true;
+     } catch (Exception e) {
+         log.warn("put Exception", e);
+         // Discard whatever part of the batch was applied before the failure.
+         try {
+             if (this.connection != null) {
+                 this.connection.rollback();
+             }
+         } catch (SQLException rollbackFailure) {
+             log.warn("put rollback exception", rollbackFailure);
+         }
+         return false;
+     } finally {
+         if (null != statement) {
+             try {
+                 statement.close();
+             } catch (SQLException e) {
+                 log.warn("Close statement exception", e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Adds up the row counts reported by {@code Statement.executeBatch()}.
+  * Negative entries are JDBC status codes, not counts: {@code Statement.SUCCESS_NO_INFO} is
+  * counted as one row (assumes each batched statement touches a single row, as the insert in
+  * put() does) and every other negative value, such as {@code Statement.EXECUTE_FAILED},
+  * contributes nothing.
+  *
+  * @param rows per-statement results of a batch execution
+  * @return the total number of affected rows
+  */
+ private long updatedRows(int[] rows) {
+     long res = 0;
+     for (int i : rows) {
+         if (i >= 0) {
+             res += i;
+         } else if (i == Statement.SUCCESS_NO_INFO) {
+             res += 1;
+         }
+     }
+
+     return res;
+ }
+
+ /**
+  * Deletes the rows of {@code t_transaction} whose offset is in {@code pks}, as one batch,
+  * and commits. On success the record counter shrinks by the number of rows reported by the
+  * batch, so it stays in step with put(). On failure the pending transaction is rolled back
+  * and the counter is left unchanged.
+  *
+  * @param pks offsets of the records to delete
+  */
+ @Override
+ public void remove(List<Long> pks) {
+     PreparedStatement statement = null;
+     try {
+         this.connection.setAutoCommit(false);
+         statement = this.connection.prepareStatement("DELETE FROM t_transaction WHERE offset = ?");
+         for (long pk : pks) {
+             statement.setLong(1, pk);
+             statement.addBatch();
+         }
+         int[] executeBatch = statement.executeBatch();
+         this.connection.commit();
+         this.totalRecordsValue.addAndGet(-this.updatedRows(executeBatch));
+     } catch (Exception e) {
+         log.warn("remove Exception", e);
+         // Discard whatever part of the batch was applied before the failure.
+         try {
+             if (this.connection != null) {
+                 this.connection.rollback();
+             }
+         } catch (SQLException rollbackFailure) {
+             log.warn("remove rollback exception", rollbackFailure);
+         }
+     } finally {
+         if (null != statement) {
+             try {
+                 statement.close();
+             } catch (SQLException e) {
+                 log.warn("Close statement exception", e);
+             }
+         }
+     }
+ }
+
+ @Override
+ /** Not implemented: always yields {@code null}, whatever the arguments. */
+ public List<TransactionRecord> traverse(long pk, int nums) {
+     final List<TransactionRecord> notImplemented = null;
+     return notImplemented;
+ }
+
+ @Override
+ /** Returns the current value of the in-memory record counter. */
+ public long totalRecords() {
+     final long total = this.totalRecordsValue.get();
+     return total;
+ }
+
+ @Override
+ /** Not implemented: always yields 0. */
+ public long minPK() {
+     return 0L;
+ }
+
+ @Override
+ /** Not implemented: always yields 0. */
+ public long maxPK() {
+     return 0L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
new file mode 100644
index 0000000..5789329
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction.jdbc;
+
+/**
+ * Mutable settings holder for the JDBC transaction store: driver class, URL, user and password.
+ * The URL, user and password defaults are placeholders.
+ */
+public class JDBCTransactionStoreConfig {
+    /** Fully qualified JDBC driver class name. */
+    private String jdbcDriverClass = "com.mysql.jdbc.Driver";
+    /** JDBC connection URL. */
+    private String jdbcURL = "jdbc:mysql://xxx.xxx.xxx.xxx:1000/xxx?useUnicode=true&characterEncoding=UTF-8";
+    /** Database user. */
+    private String jdbcUser = "xxx";
+    /** Database password. */
+    private String jdbcPassword = "xxx";
+
+    /** Returns the JDBC driver class name. */
+    public String getJdbcDriverClass() {
+        return this.jdbcDriverClass;
+    }
+
+    /** Replaces the JDBC driver class name. */
+    public void setJdbcDriverClass(String jdbcDriverClass) {
+        this.jdbcDriverClass = jdbcDriverClass;
+    }
+
+    /** Returns the JDBC connection URL. */
+    public String getJdbcURL() {
+        return this.jdbcURL;
+    }
+
+    /** Replaces the JDBC connection URL. */
+    public void setJdbcURL(String jdbcURL) {
+        this.jdbcURL = jdbcURL;
+    }
+
+    /** Returns the database user. */
+    public String getJdbcUser() {
+        return this.jdbcUser;
+    }
+
+    /** Replaces the database user. */
+    public void setJdbcUser(String jdbcUser) {
+        this.jdbcUser = jdbcUser;
+    }
+
+    /** Returns the database password. */
+    public String getJdbcPassword() {
+        return this.jdbcPassword;
+    }
+
+    /** Replaces the database password. */
+    public void setJdbcPassword(String jdbcPassword) {
+        this.jdbcPassword = jdbcPassword;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
deleted file mode 100644
index b661385..0000000
--- a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker;
-
-import com.alibaba.rocketmq.common.BrokerConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
-import com.alibaba.rocketmq.store.config.MessageStoreConfig;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author shtykh_roman
- */
-public class BrokerControllerTest {
- protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
-
- // Number of initialize/start/shutdown cycles run by testRestart.
- private static final int RESTART_NUM = 3;
-
- /**
- * Tests if the controller can be properly stopped and started.
- *
- * @throws Exception If fails.
- */
- @Test
- public void testRestart() throws Exception {
-
- for (int i = 0; i < RESTART_NUM; i++) {
- // A fresh controller per cycle, built from four default-constructed configs.
- BrokerController brokerController = new BrokerController(//
- new BrokerConfig(), //
- new NettyServerConfig(), //
- new NettyClientConfig(), //
- new MessageStoreConfig());
- boolean initResult = brokerController.initialize();
- Assert.assertTrue(initResult);
- logger.info("Broker is initialized " + initResult);
- brokerController.start();
- logger.info("Broker is started");
-
- // Shut down before the next iteration creates another controller.
- brokerController.shutdown();
- logger.info("Broker is stopped");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java b/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java
deleted file mode 100644
index ca6f17b..0000000
--- a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
- */
-package com.alibaba.rocketmq.broker;
-
-import com.alibaba.rocketmq.common.BrokerConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
-import com.alibaba.rocketmq.store.config.MessageStoreConfig;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Random;
-
-/**
- * @author zander
- */
-public class BrokerTestHarness {
-
- // Controller created in startup() and stopped in shutdown(); null until startup() has run.
- protected BrokerController brokerController = null;
-
- // Used to pick the listen port in startup().
- protected Random random = new Random();
- public final String BROKER_NAME = "TestBrokerName";
- // "<brokerIP1>:<listenPort>", filled in by startup().
- protected String brokerAddr = "";
- protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class);
- protected BrokerConfig brokerConfig = new BrokerConfig();
- protected NettyServerConfig nettyServerConfig = new NettyServerConfig();
- protected NettyClientConfig nettyClientConfig = new NettyClientConfig();
- protected MessageStoreConfig storeConfig = new MessageStoreConfig();
-
- // Configures a local broker storing under <user.home>/unitteststore, listening on a port in
- // [10000, 11000), then initializes and starts it.
- @Before
- public void startup() throws Exception {
- brokerConfig.setBrokerName(BROKER_NAME);
- brokerConfig.setBrokerIP1("127.0.0.1");
- storeConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore");
- storeConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog");
- nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
- brokerAddr = brokerConfig.getBrokerIP1() + ":" + nettyServerConfig.getListenPort();
- brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
- boolean initResult = brokerController.initialize();
- Assert.assertTrue(initResult);
- logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
- brokerController.start();
- }
-
- // Stops the broker if startup() got far enough to create it; store files are left on disk.
- @After
- public void shutdown() throws Exception {
- if (brokerController != null) {
- brokerController.shutdown();
- }
- //maybe need to clean the file store. But we do not suggest deleting anything.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
deleted file mode 100644
index cf97876..0000000
--- a/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
- */
-package com.alibaba.rocketmq.broker.api;
-
-import com.alibaba.rocketmq.broker.BrokerTestHarness;
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.hook.SendMessageContext;
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.MQClientAPIImpl;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.client.producer.SendStatus;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
-/**
- * @author zander
- */
-public class SendMessageTest extends BrokerTestHarness{
- // Sends a message through an MQClientAPIImpl to the broker address prepared by BrokerTestHarness.startup().
- MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
- String topic = "UnitTestTopic";
-
- @Before
- @Override
- public void startup() throws Exception {
- super.startup();
- client.start();
- }
-
- @After
- @Override
- public void shutdown() throws Exception {
- try {
- client.shutdown();
- } finally {
- super.shutdown(); // always stop the broker, even when closing the client throws
- }
- }
-
- @Test
- public void testSendSingle() throws Exception{ // one SYNC send is expected to come back with SEND_OK
- Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes());
- SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
- requestHeader.setProducerGroup("abc");
- requestHeader.setTopic(msg.getTopic());
- requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
- requestHeader.setDefaultTopicQueueNums(4);
- requestHeader.setQueueId(0);
- requestHeader.setSysFlag(0);
- requestHeader.setBornTimestamp(System.currentTimeMillis());
- requestHeader.setFlag(msg.getFlag());
- requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
- SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5, CommunicationMode.SYNC, new SendMessageContext(), null);
- assertEquals(SendStatus.SEND_OK, result.getSendStatus()); // JUnit argument order is (expected, actual)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
deleted file mode 100644
index 94504a4..0000000
--- a/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
- */
-package com.alibaba.rocketmq.broker.offset;
-
-import com.alibaba.rocketmq.broker.BrokerTestHarness;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-
-/**
- * @author zander
- */
-public class ConsumerOffsetManagerTest extends BrokerTestHarness{
- // Commits offsets for three topics, then checks them after persist() + clear() and again after load().
- @Test
- public void testFlushConsumerOffset() throws Exception {
- ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
- for (int i = 0; i < 10; i++) {
- String group = "UNIT_TEST_GROUP_" + i;
- for (int id = 0; id < 10; id++) {
- consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, id + 100);
- consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, id + 100);
- consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, id + 100);
- }
- }
- consumerOffsetManager.persist();
- consumerOffsetManager.getOffsetTable().clear();
- for (int i = 0; i < 10; i++) { // with the in-memory table cleared, every query is expected to return -1
- String group = "UNIT_TEST_GROUP_" + i;
- for (int id = 0; id < 10; id++) {
- assertEquals(-1, consumerOffsetManager.queryOffset(group, "TOPIC_A", id));
- assertEquals(-1, consumerOffsetManager.queryOffset(group, "TOPIC_B", id));
- assertEquals(-1, consumerOffsetManager.queryOffset(group, "TOPIC_C", id));
- }
- }
- consumerOffsetManager.load();
- for (int i = 0; i < 10; i++) { // after load() every committed offset (id + 100) is expected back
- String group = "UNIT_TEST_GROUP_" + i;
- for (int id = 0; id < 10; id++) {
- assertEquals(id + 100, consumerOffsetManager.queryOffset(group, "TOPIC_A", id));
- assertEquals(id + 100, consumerOffsetManager.queryOffset(group, "TOPIC_B", id));
- assertEquals(id + 100, consumerOffsetManager.queryOffset(group, "TOPIC_C", id));
- }
- }
- }
-}