You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:42 UTC
[46/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java
deleted file mode 100644
index ed10f1b..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.common.TopicFilterType;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.message.MessageAccessor;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.MessageExtBrokerInner;
-import com.alibaba.rocketmq.store.MessageStore;
-import com.alibaba.rocketmq.store.PutMessageResult;
-import io.netty.channel.ChannelHandlerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * @author shijia.wxr
- */
-public class EndTransactionProcessor implements NettyRequestProcessor {
- private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
- private final BrokerController brokerController;
-
- public EndTransactionProcessor(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
- @Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- final EndTransactionRequestHeader requestHeader =
- (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
-
-
- if (requestHeader.getFromTransactionCheck()) {
- switch (requestHeader.getCommitOrRollback()) {
- case MessageSysFlag.TRANSACTION_NOT_TYPE: {
- LOGGER.warn("check producer[{}] transaction state, but it's pending status."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
- return null;
- }
-
- case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
- LOGGER.warn("check producer[{}] transaction state, the producer commit the message."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
-
- break;
- }
-
- case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
- LOGGER.warn("check producer[{}] transaction state, the producer rollback the message."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
- break;
- }
- default:
- return null;
- }
- } else {
- switch (requestHeader.getCommitOrRollback()) {
- case MessageSysFlag.TRANSACTION_NOT_TYPE: {
- LOGGER.warn("the producer[{}] end transaction in sending message, and it's pending status."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
- return null;
- }
-
- case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
- break;
- }
-
- case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
- LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
- break;
- }
- default:
- return null;
- }
- }
-
- final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
- if (msgExt != null) {
- final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
- if (!pgroupRead.equals(requestHeader.getProducerGroup())) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("the producer group wrong");
- return response;
- }
-
- if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("the transaction state table offset wrong");
- return response;
- }
-
- if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("the commit log offset wrong");
- return response;
- }
-
- MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
- msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
-
- msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
- msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
- msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
- if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
- msgInner.setBody(null);
- }
-
- final MessageStore messageStore = this.brokerController.getMessageStore();
- final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
- if (putMessageResult != null) {
- switch (putMessageResult.getPutMessageStatus()) {
- // Success
- case PUT_OK:
- case FLUSH_DISK_TIMEOUT:
- case FLUSH_SLAVE_TIMEOUT:
- case SLAVE_NOT_AVAILABLE:
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
- break;
- // Failed
- case CREATE_MAPEDFILE_FAILED:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("create maped file failed.");
- break;
- case MESSAGE_ILLEGAL:
- case PROPERTIES_SIZE_EXCEEDED:
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
- break;
- case SERVICE_NOT_AVAILABLE:
- response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
- response.setRemark("service not available now.");
- break;
- case OS_PAGECACHE_BUSY:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("OS page cache busy, please try another machine");
- break;
- case UNKNOWN_ERROR:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("UNKNOWN_ERROR");
- break;
- default:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("UNKNOWN_ERROR DEFAULT");
- break;
- }
-
- return response;
- } else {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("store putMessage return null");
- }
- } else {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("find prepared transaction message failed");
- return response;
- }
-
- return response;
- }
-
- @Override
- public boolean rejectRequest() {
- return false;
- }
-
- private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- msgInner.setBody(msgExt.getBody());
- msgInner.setFlag(msgExt.getFlag());
- MessageAccessor.setProperties(msgInner, msgExt.getProperties());
-
- TopicFilterType topicFilterType =
- (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
- : TopicFilterType.SINGLE_TAG;
- long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
- msgInner.setTagsCode(tagsCodeValue);
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
-
- msgInner.setSysFlag(msgExt.getSysFlag());
- msgInner.setBornTimestamp(msgExt.getBornTimestamp());
- msgInner.setBornHost(msgExt.getBornHost());
- msgInner.setStoreHost(msgExt.getStoreHost());
- msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
-
- msgInner.setWaitStoreMsgOK(false);
- MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
-
- msgInner.setTopic(msgExt.getTopic());
- msgInner.setQueueId(msgExt.getQueueId());
-
- return msgInner;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java
deleted file mode 100644
index a92ead0..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * @author shijia.wxr
- */
-public class ForwardRequestProcessor implements NettyRequestProcessor {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
- private final BrokerController brokerController;
-
-
- public ForwardRequestProcessor(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
-
- @Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
- return null;
- }
-
- @Override
- public boolean rejectRequest() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
deleted file mode 100644
index 1257f18..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
-import com.alibaba.rocketmq.broker.longpolling.PullRequest;
-import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
-import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
-import com.alibaba.rocketmq.broker.pagecache.ManyMessageTransfer;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.TopicFilterType;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.constant.PermName;
-import com.alibaba.rocketmq.common.filter.FilterAPI;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
-import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
-import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.GetMessageResult;
-import com.alibaba.rocketmq.store.MessageExtBrokerInner;
-import com.alibaba.rocketmq.store.PutMessageResult;
-import com.alibaba.rocketmq.store.config.BrokerRole;
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-import io.netty.channel.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullMessageProcessor implements NettyRequestProcessor {
- private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final BrokerController brokerController;
- private List<ConsumeMessageHook> consumeMessageHookList;
-
- public PullMessageProcessor(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
- @Override
- public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
- return this.processRequest(ctx.channel(), request, true);
- }
-
- @Override
- public boolean rejectRequest() {
- return false;
- }
-
- private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
- throws RemotingCommandException {
- RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
- final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
- final PullMessageRequestHeader requestHeader =
- (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-
-
- response.setOpaque(request.getOpaque());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("receive PullMessage request command, " + request);
- }
-
-
- if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
- return response;
- }
-
-
- SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
- if (null == subscriptionGroupConfig) {
- response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
- response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
- + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
- return response;
- }
-
-
- if (!subscriptionGroupConfig.isConsumeEnable()) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
- return response;
- }
-
- final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
- final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
- final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
-
- final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
-
-
- TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
- if (null == topicConfig) {
- LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
- response.setCode(ResponseCode.TOPIC_NOT_EXIST);
- response.setRemark(
- "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
- return response;
- }
-
-
- if (!PermName.isReadable(topicConfig.getPerm())) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
- return response;
- }
-
-
- if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
- String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
- + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
- LOG.warn(errorInfo);
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(errorInfo);
- return response;
- }
-
-
- SubscriptionData subscriptionData = null;
- if (hasSubscriptionFlag) {
- try {
- subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- requestHeader.getSubscription());
- } catch (Exception e) {
- LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
- requestHeader.getConsumerGroup());
- response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
- response.setRemark("parse the consumer's subscription failed");
- return response;
- }
- } else {
- ConsumerGroupInfo consumerGroupInfo =
- this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
- if (null == consumerGroupInfo) {
- LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
- response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
- response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
- return response;
- }
-
- if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
- && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
- return response;
- }
-
- subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
- if (null == subscriptionData) {
- LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
- response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
- response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
- return response;
- }
-
-
- if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
- LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
- subscriptionData.getSubString());
- response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
- response.setRemark("the consumer's subscription not latest");
- return response;
- }
- }
-
- final GetMessageResult getMessageResult =
- this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
- if (getMessageResult != null) {
- response.setRemark(getMessageResult.getStatus().name());
- responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
- responseHeader.setMinOffset(getMessageResult.getMinOffset());
- responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
-
-
- if (getMessageResult.isSuggestPullingFromSlave()) {
- responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
- } else {
- responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
- }
-
- switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
- case ASYNC_MASTER:
- case SYNC_MASTER:
- break;
- case SLAVE:
- if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
- response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
- responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
- }
- break;
- }
-
- if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
- // consume too slow ,redirect to another machine
- if (getMessageResult.isSuggestPullingFromSlave()) {
- responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
- }
- // consume ok
- else {
- responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
- }
- } else {
- responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
- }
-
- switch (getMessageResult.getStatus()) {
- case FOUND:
- response.setCode(ResponseCode.SUCCESS);
- break;
- case MESSAGE_WAS_REMOVING:
- response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
- break;
- case NO_MATCHED_LOGIC_QUEUE:
- case NO_MESSAGE_IN_QUEUE:
- if (0 != requestHeader.getQueueOffset()) {
- response.setCode(ResponseCode.PULL_OFFSET_MOVED);
-
- // XXX: warn and notify me
- LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
- requestHeader.getQueueOffset(), //
- getMessageResult.getNextBeginOffset(), //
- requestHeader.getTopic(), //
- requestHeader.getQueueId(), //
- requestHeader.getConsumerGroup()//
- );
- } else {
- response.setCode(ResponseCode.PULL_NOT_FOUND);
- }
- break;
- case NO_MATCHED_MESSAGE:
- response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
- break;
- case OFFSET_FOUND_NULL:
- response.setCode(ResponseCode.PULL_NOT_FOUND);
- break;
- case OFFSET_OVERFLOW_BADLY:
- response.setCode(ResponseCode.PULL_OFFSET_MOVED);
- // XXX: warn and notify me
- LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
- + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
- break;
- case OFFSET_OVERFLOW_ONE:
- response.setCode(ResponseCode.PULL_NOT_FOUND);
- break;
- case OFFSET_TOO_SMALL:
- response.setCode(ResponseCode.PULL_OFFSET_MOVED);
- LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
- requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
- getMessageResult.getMinOffset(), channel.remoteAddress());
- break;
- default:
- assert false;
- break;
- }
-
- if (this.hasConsumeMessageHook()) {
- ConsumeMessageContext context = new ConsumeMessageContext();
- context.setConsumerGroup(requestHeader.getConsumerGroup());
- context.setTopic(requestHeader.getTopic());
- context.setQueueId(requestHeader.getQueueId());
-
- String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
-
- switch (response.getCode()) {
- case ResponseCode.SUCCESS:
- int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
- int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
-
- context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
- context.setCommercialRcvTimes(incValue);
- context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
- context.setCommercialOwner(owner);
-
- break;
- case ResponseCode.PULL_NOT_FOUND:
- if (!brokerAllowSuspend) {
-
-
- context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
- context.setCommercialRcvTimes(1);
- context.setCommercialOwner(owner);
-
- }
- break;
- case ResponseCode.PULL_RETRY_IMMEDIATELY:
- case ResponseCode.PULL_OFFSET_MOVED:
- context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
- context.setCommercialRcvTimes(1);
- context.setCommercialOwner(owner);
- break;
- default:
- assert false;
- break;
- }
-
- this.executeConsumeMessageHookBefore(context);
- }
-
- switch (response.getCode()) {
- case ResponseCode.SUCCESS:
-
- this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- getMessageResult.getMessageCount());
-
- this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- getMessageResult.getBufferTotalSize());
-
- this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
- if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
- final long beginTimeMills = this.brokerController.getMessageStore().now();
- final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
- this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
- requestHeader.getTopic(), requestHeader.getQueueId(),
- (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
- response.setBody(r);
- } else {
- try {
- FileRegion fileRegion =
- new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
- channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- getMessageResult.release();
- if (!future.isSuccess()) {
- LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
- }
- }
- });
- } catch (Throwable e) {
- LOG.error("transfer many message by pagecache exception", e);
- getMessageResult.release();
- }
-
- response = null;
- }
- break;
- case ResponseCode.PULL_NOT_FOUND:
-
- if (brokerAllowSuspend && hasSuspendFlag) {
- long pollingTimeMills = suspendTimeoutMillisLong;
- if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
- pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
- }
-
- String topic = requestHeader.getTopic();
- long offset = requestHeader.getQueueOffset();
- int queueId = requestHeader.getQueueId();
- PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
- this.brokerController.getMessageStore().now(), offset, subscriptionData);
- this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
- response = null;
- break;
- }
-
-
- case ResponseCode.PULL_RETRY_IMMEDIATELY:
- break;
- case ResponseCode.PULL_OFFSET_MOVED:
- if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
- || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
- MessageQueue mq = new MessageQueue();
- mq.setTopic(requestHeader.getTopic());
- mq.setQueueId(requestHeader.getQueueId());
- mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
-
- OffsetMovedEvent event = new OffsetMovedEvent();
- event.setConsumerGroup(requestHeader.getConsumerGroup());
- event.setMessageQueue(mq);
- event.setOffsetRequest(requestHeader.getQueueOffset());
- event.setOffsetNew(getMessageResult.getNextBeginOffset());
- this.generateOffsetMovedEvent(event);
- LOG.warn(
- "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
- responseHeader.getSuggestWhichBrokerId());
- } else {
- responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
- response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
- LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
- responseHeader.getSuggestWhichBrokerId());
- }
-
- break;
- default:
- assert false;
- }
- } else {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("store getMessage return null");
- }
-
-
- boolean storeOffsetEnable = brokerAllowSuspend;
- storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
- storeOffsetEnable = storeOffsetEnable
- && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
- if (storeOffsetEnable) {
- this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
- requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
- }
- return response;
- }
-
-
- public boolean hasConsumeMessageHook() {
- return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
- }
-
- public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) {
- if (hasConsumeMessageHook()) {
- for (ConsumeMessageHook hook : this.consumeMessageHookList) {
- try {
- hook.consumeMessageBefore(context);
- } catch (Throwable e) {
- }
- }
- }
- }
-
- private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, final int queueId) {
- final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
-
- long storeTimestamp = 0;
- try {
- List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
- for (ByteBuffer bb : messageBufferList) {
-
- byteBuffer.put(bb);
- storeTimestamp = bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
- }
- } finally {
- getMessageResult.release();
- }
-
- this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - storeTimestamp);
- return byteBuffer.array();
- }
-
- private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
- try {
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
- msgInner.setTags(event.getConsumerGroup());
- msgInner.setDelayTimeLevel(0);
- msgInner.setKeys(event.getConsumerGroup());
- msgInner.setBody(event.encode());
- msgInner.setFlag(0);
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags()));
-
- msgInner.setQueueId(0);
- msgInner.setSysFlag(0);
- msgInner.setBornTimestamp(System.currentTimeMillis());
- msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
- msgInner.setStoreHost(msgInner.getBornHost());
-
- msgInner.setReconsumeTimes(0);
-
- PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
- } catch (Exception e) {
- LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
- }
- }
-
- public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
- Runnable run = new Runnable() {
- @Override
- public void run() {
- try {
- final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
-
- if (response != null) {
- response.setOpaque(request.getOpaque());
- response.markResponseType();
- try {
- channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed",
- future.cause());
- LOG.error(request.toString());
- LOG.error(response.toString());
- }
- }
- });
- } catch (Throwable e) {
- LOG.error("processRequestWrapper process request over, but response failed", e);
- LOG.error(request.toString());
- LOG.error(response.toString());
- }
- }
- } catch (RemotingCommandException e1) {
- LOG.error("excuteRequestWhenWakeup run", e1);
- }
- }
- };
-
- this.brokerController.getPullMessageExecutor().submit(run);
- }
-
- public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) {
- this.consumeMessageHookList = sendMessageHookList;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java
deleted file mode 100644
index 738d11f..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
-import com.alibaba.rocketmq.broker.pagecache.QueryMessageTransfer;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.QueryMessageResponseHeader;
-import com.alibaba.rocketmq.common.protocol.header.ViewMessageRequestHeader;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.QueryMessageResult;
-import com.alibaba.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.FileRegion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * @author shijia.wxr
- */
-public class QueryMessageProcessor implements NettyRequestProcessor {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
- private final BrokerController brokerController;
-
-
- public QueryMessageProcessor(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
-
- @Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
- switch (request.getCode()) {
- case RequestCode.QUERY_MESSAGE:
- return this.queryMessage(ctx, request);
- case RequestCode.VIEW_MESSAGE_BY_ID:
- return this.viewMessageById(ctx, request);
- default:
- break;
- }
-
- return null;
- }
-
- @Override
- public boolean rejectRequest() {
- return false;
- }
-
-
- public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
- final RemotingCommand response =
- RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
- final QueryMessageResponseHeader responseHeader =
- (QueryMessageResponseHeader) response.readCustomHeader();
- final QueryMessageRequestHeader requestHeader =
- (QueryMessageRequestHeader) request
- .decodeCommandCustomHeader(QueryMessageRequestHeader.class);
-
-
- response.setOpaque(request.getOpaque());
-
-
- String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
- if (isUniqueKey != null && isUniqueKey.equals("true")) {
- requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
- }
-
- final QueryMessageResult queryMessageResult =
- this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
- requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
- requestHeader.getEndTimestamp());
- assert queryMessageResult != null;
-
- responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());
- responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());
-
-
- if (queryMessageResult.getBufferTotalSize() > 0) {
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
-
- try {
- FileRegion fileRegion =
- new QueryMessageTransfer(response.encodeHeader(queryMessageResult
- .getBufferTotalSize()), queryMessageResult);
- ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- queryMessageResult.release();
- if (!future.isSuccess()) {
- log.error("transfer query message by pagecache failed, ", future.cause());
- }
- }
- });
- } catch (Throwable e) {
- log.error("", e);
- queryMessageResult.release();
- }
-
- return null;
- }
-
- response.setCode(ResponseCode.QUERY_NOT_FOUND);
- response.setRemark("can not find message, maybe time range not correct");
- return response;
- }
-
-
- public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- final ViewMessageRequestHeader requestHeader =
- (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
-
-
- response.setOpaque(request.getOpaque());
-
- final SelectMappedBufferResult selectMappedBufferResult =
- this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
- if (selectMappedBufferResult != null) {
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
-
- try {
- FileRegion fileRegion =
- new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
- selectMappedBufferResult);
- ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- selectMappedBufferResult.release();
- if (!future.isSuccess()) {
- log.error("transfer one message by pagecache failed, ", future.cause());
- }
- }
- });
- } catch (Throwable e) {
- log.error("", e);
- selectMappedBufferResult.release();
- }
-
- return null;
- } else {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
- }
-
- return response;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
deleted file mode 100644
index a375285..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
-import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
-import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
-import com.alibaba.rocketmq.common.*;
-import com.alibaba.rocketmq.common.constant.PermName;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.MessageAccessor;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
-import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.MessageExtBrokerInner;
-import com.alibaba.rocketmq.store.PutMessageResult;
-import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-import io.netty.channel.ChannelHandlerContext;
-
-import java.net.SocketAddress;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
-
- private List<ConsumeMessageHook> consumeMessageHookList;
-
- public SendMessageProcessor(final BrokerController brokerController) {
- super(brokerController);
- }
-
- @Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
- SendMessageContext mqtraceContext;
- switch (request.getCode()) {
- case RequestCode.CONSUMER_SEND_MSG_BACK:
- return this.consumerSendMsgBack(ctx, request);
- default:
- SendMessageRequestHeader requestHeader = parseRequestHeader(request);
- if (requestHeader == null) {
- return null;
- }
-
- mqtraceContext = buildMsgContext(ctx, requestHeader);
- this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
- final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
-
- this.executeSendMessageHookAfter(response, mqtraceContext);
- return response;
- }
- }
-
- @Override
- public boolean rejectRequest() {
- return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
- this.brokerController.getMessageStore().isTransientStorePoolDeficient();
- }
-
- private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
- throws RemotingCommandException {
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- final ConsumerSendMsgBackRequestHeader requestHeader =
- (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
-
- if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
-
- ConsumeMessageContext context = new ConsumeMessageContext();
- context.setConsumerGroup(requestHeader.getGroup());
- context.setTopic(requestHeader.getOriginTopic());
- context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
- context.setCommercialRcvTimes(1);
- context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
-
- this.executeConsumeMessageHookAfter(context);
- }
-
-
- SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
- if (null == subscriptionGroupConfig) {
- response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
- response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
- + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
- return response;
- }
-
-
- if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
- return response;
- }
-
-
- if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
- return response;
- }
-
- String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
- int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
-
-
- int topicSysFlag = 0;
- if (requestHeader.isUnitMode()) {
- topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
- }
-
-
- TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
- newTopic, //
- subscriptionGroupConfig.getRetryQueueNums(), //
- PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
- if (null == topicConfig) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("topic[" + newTopic + "] not exist");
- return response;
- }
-
-
- if (!PermName.isWriteable(topicConfig.getPerm())) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
- return response;
- }
-
- MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
- if (null == msgExt) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("look message by offset failed, " + requestHeader.getOffset());
- return response;
- }
-
-
- final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
- if (null == retryTopic) {
- MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
- }
- msgExt.setWaitStoreMsgOK(false);
-
-
- int delayLevel = requestHeader.getDelayLevel();
-
-
- int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
- if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
- maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
- }
-
-
- if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
- || delayLevel < 0) {
- newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
- queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
-
- topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
- DLQ_NUMS_PER_GROUP, //
- PermName.PERM_WRITE, 0
- );
- if (null == topicConfig) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("topic[" + newTopic + "] not exist");
- return response;
- }
- } else {
- if (0 == delayLevel) {
- delayLevel = 3 + msgExt.getReconsumeTimes();
- }
-
- msgExt.setDelayTimeLevel(delayLevel);
- }
-
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- msgInner.setTopic(newTopic);
- msgInner.setBody(msgExt.getBody());
- msgInner.setFlag(msgExt.getFlag());
- MessageAccessor.setProperties(msgInner, msgExt.getProperties());
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
- msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
-
- msgInner.setQueueId(queueIdInt);
- msgInner.setSysFlag(msgExt.getSysFlag());
- msgInner.setBornTimestamp(msgExt.getBornTimestamp());
- msgInner.setBornHost(msgExt.getBornHost());
- msgInner.setStoreHost(this.getStoreHost());
- msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
-
- String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
- MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
-
- PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
- if (putMessageResult != null) {
- switch (putMessageResult.getPutMessageStatus()) {
- case PUT_OK:
- String backTopic = msgExt.getTopic();
- String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
- if (correctTopic != null) {
- backTopic = correctTopic;
- }
-
- this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
-
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
-
- return response;
- default:
- break;
- }
-
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(putMessageResult.getPutMessageStatus().name());
- return response;
- }
-
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("putMessageResult is null");
- return response;
- }
-
- private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
- final RemotingCommand request, //
- final SendMessageContext sendMessageContext, //
- final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
-
- final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
- final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
-
-
- response.setOpaque(request.getOpaque());
-
- response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
- response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
-
- if (log.isDebugEnabled()) {
- log.debug("receive SendMessage request command, " + request);
- }
-
- final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
- if (this.brokerController.getMessageStore().now() < startTimstamp) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
- return response;
- }
-
- response.setCode(-1);
- super.msgCheck(ctx, requestHeader, response);
- if (response.getCode() != -1) {
- return response;
- }
-
- final byte[] body = request.getBody();
-
- int queueIdInt = requestHeader.getQueueId();
- TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
-
- if (queueIdInt < 0) {
- queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
- }
-
- int sysFlag = requestHeader.getSysFlag();
-
- if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
- sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
- }
-
- String newTopic = requestHeader.getTopic();
- if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
- SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
- if (null == subscriptionGroupConfig) {
- response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
- response.setRemark(
- "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
- return response;
- }
-
-
- int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
- if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
- maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
- }
- int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
- if (reconsumeTimes >= maxReconsumeTimes) {
- newTopic = MixAll.getDLQTopic(groupName);
- queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
- topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
- DLQ_NUMS_PER_GROUP, //
- PermName.PERM_WRITE, 0
- );
- if (null == topicConfig) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("topic[" + newTopic + "] not exist");
- return response;
- }
- }
- }
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- msgInner.setTopic(newTopic);
- msgInner.setBody(body);
- msgInner.setFlag(requestHeader.getFlag());
- MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
- msgInner.setPropertiesString(requestHeader.getProperties());
- msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
-
- msgInner.setQueueId(queueIdInt);
- msgInner.setSysFlag(sysFlag);
- msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
- msgInner.setBornHost(ctx.channel().remoteAddress());
- msgInner.setStoreHost(this.getStoreHost());
- msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
-
- if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
- String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (traFlag != null) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark(
- "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
- return response;
- }
- }
-
- PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
- if (putMessageResult != null) {
- boolean sendOK = false;
-
- switch (putMessageResult.getPutMessageStatus()) {
- // Success
- case PUT_OK:
- sendOK = true;
- response.setCode(ResponseCode.SUCCESS);
- break;
- case FLUSH_DISK_TIMEOUT:
- response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
- sendOK = true;
- break;
- case FLUSH_SLAVE_TIMEOUT:
- response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
- sendOK = true;
- break;
- case SLAVE_NOT_AVAILABLE:
- response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
- sendOK = true;
- break;
-
- // Failed
- case CREATE_MAPEDFILE_FAILED:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("create mapped file failed, server is busy or broken.");
- break;
- case MESSAGE_ILLEGAL:
- case PROPERTIES_SIZE_EXCEEDED:
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- response.setRemark(
- "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
- break;
- case SERVICE_NOT_AVAILABLE:
- response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
- response.setRemark(
- "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
- break;
- case OS_PAGECACHE_BUSY:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
- break;
- case UNKNOWN_ERROR:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("UNKNOWN_ERROR");
- break;
- default:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("UNKNOWN_ERROR DEFAULT");
- break;
- }
-
- String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
- if (sendOK) {
-
- this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
- this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
- putMessageResult.getAppendMessageResult().getWroteBytes());
- this.brokerController.getBrokerStatsManager().incBrokerPutNums();
-
- response.setRemark(null);
-
- responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
- responseHeader.setQueueId(queueIdInt);
- responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
-
-
- doResponse(ctx, request, response);
-
-
- if (hasSendMessageHook()) {
- sendMessageContext.setMsgId(responseHeader.getMsgId());
- sendMessageContext.setQueueId(responseHeader.getQueueId());
- sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
-
- int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
- int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
- int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
-
- sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
- sendMessageContext.setCommercialSendTimes(incValue);
- sendMessageContext.setCommercialSendSize(wroteSize);
- sendMessageContext.setCommercialOwner(owner);
- }
- return null;
- } else {
- if (hasSendMessageHook()) {
- int wroteSize = request.getBody().length;
- int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
-
- sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
- sendMessageContext.setCommercialSendTimes(incValue);
- sendMessageContext.setCommercialSendSize(wroteSize);
- sendMessageContext.setCommercialOwner(owner);
- }
- }
- } else {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("store putMessage return null");
- }
-
- return response;
- }
-
- public boolean hasConsumeMessageHook() {
- return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
- }
-
- public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {
- if (hasConsumeMessageHook()) {
- for (ConsumeMessageHook hook : this.consumeMessageHookList) {
- try {
- hook.consumeMessageAfter(context);
- } catch (Throwable e) {
- }
- }
- }
- }
-
- public SocketAddress getStoreHost() {
- return storeHost;
- }
-
- private String diskUtil() {
- String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
- double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
-
- String storePathLogis =
- StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
- double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);
-
- String storePathIndex =
- StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
- double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);
-
- return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
- }
-
- public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
- this.consumeMessageHookList = consumeMessageHookList;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java
deleted file mode 100644
index 8b3aefe..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.slave;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
-import com.alibaba.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-
-/**
- * @author shijia.wxr
- * @author manhong.yqd
- */
-public class SlaveSynchronize {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final BrokerController brokerController;
- private volatile String masterAddr = null;
-
-
- public SlaveSynchronize(BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
-
- public String getMasterAddr() {
- return masterAddr;
- }
-
-
- public void setMasterAddr(String masterAddr) {
- this.masterAddr = masterAddr;
- }
-
-
- public void syncAll() {
- this.syncTopicConfig();
- this.syncConsumerOffset();
- this.syncDelayOffset();
- this.syncSubscriptionGroupConfig();
- }
-
-
- private void syncTopicConfig() {
- String masterAddrBak = this.masterAddr;
- if (masterAddrBak != null) {
- try {
- TopicConfigSerializeWrapper topicWrapper =
- this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
- if (!this.brokerController.getTopicConfigManager().getDataVersion()
- .equals(topicWrapper.getDataVersion())) {
-
- this.brokerController.getTopicConfigManager().getDataVersion()
- .assignNewOne(topicWrapper.getDataVersion());
- this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
- this.brokerController.getTopicConfigManager().getTopicConfigTable()
- .putAll(topicWrapper.getTopicConfigTable());
- this.brokerController.getTopicConfigManager().persist();
-
- log.info("update slave topic config from master, {}", masterAddrBak);
- }
- } catch (Exception e) {
- log.error("syncTopicConfig Exception, " + masterAddrBak, e);
- }
- }
- }
-
-
- private void syncConsumerOffset() {
- String masterAddrBak = this.masterAddr;
- if (masterAddrBak != null) {
- try {
- ConsumerOffsetSerializeWrapper offsetWrapper =
- this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
- this.brokerController.getConsumerOffsetManager().getOffsetTable()
- .putAll(offsetWrapper.getOffsetTable());
- this.brokerController.getConsumerOffsetManager().persist();
- log.info("update slave consumer offset from master, {}", masterAddrBak);
- } catch (Exception e) {
- log.error("syncConsumerOffset Exception, " + masterAddrBak, e);
- }
- }
- }
-
-
- private void syncDelayOffset() {
- String masterAddrBak = this.masterAddr;
- if (masterAddrBak != null) {
- try {
- String delayOffset =
- this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
- if (delayOffset != null) {
-
- String fileName =
- StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
- .getMessageStoreConfig().getStorePathRootDir());
- try {
- MixAll.string2File(delayOffset, fileName);
- } catch (IOException e) {
- log.error("persist file Exception, " + fileName, e);
- }
- }
- log.info("update slave delay offset from master, {}", masterAddrBak);
- } catch (Exception e) {
- log.error("syncDelayOffset Exception, " + masterAddrBak, e);
- }
- }
- }
-
-
- private void syncSubscriptionGroupConfig() {
- String masterAddrBak = this.masterAddr;
- if (masterAddrBak != null) {
- try {
- SubscriptionGroupWrapper subscriptionWrapper =
- this.brokerController.getBrokerOuterAPI()
- .getAllSubscriptionGroupConfig(masterAddrBak);
-
- if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
- .equals(subscriptionWrapper.getDataVersion())) {
- SubscriptionGroupManager subscriptionGroupManager =
- this.brokerController.getSubscriptionGroupManager();
- subscriptionGroupManager.getDataVersion().assignNewOne(
- subscriptionWrapper.getDataVersion());
- subscriptionGroupManager.getSubscriptionGroupTable().clear();
- subscriptionGroupManager.getSubscriptionGroupTable().putAll(
- subscriptionWrapper.getSubscriptionGroupTable());
- subscriptionGroupManager.persist();
- log.info("update slave Subscription Group from master, {}", masterAddrBak);
- }
- } catch (Exception e) {
- log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e);
- }
- }
- }
-}