You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:54 UTC
[37/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java
new file mode 100644
index 0000000..ed10f1b
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.processor;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.common.TopicFilterType;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageAccessor;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.store.MessageExtBrokerInner;
+import com.alibaba.rocketmq.store.MessageStore;
+import com.alibaba.rocketmq.store.PutMessageResult;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Processes "end transaction" requests: looks up the prepared message referenced by the
+ * request, validates it against the request header and appends a follow-up message whose
+ * transaction flag is reset to the requested commit/rollback value.
+ *
+ * @author shijia.wxr
+ */
+public class EndTransactionProcessor implements NettyRequestProcessor {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+    private final BrokerController brokerController;
+
+    /**
+     * @param brokerController controller used to reach the message store
+     */
+    public EndTransactionProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    /**
+     * Ends a prepared transaction message according to the request header.
+     *
+     * @param ctx     channel context of the requesting producer (used for logging its address)
+     * @param request command carrying an {@link EndTransactionRequestHeader}
+     * @return {@code null} when the requested state is {@code TRANSACTION_NOT_TYPE} or is not a
+     *         recognised value; otherwise a response whose code reflects the validation and
+     *         store result
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final EndTransactionRequestHeader requestHeader =
+            (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
+
+        // Log the producer's decision. Only COMMIT and ROLLBACK continue past this point.
+        if (requestHeader.getFromTransactionCheck()) {
+            switch (requestHeader.getCommitOrRollback()) {
+                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
+                    LOGGER.warn("check producer[{}] transaction state, but it's pending status."
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
+                    return null;
+                }
+
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
+                    LOGGER.warn("check producer[{}] transaction state, the producer commit the message."
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
+
+                    break;
+                }
+
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
+                    LOGGER.warn("check producer[{}] transaction state, the producer rollback the message."
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
+                    break;
+                }
+                default:
+                    return null;
+            }
+        } else {
+            switch (requestHeader.getCommitOrRollback()) {
+                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
+                    LOGGER.warn("the producer[{}] end transaction in sending message, and it's pending status."
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
+                    return null;
+                }
+
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
+                    break;
+                }
+
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
+                    LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message."
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
+                    break;
+                }
+                default:
+                    return null;
+            }
+        }
+
+        final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
+        if (msgExt == null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("find prepared transaction message failed");
+            return response;
+        }
+
+        // A stored message without a producer-group property cannot match the request;
+        // report it as a mismatch instead of failing with a NullPointerException.
+        final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
+        if (pgroupRead == null || !pgroupRead.equals(requestHeader.getProducerGroup())) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("the producer group wrong");
+            return response;
+        }
+
+        if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("the transaction state table offset wrong");
+            return response;
+        }
+
+        if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("the commit log offset wrong");
+            return response;
+        }
+
+        MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
+        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
+
+        msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
+        msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
+        msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
+        // A rollback record is written without a body.
+        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
+            msgInner.setBody(null);
+        }
+
+        final MessageStore messageStore = this.brokerController.getMessageStore();
+        final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
+        if (putMessageResult == null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("store putMessage return null");
+            return response;
+        }
+
+        switch (putMessageResult.getPutMessageStatus()) {
+            // Success: flush/replication problems are still reported as SUCCESS here.
+            case PUT_OK:
+            case FLUSH_DISK_TIMEOUT:
+            case FLUSH_SLAVE_TIMEOUT:
+            case SLAVE_NOT_AVAILABLE:
+                response.setCode(ResponseCode.SUCCESS);
+                response.setRemark(null);
+                break;
+            // Failed
+            case CREATE_MAPEDFILE_FAILED:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("create maped file failed.");
+                break;
+            case MESSAGE_ILLEGAL:
+            case PROPERTIES_SIZE_EXCEEDED:
+                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+                response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
+                break;
+            case SERVICE_NOT_AVAILABLE:
+                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
+                response.setRemark("service not available now.");
+                break;
+            case OS_PAGECACHE_BUSY:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("OS page cache busy, please try another machine");
+                break;
+            case UNKNOWN_ERROR:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UNKNOWN_ERROR");
+                break;
+            default:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UNKNOWN_ERROR DEFAULT");
+                break;
+        }
+
+        return response;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Builds the broker-inner message that ends a transaction by copying body, flag,
+     * properties, sys flag, timestamps, hosts, topic and queue id from the prepared message.
+     * The delay-time-level property is cleared and waitStoreMsgOK is set to {@code false}.
+     *
+     * @param msgExt the prepared message read back from the store
+     * @return a new inner message; the caller still has to reset its transaction flag
+     */
+    private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setBody(msgExt.getBody());
+        msgInner.setFlag(msgExt.getFlag());
+        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+
+        // Read the flag from the stored message: msgInner's own sys flag is only
+        // assigned further down, so testing it here would never see MULTI_TAGS_FLAG.
+        TopicFilterType topicFilterType =
+            (msgExt.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
+                : TopicFilterType.SINGLE_TAG;
+        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
+        msgInner.setTagsCode(tagsCodeValue);
+        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+
+        msgInner.setSysFlag(msgExt.getSysFlag());
+        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
+        msgInner.setBornHost(msgExt.getBornHost());
+        msgInner.setStoreHost(msgExt.getStoreHost());
+        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+
+        msgInner.setWaitStoreMsgOK(false);
+        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+
+        msgInner.setTopic(msgExt.getTopic());
+        msgInner.setQueueId(msgExt.getQueueId());
+
+        return msgInner;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java
new file mode 100644
index 0000000..a92ead0
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.processor;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Placeholder {@link NettyRequestProcessor}: it never rejects a request and
+ * {@link #processRequest(ChannelHandlerContext, RemotingCommand)} always returns {@code null}.
+ *
+ * @author shijia.wxr
+ */
+public class ForwardRequestProcessor implements NettyRequestProcessor {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final BrokerController brokerController;
+
+    /**
+     * @param brokerController owning broker controller; stored but not used by this class yet
+     */
+    public ForwardRequestProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Does nothing with the request.
+     *
+     * @param ctx     channel context of the caller (unused)
+     * @param request incoming command (unused)
+     * @return always {@code null}
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
new file mode 100644
index 0000000..0152b93
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.processor;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
+import com.alibaba.rocketmq.broker.longpolling.PullRequest;
+import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
+import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
+import com.alibaba.rocketmq.broker.pagecache.ManyMessageTransfer;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.TopicFilterType;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.common.filter.FilterAPI;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.store.GetMessageResult;
+import com.alibaba.rocketmq.store.MessageExtBrokerInner;
+import com.alibaba.rocketmq.store.PutMessageResult;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
+import io.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+
+/**
+ * Processes consumer pull requests: checks broker/group/topic permissions and the
+ * subscription, reads messages from the store, and then either answers directly,
+ * writes the messages to the channel itself, or parks the request in the
+ * pull-request hold service when nothing was found and suspension is allowed.
+ *
+ * @author shijia.wxr
+ */
+public class PullMessageProcessor implements NettyRequestProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+    private List<ConsumeMessageHook> consumeMessageHookList;
+
+    /**
+     * @param brokerController controller giving access to configuration, store and managers
+     */
+    public PullMessageProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    /**
+     * Entry point for requests arriving from the network; suspension is allowed.
+     *
+     * @return the response to send, or {@code null} when the request was suspended or the
+     *         result was already written to the channel
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    @Override
+    public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        return this.processRequest(ctx.channel(), request, true);
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Core pull handling shared by the network entry point and the wake-up path.
+     *
+     * @param channel            channel of the pulling consumer
+     * @param request            pull request command
+     * @param brokerAllowSuspend whether a PULL_NOT_FOUND result may be suspended; also gates
+     *                           the offset commit at the end of this method
+     * @return the response to send, or {@code null} when the request was suspended or the
+     *         messages were already written to the channel
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
+        throws RemotingCommandException {
+        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+        final PullMessageRequestHeader requestHeader =
+            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+
+        response.setOpaque(request.getOpaque());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("receive PullMessage request command, " + request);
+        }
+
+        // --- Permission and existence checks -------------------------------------------
+        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
+            return response;
+        }
+
+        SubscriptionGroupConfig subscriptionGroupConfig =
+            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
+        if (null == subscriptionGroupConfig) {
+            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+            response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
+                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+            return response;
+        }
+
+        if (!subscriptionGroupConfig.isConsumeEnable()) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
+            return response;
+        }
+
+        final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
+        final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
+        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
+
+        final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
+
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+        if (null == topicConfig) {
+            LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
+            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+            response.setRemark(
+                "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+            return response;
+        }
+
+        if (!PermName.isReadable(topicConfig.getPerm())) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
+            return response;
+        }
+
+        if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
+            String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
+                + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
+            LOG.warn(errorInfo);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(errorInfo);
+            return response;
+        }
+
+        // --- Resolve the subscription: from the request itself or from the registered group ---
+        SubscriptionData subscriptionData = null;
+        if (hasSubscriptionFlag) {
+            try {
+                subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+                    requestHeader.getSubscription());
+            } catch (Exception e) {
+                LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
+                    requestHeader.getConsumerGroup());
+                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+                response.setRemark("parse the consumer's subscription failed");
+                return response;
+            }
+        } else {
+            ConsumerGroupInfo consumerGroupInfo =
+                this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+            if (null == consumerGroupInfo) {
+                LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
+                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+                response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+                return response;
+            }
+
+            if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
+                && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
+                response.setCode(ResponseCode.NO_PERMISSION);
+                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
+                return response;
+            }
+
+            subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
+            if (null == subscriptionData) {
+                LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
+                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+                response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+                return response;
+            }
+
+            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
+                LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
+                    subscriptionData.getSubString());
+                response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
+                response.setRemark("the consumer's subscription not latest");
+                return response;
+            }
+        }
+
+        // --- Read from the store and translate the result --------------------------------
+        final GetMessageResult getMessageResult =
+            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
+        if (getMessageResult != null) {
+            response.setRemark(getMessageResult.getStatus().name());
+            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
+            responseHeader.setMinOffset(getMessageResult.getMinOffset());
+            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
+
+            // NOTE(review): every branch of the isSlaveReadEnable() block further down assigns
+            // the suggested broker id again, so the value chosen here does not survive.
+            if (getMessageResult.isSuggestPullingFromSlave()) {
+                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
+            } else {
+                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+            }
+
+            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
+                case ASYNC_MASTER:
+                case SYNC_MASTER:
+                    break;
+                case SLAVE:
+                    // NOTE(review): the status switch below sets the response code for every
+                    // listed status, so this code appears to be overwritten - confirm intent.
+                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
+                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+                    }
+                    break;
+            }
+
+            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
+                // consume too slow ,redirect to another machine
+                if (getMessageResult.isSuggestPullingFromSlave()) {
+                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
+                }
+                // consume ok
+                else {
+                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+                }
+            } else {
+                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+            }
+
+            // Map the store status onto a pull response code.
+            switch (getMessageResult.getStatus()) {
+                case FOUND:
+                    response.setCode(ResponseCode.SUCCESS);
+                    break;
+                case MESSAGE_WAS_REMOVING:
+                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                    break;
+                case NO_MATCHED_LOGIC_QUEUE:
+                case NO_MESSAGE_IN_QUEUE:
+                    if (0 != requestHeader.getQueueOffset()) {
+                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+
+                        // XXX: warn and notify me
+                        LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
+                            requestHeader.getQueueOffset(),
+                            getMessageResult.getNextBeginOffset(),
+                            requestHeader.getTopic(),
+                            requestHeader.getQueueId(),
+                            requestHeader.getConsumerGroup()
+                        );
+                    } else {
+                        response.setCode(ResponseCode.PULL_NOT_FOUND);
+                    }
+                    break;
+                case NO_MATCHED_MESSAGE:
+                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                    break;
+                case OFFSET_FOUND_NULL:
+                    response.setCode(ResponseCode.PULL_NOT_FOUND);
+                    break;
+                case OFFSET_OVERFLOW_BADLY:
+                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+                    // XXX: warn and notify me
+                    LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
+                        + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
+                    break;
+                case OFFSET_OVERFLOW_ONE:
+                    response.setCode(ResponseCode.PULL_NOT_FOUND);
+                    break;
+                case OFFSET_TOO_SMALL:
+                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+                    LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
+                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
+                        getMessageResult.getMinOffset(), channel.remoteAddress());
+                    break;
+                default:
+                    assert false;
+                    break;
+            }
+
+            // Give registered hooks a context describing this pull before acting on it.
+            if (this.hasConsumeMessageHook()) {
+                ConsumeMessageContext context = new ConsumeMessageContext();
+                context.setConsumerGroup(requestHeader.getConsumerGroup());
+                context.setTopic(requestHeader.getTopic());
+                context.setQueueId(requestHeader.getQueueId());
+
+                String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+
+                switch (response.getCode()) {
+                    case ResponseCode.SUCCESS:
+                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
+                        context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial());
+                        context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
+                        context.setCommercialOwner(owner);
+                        break;
+                    case ResponseCode.PULL_NOT_FOUND:
+                        // Only filled in on the wake-up path (brokerAllowSuspend == false).
+                        if (!brokerAllowSuspend) {
+                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
+                            context.setCommercialRcvTimes(1);
+                            context.setCommercialOwner(owner);
+                        }
+                        break;
+                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                    case ResponseCode.PULL_OFFSET_MOVED:
+                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
+                        context.setCommercialRcvTimes(1);
+                        context.setCommercialOwner(owner);
+                        break;
+                    default:
+                        assert false;
+                        break;
+                }
+
+                this.executeConsumeMessageHookBefore(context);
+            }
+
+            // --- Act on the response code --------------------------------------------------
+            switch (response.getCode()) {
+                case ResponseCode.SUCCESS:
+
+                    this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+                        getMessageResult.getMessageCount());
+
+                    this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+                        getMessageResult.getBufferTotalSize());
+
+                    this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
+                        // Copy the messages into a heap array and return them as the response body.
+                        final long beginTimeMills = this.brokerController.getMessageStore().now();
+                        final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
+                        this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
+                            requestHeader.getTopic(), requestHeader.getQueueId(),
+                            (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
+                        response.setBody(r);
+                    } else {
+                        // Write header plus message buffers to the channel here; the result is
+                        // released when the write completes or when starting it fails.
+                        try {
+                            FileRegion fileRegion =
+                                new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
+                            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
+                                @Override
+                                public void operationComplete(ChannelFuture future) throws Exception {
+                                    getMessageResult.release();
+                                    if (!future.isSuccess()) {
+                                        LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
+                                    }
+                                }
+                            });
+                        } catch (Throwable e) {
+                            LOG.error("transfer many message by pagecache exception", e);
+                            getMessageResult.release();
+                        }
+
+                        // Already written above; nothing left for the caller to send.
+                        response = null;
+                    }
+                    break;
+                case ResponseCode.PULL_NOT_FOUND:
+
+                    if (brokerAllowSuspend && hasSuspendFlag) {
+                        long pollingTimeMills = suspendTimeoutMillisLong;
+                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
+                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
+                        }
+
+                        String topic = requestHeader.getTopic();
+                        long offset = requestHeader.getQueueOffset();
+                        int queueId = requestHeader.getQueueId();
+                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
+                            this.brokerController.getMessageStore().now(), offset, subscriptionData);
+                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
+                        response = null;
+                        break;
+                    }
+
+                    // fall through: not suspended, reply PULL_NOT_FOUND as-is
+                case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                    break;
+                case ResponseCode.PULL_OFFSET_MOVED:
+                    if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
+                        || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+                        MessageQueue mq = new MessageQueue();
+                        mq.setTopic(requestHeader.getTopic());
+                        mq.setQueueId(requestHeader.getQueueId());
+                        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+
+                        OffsetMovedEvent event = new OffsetMovedEvent();
+                        event.setConsumerGroup(requestHeader.getConsumerGroup());
+                        event.setMessageQueue(mq);
+                        event.setOffsetRequest(requestHeader.getQueueOffset());
+                        event.setOffsetNew(getMessageResult.getNextBeginOffset());
+                        this.generateOffsetMovedEvent(event);
+                        LOG.warn(
+                            "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
+                            responseHeader.getSuggestWhichBrokerId());
+                    } else {
+                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                        LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
+                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+                            responseHeader.getSuggestWhichBrokerId());
+                    }
+
+                    break;
+                default:
+                    assert false;
+            }
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("store getMessage return null");
+        }
+
+        // Commit the consumer's offset only on the first pass (not on wake-up), only when the
+        // request asked for it, and never on a slave.
+        boolean storeOffsetEnable = brokerAllowSuspend;
+        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
+        storeOffsetEnable = storeOffsetEnable
+            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
+        if (storeOffsetEnable) {
+            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
+                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
+        }
+        return response;
+    }
+
+    /**
+     * @return {@code true} when at least one consume-message hook is registered
+     */
+    public boolean hasConsumeMessageHook() {
+        return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
+    }
+
+    /**
+     * Invokes {@code consumeMessageBefore} on every registered hook. A failing hook is logged
+     * and skipped so that it can neither break the pull nor stop the remaining hooks.
+     *
+     * @param context description of the pull being processed
+     */
+    public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) {
+        if (hasConsumeMessageHook()) {
+            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+                try {
+                    hook.consumeMessageBefore(context);
+                } catch (Throwable e) {
+                    // Best effort by design, but leave a trace instead of swallowing silently.
+                    LOG.warn("consumeMessageBefore hook threw an exception, ignored", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies all message buffers of the result into one heap array, releases the result, and
+     * records how far the store timestamp of the last copied message lags behind the store's
+     * current time.
+     *
+     * @return the concatenated message bytes
+     */
+    private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, final int queueId) {
+        final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
+
+        long storeTimestamp = 0;
+        try {
+            List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
+            for (ByteBuffer bb : messageBufferList) {
+                byteBuffer.put(bb);
+                // Absolute read, so it is unaffected by the position advanced by put(bb).
+                storeTimestamp = bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+            }
+        } finally {
+            getMessageResult.release();
+        }
+
+        this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - storeTimestamp);
+        return byteBuffer.array();
+    }
+
+    /**
+     * Stores an offset-moved event as a message on the {@code MixAll.OFFSET_MOVED_EVENT} topic.
+     * Any exception is logged and otherwise ignored; the store result is not inspected.
+     *
+     * @param event the event to publish
+     */
+    private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
+        try {
+            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+            msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
+            msgInner.setTags(event.getConsumerGroup());
+            msgInner.setDelayTimeLevel(0);
+            msgInner.setKeys(event.getConsumerGroup());
+            msgInner.setBody(event.encode());
+            msgInner.setFlag(0);
+            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags()));
+
+            msgInner.setQueueId(0);
+            msgInner.setSysFlag(0);
+            msgInner.setBornTimestamp(System.currentTimeMillis());
+            msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
+            msgInner.setStoreHost(msgInner.getBornHost());
+
+            msgInner.setReconsumeTimes(0);
+
+            this.brokerController.getMessageStore().putMessage(msgInner);
+        } catch (Exception e) {
+            LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
+        }
+    }
+
+    /**
+     * Re-processes a previously suspended pull request on the pull-message executor with
+     * suspension disabled, and writes any resulting response to the channel.
+     *
+     * @param channel channel of the waiting consumer
+     * @param request the original pull request
+     * @throws RemotingCommandException declared for callers; decode failures raised inside the
+     *                                  submitted task are logged there instead
+     */
+    public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
+        Runnable run = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
+
+                    if (response != null) {
+                        response.setOpaque(request.getOpaque());
+                        response.markResponseType();
+                        try {
+                            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
+                                @Override
+                                public void operationComplete(ChannelFuture future) throws Exception {
+                                    if (!future.isSuccess()) {
+                                        LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed",
+                                            future.cause());
+                                        LOG.error(request.toString());
+                                        LOG.error(response.toString());
+                                    }
+                                }
+                            });
+                        } catch (Throwable e) {
+                            LOG.error("processRequestWrapper process request over, but response failed", e);
+                            LOG.error(request.toString());
+                            LOG.error(response.toString());
+                        }
+                    }
+                } catch (RemotingCommandException e1) {
+                    LOG.error("excuteRequestWhenWakeup run", e1);
+                }
+            }
+        };
+
+        this.brokerController.getPullMessageExecutor().submit(run);
+    }
+
+    /**
+     * Replaces the list of hooks invoked before a pull result is acted upon.
+     *
+     * @param sendMessageHookList the consume-message hooks to use (the list is stored as-is)
+     */
+    public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) {
+        this.consumeMessageHookList = sendMessageHookList;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java
new file mode 100644
index 0000000..738d11f
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.processor;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
+import com.alibaba.rocketmq.broker.pagecache.QueryMessageTransfer;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.QueryMessageResponseHeader;
+import com.alibaba.rocketmq.common.protocol.header.ViewMessageRequestHeader;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.store.QueryMessageResult;
+import com.alibaba.rocketmq.store.SelectMappedBufferResult;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class QueryMessageProcessor implements NettyRequestProcessor {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private final BrokerController brokerController;
+
+
+ /**
+ * Creates a processor that uses the given broker controller.
+ *
+ * @param brokerController controller stored in the {@code brokerController} field
+ */ public QueryMessageProcessor(final BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+
+ /**
+  * Dispatches a request to the matching handler based on its request code.
+  *
+  * @param ctx     channel context passed through to the handler
+  * @param request the incoming command
+  * @return the handler's result for {@code QUERY_MESSAGE} and {@code VIEW_MESSAGE_BY_ID};
+  *         {@code null} for any other request code
+  * @throws RemotingCommandException propagated from the selected handler
+  */
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
+         throws RemotingCommandException {
+     final int requestCode = request.getCode();
+
+     if (requestCode == RequestCode.QUERY_MESSAGE) {
+         return this.queryMessage(ctx, request);
+     }
+     if (requestCode == RequestCode.VIEW_MESSAGE_BY_ID) {
+         return this.viewMessageById(ctx, request);
+     }
+
+     // Request codes other than the two above are not handled by this processor.
+     return null;
+ }
+
+ /**
+ * Reports whether requests should be rejected; this implementation always answers no.
+ *
+ * @return always {@code false}
+ */ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+
+
+ /**
+  * Handles a message query by topic and key within a time range.
+  *
+  * <p>When the store returns data, the result is written directly to the channel as a
+  * {@link FileRegion} and this method returns {@code null}; the query result is released
+  * once the write completes or if starting the write throws. When the store returns no
+  * data, a {@code QUERY_NOT_FOUND} response is returned instead.
+  *
+  * @param ctx     channel context used to write the result
+  * @param request the query command; its custom header is decoded as {@code QueryMessageRequestHeader}
+  * @return {@code null} when the result was written to the channel here, otherwise the
+  *         not-found response
+  * @throws RemotingCommandException declared; presumably raised when the request header cannot be decoded
+  */
+ public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)
+         throws RemotingCommandException {
+     final RemotingCommand response =
+         RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
+     final QueryMessageResponseHeader responseHeader =
+         (QueryMessageResponseHeader) response.readCustomHeader();
+     final QueryMessageRequestHeader requestHeader =
+         (QueryMessageRequestHeader) request.decodeCommandCustomHeader(QueryMessageRequestHeader.class);
+
+     response.setOpaque(request.getOpaque());
+
+     // When the unique-message flag is "true", the store's default limit overrides the requested one.
+     final String uniqueKeyFlag = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
+     if ("true".equals(uniqueKeyFlag)) {
+         requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
+     }
+
+     final QueryMessageResult queryMessageResult = this.brokerController.getMessageStore().queryMessage(
+         requestHeader.getTopic(),
+         requestHeader.getKey(),
+         requestHeader.getMaxNum(),
+         requestHeader.getBeginTimestamp(),
+         requestHeader.getEndTimestamp());
+     assert queryMessageResult != null;
+
+     responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());
+     responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());
+
+     // Nothing matched: answer with a regular response.
+     if (queryMessageResult.getBufferTotalSize() <= 0) {
+         response.setCode(ResponseCode.QUERY_NOT_FOUND);
+         response.setRemark("can not find message, maybe time range not correct");
+         return response;
+     }
+
+     response.setCode(ResponseCode.SUCCESS);
+     response.setRemark(null);
+
+     try {
+         final FileRegion transfer = new QueryMessageTransfer(
+             response.encodeHeader(queryMessageResult.getBufferTotalSize()), queryMessageResult);
+         ctx.channel().writeAndFlush(transfer).addListener(new ChannelFutureListener() {
+             @Override
+             public void operationComplete(ChannelFuture future) throws Exception {
+                 // Release the result whether or not the write succeeded.
+                 queryMessageResult.release();
+                 if (!future.isSuccess()) {
+                     log.error("transfer query message by pagecache failed, ", future.cause());
+                 }
+             }
+         });
+     } catch (Throwable e) {
+         log.error("", e);
+         queryMessageResult.release();
+     }
+
+     // The reply has been handed to the channel above, so there is no command to return.
+     return null;
+ }
+
+
+ /**
+  * Handles a request to fetch one message by its offset.
+  *
+  * <p>When the store finds the message, it is written directly to the channel as a
+  * {@link FileRegion} and this method returns {@code null}; the selected buffer is released
+  * once the write completes or if starting the write throws. Otherwise a
+  * {@code SYSTEM_ERROR} response naming the offset is returned.
+  *
+  * @param ctx     channel context used to write the message
+  * @param request the command; its custom header is decoded as {@code ViewMessageRequestHeader}
+  * @return {@code null} when the message was written to the channel here, otherwise the
+  *         error response
+  * @throws RemotingCommandException declared; presumably raised when the request header cannot be decoded
+  */
+ public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
+         throws RemotingCommandException {
+     final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+     final ViewMessageRequestHeader requestHeader =
+         (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
+
+     response.setOpaque(request.getOpaque());
+
+     final SelectMappedBufferResult messageBuffer =
+         this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
+
+     // No message at that offset: answer with a regular error response.
+     if (messageBuffer == null) {
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
+         return response;
+     }
+
+     response.setCode(ResponseCode.SUCCESS);
+     response.setRemark(null);
+
+     try {
+         final FileRegion transfer =
+             new OneMessageTransfer(response.encodeHeader(messageBuffer.getSize()), messageBuffer);
+         ctx.channel().writeAndFlush(transfer).addListener(new ChannelFutureListener() {
+             @Override
+             public void operationComplete(ChannelFuture future) throws Exception {
+                 // Release the buffer whether or not the write succeeded.
+                 messageBuffer.release();
+                 if (!future.isSuccess()) {
+                     log.error("transfer one message by pagecache failed, ", future.cause());
+                 }
+             }
+         });
+     } catch (Throwable e) {
+         log.error("", e);
+         messageBuffer.release();
+     }
+
+     // The reply has been handed to the channel above, so there is no command to return.
+     return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
new file mode 100644
index 0000000..414b3f4
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.processor;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
+import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
+import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
+import com.alibaba.rocketmq.common.*;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.message.MessageAccessor;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.store.MessageExtBrokerInner;
+import com.alibaba.rocketmq.store.PutMessageResult;
+import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
+import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
+
+ private List<ConsumeMessageHook> consumeMessageHookList;
+
+ /**
+ * Creates a processor, passing the broker controller to the superclass constructor.
+ *
+ * @param brokerController controller handed to {@code AbstractSendMessageProcessor}
+ */ public SendMessageProcessor(final BrokerController brokerController) {
+ super(brokerController);
+ }
+
+ /**
+  * Entry point for requests routed to this processor.
+  *
+  * <p>{@code CONSUMER_SEND_MSG_BACK} requests go to {@code consumerSendMsgBack}. Every other
+  * request code is treated as a message send: the header is parsed, the send hooks are run
+  * before and after, and {@code sendMessage} does the actual work.
+  *
+  * @param ctx     channel context of the request
+  * @param request the incoming command
+  * @return the handler's response; {@code null} when the send header could not be parsed or
+  *         when {@code sendMessage} itself returned {@code null}
+  * @throws RemotingCommandException propagated from the handlers
+  */
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+     if (request.getCode() == RequestCode.CONSUMER_SEND_MSG_BACK) {
+         return this.consumerSendMsgBack(ctx, request);
+     }
+
+     final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
+     if (requestHeader == null) {
+         return null;
+     }
+
+     final SendMessageContext traceContext = buildMsgContext(ctx, requestHeader);
+     this.executeSendMessageHookBefore(ctx, request, traceContext);
+     final RemotingCommand response = this.sendMessage(ctx, request, traceContext, requestHeader);
+     this.executeSendMessageHookAfter(response, traceContext);
+     return response;
+ }
+
+ /**
+  * Reports whether incoming requests should be rejected right now.
+  *
+  * @return {@code true} when the message store reports a busy OS page cache or a deficient
+  *         transient store pool, {@code false} otherwise
+  */
+ @Override
+ public boolean rejectRequest() {
+     if (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
+         return true;
+     }
+     return this.brokerController.getMessageStore().isTransientStorePoolDeficient();
+ }
+
+ /**
+ * Handles a consumer's request to send a message back to the broker for re-delivery.
+ *
+ * <p>The original message is looked up in the store by the offset carried in the request
+ * header and a copy is stored under the consumer group's retry topic, or under the group's
+ * DLQ topic once the reconsume limit is reached or the requested delay level is negative.
+ *
+ * @param ctx channel context of the request (not read in the visible body)
+ * @param request the command; its custom header is decoded as {@code ConsumerSendMsgBackRequestHeader}
+ * @return a response whose code is one of {@code SUCCESS}, {@code SUBSCRIPTION_GROUP_NOT_EXIST},
+ * {@code NO_PERMISSION} or {@code SYSTEM_ERROR}
+ * @throws RemotingCommandException declared; presumably raised when the request header cannot be decoded
+ */ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
+ throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final ConsumerSendMsgBackRequestHeader requestHeader =
+ (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+ // Notify the consume hooks when any are registered and the request names an origin message id.
+ if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
+
+ ConsumeMessageContext context = new ConsumeMessageContext();
+ context.setConsumerGroup(requestHeader.getGroup());
+ context.setTopic(requestHeader.getOriginTopic());
+ context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
+ context.setCommercialRcvTimes(1);
+ context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
+
+ this.executeConsumeMessageHookAfter(context);
+ }
+
+ // The consumer group must have a subscription group config.
+ SubscriptionGroupConfig subscriptionGroupConfig =
+ this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
+ if (null == subscriptionGroupConfig) {
+ response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+ response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+ return response;
+ }
+
+ // The broker itself must be writeable.
+ if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
+ return response;
+ }
+
+ // With no retry queues configured, report SUCCESS without storing anything.
+ if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+ // Target the group's retry topic and pick one of its queues pseudo-randomly.
+ String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
+ int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
+
+ // Carry the request's unit mode into the topic sys flag.
+ int topicSysFlag = 0;
+ if (requestHeader.isUnitMode()) {
+ topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
+ }
+
+ // Obtain the retry topic's config (judging by the method name it is created if absent — TODO confirm).
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
+ newTopic, //
+ subscriptionGroupConfig.getRetryQueueNums(), //
+ PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
+ if (null == topicConfig) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("topic[" + newTopic + "] not exist");
+ return response;
+ }
+
+ // The retry topic must be writeable.
+ if (!PermName.isWriteable(topicConfig.getPerm())) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
+ return response;
+ }
+ // Load the original message from the store by the offset given in the request.
+ MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
+ if (null == msgExt) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("look message by offset failed, " + requestHeader.getOffset());
+ return response;
+ }
+
+ // Record the message's topic under PROPERTY_RETRY_TOPIC unless that property is already set.
+ final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
+ if (null == retryTopic) {
+ MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
+ }
+ msgExt.setWaitStoreMsgOK(false);
+
+
+ int delayLevel = requestHeader.getDelayLevel();
+
+ // Requests at or above version V3_4_9 supply their own reconsume limit; older ones use the group config.
+ int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
+ if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
+ maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
+ }
+
+ // Limit reached or negative delay level: redirect to the group's DLQ topic; otherwise set the delay level.
+ if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
+ || delayLevel < 0) {
+ newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
+ queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
+
+ topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
+ DLQ_NUMS_PER_GROUP, //
+ PermName.PERM_WRITE, 0
+ );
+ if (null == topicConfig) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("topic[" + newTopic + "] not exist");
+ return response;
+ }
+ } else {
+ if (0 == delayLevel) {
+ delayLevel = 3 + msgExt.getReconsumeTimes();
+ }
+
+ msgExt.setDelayTimeLevel(delayLevel);
+ }
+ // Build the copy to store: same body, flag and properties, new topic/queue, reconsume count + 1.
+ MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+ msgInner.setTopic(newTopic);
+ msgInner.setBody(msgExt.getBody());
+ msgInner.setFlag(msgExt.getFlag());
+ MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+ msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
+
+ msgInner.setQueueId(queueIdInt);
+ msgInner.setSysFlag(msgExt.getSysFlag());
+ msgInner.setBornTimestamp(msgExt.getBornTimestamp());
+ msgInner.setBornHost(msgExt.getBornHost());
+ msgInner.setStoreHost(this.getStoreHost());
+ msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
+ // Keep an existing origin message id; otherwise use the id of the message being sent back.
+ String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
+ MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
+ // Store the copy; only PUT_OK counts as success here.
+ PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+ if (putMessageResult != null) {
+ switch (putMessageResult.getPutMessageStatus()) {
+ case PUT_OK:
+ String backTopic = msgExt.getTopic();
+ String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
+ if (correctTopic != null) {
+ backTopic = correctTopic;
+ }
+
+ this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+
+ return response;
+ default:
+ break;
+ }
+ // Any other put status is reported as SYSTEM_ERROR with the status name as the remark.
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(putMessageResult.getPutMessageStatus().name());
+ return response;
+ }
+
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("putMessageResult is null");
+ return response;
+ }
+
+ /**
+ * Stores a message sent by a producer and prepares the reply.
+ *
+ * <p>When the store reports one of the statuses treated as success here ({@code PUT_OK},
+ * {@code FLUSH_DISK_TIMEOUT}, {@code FLUSH_SLAVE_TIMEOUT}, {@code SLAVE_NOT_AVAILABLE}), the
+ * response is passed to {@code doResponse} and this method returns {@code null}. In every
+ * other case the populated response is returned to the caller.
+ *
+ * @param ctx channel context of the request; its remote address becomes the message's born host
+ * @param request the send command; its body becomes the message body
+ * @param sendMessageContext trace context that is filled in when send hooks are registered
+ * @param requestHeader the already-parsed send header
+ * @return {@code null} after a successful store (the response went through {@code doResponse}),
+ * otherwise the response describing the outcome
+ * @throws RemotingCommandException declared; no statement in the visible body throws it directly
+ */ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
+ final RemotingCommand request, //
+ final SendMessageContext sendMessageContext, //
+ final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+
+ // Copy the request's opaque value onto the response.
+ response.setOpaque(request.getOpaque());
+ // Attach the broker's region id and trace switch as ext fields.
+ response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
+ response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+
+ if (log.isDebugEnabled()) {
+ log.debug("receive SendMessage request command, " + request);
+ }
+ // Refuse sends while the store's clock is before the configured start-accept timestamp.
+ final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
+ if (this.brokerController.getMessageStore().now() < startTimstamp) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
+ return response;
+ }
+ // -1 is a sentinel: if msgCheck changes the code, its response is returned unchanged.
+ response.setCode(-1);
+ super.msgCheck(ctx, requestHeader, response);
+ if (response.getCode() != -1) {
+ return response;
+ }
+
+ final byte[] body = request.getBody();
+
+ int queueIdInt = requestHeader.getQueueId();
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+ // A negative queue id in the header: pick one of the topic's write queues pseudo-randomly.
+ if (queueIdInt < 0) {
+ queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+ }
+
+ int sysFlag = requestHeader.getSysFlag();
+ // Topics whose filter type is MULTI_TAG get the multi-tags bit set in the sys flag.
+ if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
+ sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+ }
+ // A send to a retry topic (RETRY_GROUP_TOPIC_PREFIX + group) is redirected to the group's DLQ once the reconsume limit is reached.
+ String newTopic = requestHeader.getTopic();
+ if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ SubscriptionGroupConfig subscriptionGroupConfig =
+ this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
+ if (null == subscriptionGroupConfig) {
+ response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+ response.setRemark(
+ "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+ return response;
+ }
+
+ // Requests at or above version V3_4_9 supply their own reconsume limit; older ones use the group config.
+ int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
+ if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
+ maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
+ }
+ int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
+ if (reconsumeTimes >= maxReconsumeTimes) {
+ newTopic = MixAll.getDLQTopic(groupName);
+ queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
+ topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
+ DLQ_NUMS_PER_GROUP, //
+ PermName.PERM_WRITE, 0
+ );
+ if (null == topicConfig) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("topic[" + newTopic + "] not exist");
+ return response;
+ }
+ }
+ }
+ MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+ msgInner.setTopic(newTopic);
+ msgInner.setBody(body);
+ msgInner.setFlag(requestHeader.getFlag());
+ MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+ msgInner.setPropertiesString(requestHeader.getProperties());
+ msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
+
+ msgInner.setQueueId(queueIdInt);
+ msgInner.setSysFlag(sysFlag);
+ msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
+ msgInner.setBornHost(ctx.channel().remoteAddress());
+ msgInner.setStoreHost(this.getStoreHost());
+ msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+ // When configured to, reject messages that carry the transaction-prepared property.
+ if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
+ String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+ if (traFlag != null) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark(
+ "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
+ return response;
+ }
+ }
+ // Store the message and translate the put status into a response code.
+ PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+ if (putMessageResult != null) {
+ boolean sendOK = false;
+
+ switch (putMessageResult.getPutMessageStatus()) {
+ // Success
+ case PUT_OK:
+ sendOK = true;
+ response.setCode(ResponseCode.SUCCESS);
+ break;
+ case FLUSH_DISK_TIMEOUT:
+ response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
+ sendOK = true;
+ break;
+ case FLUSH_SLAVE_TIMEOUT:
+ response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
+ sendOK = true;
+ break;
+ case SLAVE_NOT_AVAILABLE:
+ response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
+ sendOK = true;
+ break;
+
+ // Failed
+ case CREATE_MAPEDFILE_FAILED:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("create mapped file failed, server is busy or broken.");
+ break;
+ case MESSAGE_ILLEGAL:
+ case PROPERTIES_SIZE_EXCEEDED:
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(
+ "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
+ break;
+ case SERVICE_NOT_AVAILABLE:
+ response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
+ response.setRemark(
+ "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
+ break;
+ case OS_PAGECACHE_BUSY:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
+ break;
+ case UNKNOWN_ERROR:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("UNKNOWN_ERROR");
+ break;
+ default:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("UNKNOWN_ERROR DEFAULT");
+ break;
+ }
+
+ String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+ if (sendOK) {
+ // Update the topic and broker put statistics.
+ this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
+ this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
+ putMessageResult.getAppendMessageResult().getWroteBytes());
+ this.brokerController.getBrokerStatsManager().incBrokerPutNums();
+
+ response.setRemark(null);
+ // Report the message id, queue id and queue offset of the stored message.
+ responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
+ responseHeader.setQueueId(queueIdInt);
+ responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+
+ // Hand the response to doResponse here; null is returned below in its place.
+ doResponse(ctx, request, response);
+
+ // Fill in the trace context when send hooks are registered.
+ if (hasSendMessageHook()) {
+ sendMessageContext.setMsgId(responseHeader.getMsgId());
+ sendMessageContext.setQueueId(responseHeader.getQueueId());
+ sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+
+ int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+
+ sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
+ sendMessageContext.setCommercialSendTimes(incValue);
+ sendMessageContext.setCommercialSendSize(wroteSize);
+ sendMessageContext.setCommercialOwner(owner);
+ }
+ return null;
+ } else {
+ if (hasSendMessageHook()) {
+ int wroteSize = request.getBody().length;
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+
+ sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
+ sendMessageContext.setCommercialSendTimes(incValue);
+ sendMessageContext.setCommercialSendSize(wroteSize);
+ sendMessageContext.setCommercialOwner(owner);
+ }
+ }
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("store putMessage return null");
+ }
+
+ return response;
+ }
+
+ /**
+  * Tells whether at least one consume-message hook is registered.
+  *
+  * @return {@code true} when the hook list is neither {@code null} nor empty
+  */
+ public boolean hasConsumeMessageHook() {
+     if (consumeMessageHookList == null) {
+         return false;
+     }
+     return !this.consumeMessageHookList.isEmpty();
+ }
+
+ /**
+  * Invokes {@code consumeMessageAfter} on every registered consume-message hook.
+  *
+  * <p>Hooks are called best-effort: a hook that throws does not stop the remaining hooks and
+  * nothing propagates to the caller. The failure is logged rather than silently dropped, so
+  * a broken hook can be diagnosed.
+  *
+  * @param context the consume context passed to each hook
+  */
+ public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {
+     if (hasConsumeMessageHook()) {
+         for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+             try {
+                 hook.consumeMessageAfter(context);
+             } catch (Throwable e) {
+                 // Deliberately not rethrown: hook errors must not affect request handling.
+                 log.warn("executeConsumeMessageHookAfter, hook threw an exception", e);
+             }
+         }
+     }
+ }
+
+ /**
+ * Returns the {@code storeHost} field, which is declared outside the visible part of this
+ * class (presumably inherited from {@code AbstractSendMessageProcessor} — TODO confirm).
+ *
+ * @return the store host address
+ */ public SocketAddress getStoreHost() {
+ return storeHost;
+ }
+
+ private String diskUtil() {
+ String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
+ double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
+
+ String storePathLogis =
+ StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+ double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);
+
+ String storePathIndex =
+ StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+ double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);
+
+ return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
+ }
+
+ /**
+ * Replaces this processor's consume-message hook list with the given list.
+ *
+ * @param consumeMessageHookList the hooks later invoked by {@code executeConsumeMessageHookAfter}
+ */ public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
+ this.consumeMessageHookList = consumeMessageHookList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java
new file mode 100644
index 0000000..8b3aefe
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.slave;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
+import com.alibaba.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+/**
+ * Pulls broker metadata from a master broker and applies it to the local broker.
+ * <p>
+ * {@link #syncAll()} copies, in order, the topic configuration, the consumer offsets, the delay
+ * offsets and the subscription group configuration from the master whose address was supplied
+ * through {@link #setMasterAddr(String)}. Each step is a no-op while no master address is set,
+ * and each step catches and logs its own exceptions, so a failing step does not prevent the
+ * following ones from running.
+ *
+ * @author shijia.wxr
+ * @author manhong.yqd
+ */
+public class SlaveSynchronize {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+    /** Address of the master to pull from; while {@code null}, every sync step is skipped. */
+    private volatile String masterAddr = null;
+
+
+    /**
+     * @param brokerController the local broker whose managers are updated by the sync steps
+     */
+    public SlaveSynchronize(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+
+    /**
+     * @return the master address currently used for synchronization, or {@code null} if none is set
+     */
+    public String getMasterAddr() {
+        return masterAddr;
+    }
+
+
+    /**
+     * @param masterAddr the master address to synchronize from; {@code null} disables synchronization
+     */
+    public void setMasterAddr(String masterAddr) {
+        this.masterAddr = masterAddr;
+    }
+
+
+    /**
+     * Runs every synchronization step once: topic config, consumer offsets, delay offsets and
+     * subscription group config, in that order.
+     */
+    public void syncAll() {
+        this.syncTopicConfig();
+        this.syncConsumerOffset();
+        this.syncDelayOffset();
+        this.syncSubscriptionGroupConfig();
+    }
+
+
+    /**
+     * Replaces the local topic config table with the master's one when the data versions differ,
+     * then persists it.
+     */
+    private void syncTopicConfig() {
+        // Read the volatile field once so the whole step uses a single address.
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak == null) {
+            return;
+        }
+
+        try {
+            TopicConfigSerializeWrapper topicWrapper =
+                this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
+            if (!this.brokerController.getTopicConfigManager().getDataVersion()
+                .equals(topicWrapper.getDataVersion())) {
+
+                // Remove only the topics the master no longer has, then add/overwrite the rest.
+                // Unlike clear() + putAll(), this never exposes an empty table to other threads
+                // that read it while the sync is in progress.
+                this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()
+                    .retainAll(topicWrapper.getTopicConfigTable().keySet());
+                this.brokerController.getTopicConfigManager().getTopicConfigTable()
+                    .putAll(topicWrapper.getTopicConfigTable());
+
+                // Adopt the master's version only after the table was replaced: if the update
+                // above throws, the versions still differ and the next pass retries.
+                this.brokerController.getTopicConfigManager().getDataVersion()
+                    .assignNewOne(topicWrapper.getDataVersion());
+                this.brokerController.getTopicConfigManager().persist();
+
+                log.info("update slave topic config from master, {}", masterAddrBak);
+            }
+        } catch (Exception e) {
+            log.error("syncTopicConfig Exception, " + masterAddrBak, e);
+        }
+    }
+
+
+    /**
+     * Merges the master's consumer offset table into the local one and persists it.
+     * <p>
+     * Entries are added or overwritten with {@code putAll}; local entries absent from the
+     * master's table are left in place.
+     */
+    private void syncConsumerOffset() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak == null) {
+            return;
+        }
+
+        try {
+            ConsumerOffsetSerializeWrapper offsetWrapper =
+                this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
+            this.brokerController.getConsumerOffsetManager().getOffsetTable()
+                .putAll(offsetWrapper.getOffsetTable());
+            this.brokerController.getConsumerOffsetManager().persist();
+            log.info("update slave consumer offset from master, {}", masterAddrBak);
+        } catch (Exception e) {
+            log.error("syncConsumerOffset Exception, " + masterAddrBak, e);
+        }
+    }
+
+
+    /**
+     * Fetches the delay offset content from the master and writes it to the local delay offset
+     * file. Nothing is written when the master returns {@code null}.
+     */
+    private void syncDelayOffset() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak == null) {
+            return;
+        }
+
+        try {
+            String delayOffset =
+                this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
+            if (delayOffset != null) {
+                String fileName =
+                    StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
+                        .getMessageStoreConfig().getStorePathRootDir());
+                try {
+                    MixAll.string2File(delayOffset, fileName);
+                    // Report success only once the file has actually been written.
+                    log.info("update slave delay offset from master, {}", masterAddrBak);
+                } catch (IOException e) {
+                    log.error("persist file Exception, " + fileName, e);
+                }
+            }
+        } catch (Exception e) {
+            log.error("syncDelayOffset Exception, " + masterAddrBak, e);
+        }
+    }
+
+
+    /**
+     * Replaces the local subscription group table with the master's one when the data versions
+     * differ, then persists it.
+     */
+    private void syncSubscriptionGroupConfig() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak == null) {
+            return;
+        }
+
+        try {
+            SubscriptionGroupWrapper subscriptionWrapper =
+                this.brokerController.getBrokerOuterAPI()
+                    .getAllSubscriptionGroupConfig(masterAddrBak);
+
+            SubscriptionGroupManager subscriptionGroupManager =
+                this.brokerController.getSubscriptionGroupManager();
+            if (!subscriptionGroupManager.getDataVersion()
+                .equals(subscriptionWrapper.getDataVersion())) {
+                // Same approach as for the topic table: drop stale groups, then add/overwrite,
+                // so readers never observe an empty table during the sync.
+                subscriptionGroupManager.getSubscriptionGroupTable().keySet().retainAll(
+                    subscriptionWrapper.getSubscriptionGroupTable().keySet());
+                subscriptionGroupManager.getSubscriptionGroupTable().putAll(
+                    subscriptionWrapper.getSubscriptionGroupTable());
+
+                // Version is adopted last so that a failed table update is retried next pass.
+                subscriptionGroupManager.getDataVersion().assignNewOne(
+                    subscriptionWrapper.getDataVersion());
+                subscriptionGroupManager.persist();
+                log.info("update slave Subscription Group from master, {}", masterAddrBak);
+            }
+        } catch (Exception e) {
+            log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e);
+        }
+    }
+}