You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:25 UTC
[34/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
deleted file mode 100644
index 82c342f..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ /dev/null
@@ -1,1071 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.Validators;
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.PullCallback;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore;
-import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
-import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
-import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
-import com.alibaba.rocketmq.client.hook.ConsumeMessageHook;
-import com.alibaba.rocketmq.client.hook.FilterMessageHook;
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.MQClientManager;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.ServiceState;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.filter.FilterAPI;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
-import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo;
-import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.common.protocol.route.BrokerData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import org.slf4j.Logger;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class DefaultMQPushConsumerImpl implements MQConsumerInner {
- /**
- * Delay some time when exception occur
- */
- private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
- /**
- * Flow control interval
- */
- private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
- /**
- * Delay some time when suspend pull service
- */
- private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
- private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
- private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
- private final Logger log = ClientLogger.getLog();
- private final DefaultMQPushConsumer defaultMQPushConsumer;
- private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
- private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
- private final long consumerStartTimestamp = System.currentTimeMillis();
- private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
- private final RPCHook rpcHook;
- private ServiceState serviceState = ServiceState.CREATE_JUST;
- private MQClientInstance mQClientFactory;
- private PullAPIWrapper pullAPIWrapper;
- private volatile boolean pause = false;
- private boolean consumeOrderly = false;
- private MessageListener messageListenerInner;
- private OffsetStore offsetStore;
- private ConsumeMessageService consumeMessageService;
- private long flowControlTimes1 = 0;
- private long flowControlTimes2 = 0;
-
-
- public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
- this.defaultMQPushConsumer = defaultMQPushConsumer;
- this.rpcHook = rpcHook;
- }
-
- public void registerFilterMessageHook(final FilterMessageHook hook) {
- this.filterMessageHookList.add(hook);
- log.info("register FilterMessageHook Hook, {}", hook.hookName());
- }
-
- public boolean hasHook() {
- return !this.consumeMessageHookList.isEmpty();
- }
-
- public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
- this.consumeMessageHookList.add(hook);
- log.info("register consumeMessageHook Hook, {}", hook.hookName());
- }
-
- public void executeHookBefore(final ConsumeMessageContext context) {
- if (!this.consumeMessageHookList.isEmpty()) {
- for (ConsumeMessageHook hook : this.consumeMessageHookList) {
- try {
- hook.consumeMessageBefore(context);
- } catch (Throwable e) {
- }
- }
- }
- }
-
- public void executeHookAfter(final ConsumeMessageContext context) {
- if (!this.consumeMessageHookList.isEmpty()) {
- for (ConsumeMessageHook hook : this.consumeMessageHookList) {
- try {
- hook.consumeMessageAfter(context);
- } catch (Throwable e) {
- }
- }
- }
- }
-
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, newTopic, queueNum, 0);
- }
-
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
- this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
- }
-
- public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
- Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
- if (null == result) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
- }
-
- if (null == result) {
- throw new MQClientException("The topic[" + topic + "] not exist", null);
- }
-
- return result;
- }
-
- public DefaultMQPushConsumer getDefaultMQPushConsumer() {
- return defaultMQPushConsumer;
- }
-
- public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
- return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
- }
-
- public long maxOffset(MessageQueue mq) throws MQClientException {
- return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
- }
-
- public long minOffset(MessageQueue mq) throws MQClientException {
- return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
- }
-
- public OffsetStore getOffsetStore() {
- return offsetStore;
- }
-
- public void setOffsetStore(OffsetStore offsetStore) {
- this.offsetStore = offsetStore;
- }
-
- public void pullMessage(final PullRequest pullRequest) {
- final ProcessQueue processQueue = pullRequest.getProcessQueue();
- if (processQueue.isDropped()) {
- log.info("the pull request[{}] is dropped.", pullRequest.toString());
- return;
- }
-
- pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
-
- try {
- this.makeSureStateOK();
- } catch (MQClientException e) {
- log.warn("pullMessage exception, consumer state not ok", e);
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- return;
- }
-
- if (this.isPause()) {
- log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
- return;
- }
-
- long size = processQueue.getMsgCount().get();
- if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((flowControlTimes1++ % 1000) == 0) {
- log.warn(
- "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
- }
- return;
- }
-
- if (!this.consumeOrderly) {
- if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((flowControlTimes2++ % 1000) == 0) {
- log.warn(
- "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
- pullRequest, flowControlTimes2);
- }
- return;
- }
- } else {
- if (processQueue.isLocked()) {
- if (!pullRequest.isLockedFirst()) {
- final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
- boolean brokerBusy = offset < pullRequest.getNextOffset();
- log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
- pullRequest, offset, brokerBusy);
- if (brokerBusy) {
- log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
- pullRequest, offset);
- }
-
- pullRequest.setLockedFirst(true);
- pullRequest.setNextOffset(offset);
- }
- } else {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- log.info("pull message later because not locked in broker, {}", pullRequest);
- return;
- }
- }
-
- final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
- if (null == subscriptionData) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- log.warn("find the consumer's subscription failed, {}", pullRequest);
- return;
- }
-
- final long beginTimestamp = System.currentTimeMillis();
-
- PullCallback pullCallback = new PullCallback() {
- @Override
- public void onSuccess(PullResult pullResult) {
- if (pullResult != null) {
- pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
- subscriptionData);
-
- switch (pullResult.getPullStatus()) {
- case FOUND:
- long prevRequestOffset = pullRequest.getNextOffset();
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- long pullRT = System.currentTimeMillis() - beginTimestamp;
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullRT);
-
- long firstMsgOffset = Long.MAX_VALUE;
- if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- } else {
- firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
-
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
-
- boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
- pullResult.getMsgFoundList(), //
- processQueue, //
- pullRequest.getMessageQueue(), //
- dispathToConsume);
-
- if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
- DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
- } else {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- }
- }
-
- if (pullResult.getNextBeginOffset() < prevRequestOffset//
- || firstMsgOffset < prevRequestOffset) {
- log.warn(
- "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
- pullResult.getNextBeginOffset(), //
- firstMsgOffset, //
- prevRequestOffset);
- }
-
- break;
- case NO_NEW_MSG:
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
-
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case NO_MATCHED_MSG:
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
-
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case OFFSET_ILLEGAL:
- log.warn("the pull request offset illegal, {} {}", //
- pullRequest.toString(), pullResult.toString());
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-
- pullRequest.getProcessQueue().setDropped(true);
- DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
-
- @Override
- public void run() {
- try {
- DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
- pullRequest.getNextOffset(), false);
-
- DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
-
- DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
-
- log.warn("fix the pull request offset, {}", pullRequest);
- } catch (Throwable e) {
- log.error("executeTaskLater Exception", e);
- }
- }
- }, 10000);
- break;
- default:
- break;
- }
- }
- }
-
-
- @Override
- public void onException(Throwable e) {
- if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("execute the pull request exception", e);
- }
-
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- }
- };
-
- boolean commitOffsetEnable = false;
- long commitOffsetValue = 0L;
- if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
- commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
- if (commitOffsetValue > 0) {
- commitOffsetEnable = true;
- }
- }
-
- String subExpression = null;
- boolean classFilter = false;
- SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
- if (sd != null) {
- if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
- subExpression = sd.getSubString();
- }
-
- classFilter = sd.isClassFilterMode();
- }
-
- int sysFlag = PullSysFlag.buildSysFlag(//
- commitOffsetEnable, // commitOffset
- true, // suspend
- subExpression != null, // subscription
- classFilter // class filter
- );
- try {
- this.pullAPIWrapper.pullKernelImpl(//
- pullRequest.getMessageQueue(), // 1
- subExpression, // 2
- subscriptionData.getSubVersion(), // 3
- pullRequest.getNextOffset(), // 4
- this.defaultMQPushConsumer.getPullBatchSize(), // 5
- sysFlag, // 6
- commitOffsetValue, // 7
- BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
- CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
- CommunicationMode.ASYNC, // 10
- pullCallback// 11
- );
- } catch (Exception e) {
- log.error("pullKernelImpl exception", e);
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- }
- }
-
- private void makeSureStateOK() throws MQClientException {
- if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The consumer service state not OK, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- }
- }
-
- private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
- this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
- }
-
- public boolean isPause() {
- return pause;
- }
-
- public void setPause(boolean pause) {
- this.pause = pause;
- }
-
- public ConsumerStatsManager getConsumerStatsManager() {
- return this.mQClientFactory.getConsumerStatsManager();
- }
-
- public void executePullRequestImmediately(final PullRequest pullRequest) {
- this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
- }
-
- private void correctTagsOffset(final PullRequest pullRequest) {
- if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
- this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
- }
- }
-
- public void executeTaskLater(final Runnable r, final long timeDelay) {
- this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay);
- }
-
- public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
- return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
- }
-
- public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException,
- InterruptedException {
- return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
- }
-
-
- public void registerMessageListener(MessageListener messageListener) {
- this.messageListenerInner = messageListener;
- }
-
- public void resume() {
- this.pause = false;
- doRebalance();
- log.info("resume this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
- }
-
- public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- try {
- String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
- : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
- this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
- } catch (Exception e) {
- log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
-
- Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
-
- String originMsgId = MessageAccessor.getOriginMessageId(msg);
- MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
-
- newMsg.setFlag(msg.getFlag());
- MessageAccessor.setProperties(newMsg, msg.getProperties());
- MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
- MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
- MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
- newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
-
- this.mQClientFactory.getDefaultMQProducer().send(newMsg);
- }
- }
-
- private int getMaxReconsumeTimes() {
- // default reconsume times: 16
- if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
- return 16;
- } else {
- return this.defaultMQPushConsumer.getMaxReconsumeTimes();
- }
- }
-
- public void shutdown() {
- switch (this.serviceState) {
- case CREATE_JUST:
- break;
- case RUNNING:
- this.consumeMessageService.shutdown();
- this.persistConsumerOffset();
- this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
- this.mQClientFactory.shutdown();
- log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
- this.rebalanceImpl.destroy();
- this.serviceState = ServiceState.SHUTDOWN_ALREADY;
- break;
- case SHUTDOWN_ALREADY:
- break;
- default:
- break;
- }
- }
-
- public void start() throws MQClientException {
- switch (this.serviceState) {
- case CREATE_JUST:
- log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
- this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
- this.serviceState = ServiceState.START_FAILED;
-
- this.checkConfig();
-
- this.copySubscription();
-
- if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
- this.defaultMQPushConsumer.changeInstanceNameToPID();
- }
-
- this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
-
- this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
- this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
- this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
- this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
-
- this.pullAPIWrapper = new PullAPIWrapper(
- mQClientFactory,
- this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
- this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
-
- if (this.defaultMQPushConsumer.getOffsetStore() != null) {
- this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
- } else {
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- case BROADCASTING:
- this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
- break;
- case CLUSTERING:
- this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
- break;
- default:
- break;
- }
- }
- this.offsetStore.load();
-
- if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
- this.consumeOrderly = true;
- this.consumeMessageService =
- new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
- } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
- this.consumeOrderly = false;
- this.consumeMessageService =
- new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
- }
-
- this.consumeMessageService.start();
-
- boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
- if (!registerOK) {
- this.serviceState = ServiceState.CREATE_JUST;
- this.consumeMessageService.shutdown();
- throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
- + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
- null);
- }
-
- mQClientFactory.start();
- log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
- this.serviceState = ServiceState.RUNNING;
- break;
- case RUNNING:
- case START_FAILED:
- case SHUTDOWN_ALREADY:
- throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- default:
- break;
- }
-
- this.updateTopicSubscribeInfoWhenSubscriptionChanged();
-
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-
- this.mQClientFactory.rebalanceImmediately();
- }
-
- private void checkConfig() throws MQClientException {
- Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
-
- if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
- throw new MQClientException(
- "consumerGroup is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
- throw new MQClientException(
- "consumerGroup can not equal "
- + MixAll.DEFAULT_CONSUMER_GROUP
- + ", please specify another one."
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- if (null == this.defaultMQPushConsumer.getMessageModel()) {
- throw new MQClientException(
- "messageModel is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
- throw new MQClientException(
- "consumeFromWhere is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS);
- if (null == dt) {
- throw new MQClientException(
- "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // allocateMessageQueueStrategy
- if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
- throw new MQClientException(
- "allocateMessageQueueStrategy is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // subscription
- if (null == this.defaultMQPushConsumer.getSubscription()) {
- throw new MQClientException(
- "subscription is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // messageListener
- if (null == this.defaultMQPushConsumer.getMessageListener()) {
- throw new MQClientException(
- "messageListener is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
- boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
- if (!orderly && !concurrently) {
- throw new MQClientException(
- "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // consumeThreadMin
- if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
- || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
- || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
- throw new MQClientException(
- "consumeThreadMin Out of range [1, 1000]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // consumeThreadMax
- if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
- throw new MQClientException(
- "consumeThreadMax Out of range [1, 1000]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // consumeConcurrentlyMaxSpan
- if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
- || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
- throw new MQClientException(
- "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // pullThresholdForQueue
- if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
- throw new MQClientException(
- "pullThresholdForQueue Out of range [1, 65535]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // pullInterval
- if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
- throw new MQClientException(
- "pullInterval Out of range [0, 65535]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // consumeMessageBatchMaxSize
- if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
- || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
- throw new MQClientException(
- "consumeMessageBatchMaxSize Out of range [1, 1024]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
-
- // pullBatchSize
- if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
- throw new MQClientException(
- "pullBatchSize Out of range [1, 1024]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
- }
- }
-
- private void copySubscription() throws MQClientException {
- try {
- Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
- if (sub != null) {
- for (final Map.Entry<String, String> entry : sub.entrySet()) {
- final String topic = entry.getKey();
- final String subString = entry.getValue();
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
- topic, subString);
- this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
- }
- }
-
- if (null == this.messageListenerInner) {
- this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
- }
-
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- case BROADCASTING:
- break;
- case CLUSTERING:
- final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
- retryTopic, SubscriptionData.SUB_ALL);
- this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
- break;
- default:
- break;
- }
- } catch (Exception e) {
- throw new MQClientException("subscription exception", e);
- }
- }
-
- public MessageListener getMessageListenerInner() {
- return messageListenerInner;
- }
-
- private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
- Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
- if (subTable != null) {
- for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
- final String topic = entry.getKey();
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- }
- }
- }
-
- public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
- return this.rebalanceImpl.getSubscriptionInner();
- }
-
- public void subscribe(String topic, String subExpression) throws MQClientException {
- try {
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
- topic, subExpression);
- this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
- if (this.mQClientFactory != null) {
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- }
- } catch (Exception e) {
- throw new MQClientException("subscription exception", e);
- }
- }
-
- public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
- try {
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
- topic, "*");
- subscriptionData.setSubString(fullClassName);
- subscriptionData.setClassFilterMode(true);
- subscriptionData.setFilterClassSource(filterClassSource);
- this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
- if (this.mQClientFactory != null) {
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- }
-
- } catch (Exception e) {
- throw new MQClientException("subscription exception", e);
- }
- }
-
- public void suspend() {
- this.pause = true;
- log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
- }
-
- public void unsubscribe(String topic) {
- this.rebalanceImpl.getSubscriptionInner().remove(topic);
- }
-
- public void updateConsumeOffset(MessageQueue mq, long offset) {
- this.offsetStore.updateOffset(mq, offset, false);
- }
-
- public void updateCorePoolSize(int corePoolSize) {
- this.consumeMessageService.updateCorePoolSize(corePoolSize);
- }
-
- public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
- }
-
- public RebalanceImpl getRebalanceImpl() {
- return rebalanceImpl;
- }
-
- public boolean isConsumeOrderly() {
- return consumeOrderly;
- }
-
- public void setConsumeOrderly(boolean consumeOrderly) {
- this.consumeOrderly = consumeOrderly;
- }
-
- public void resetOffsetByTimeStamp(long timeStamp)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) {
- Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
- Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
- if (mqs != null) {
- for (MessageQueue mq : mqs) {
- long offset = searchOffset(mq, timeStamp);
- offsetTable.put(mq, offset);
- }
- this.mQClientFactory.resetOffset(topic, groupName(), offsetTable);
- }
- }
- }
-
- public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
- return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
- }
-
- @Override
- public String groupName() {
- return this.defaultMQPushConsumer.getConsumerGroup();
- }
-
- @Override
- public MessageModel messageModel() {
- return this.defaultMQPushConsumer.getMessageModel();
- }
-
- @Override
- public ConsumeType consumeType() {
- return ConsumeType.CONSUME_PASSIVELY;
- }
-
- @Override
- public ConsumeFromWhere consumeFromWhere() {
- return this.defaultMQPushConsumer.getConsumeFromWhere();
- }
-
- @Override
- public Set<SubscriptionData> subscriptions() {
- Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();
-
- subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
-
- return subSet;
- }
-
- @Override
- public void doRebalance() {
- if (this.rebalanceImpl != null && !this.pause) {
- this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
- }
- }
-
- @Override
- public void persistConsumerOffset() {
- try {
- this.makeSureStateOK();
- Set<MessageQueue> mqs = new HashSet<MessageQueue>();
- Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
- if (allocateMq != null) {
- mqs.addAll(allocateMq);
- }
-
- this.offsetStore.persistAll(mqs);
- } catch (Exception e) {
- log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
- }
- }
-
- @Override
- public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
- Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
- if (subTable != null) {
- if (subTable.containsKey(topic)) {
- this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);
- }
- }
- }
-
- @Override
- public boolean isSubscribeTopicNeedUpdate(String topic) {
- Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
- if (subTable != null) {
- if (subTable.containsKey(topic)) {
- return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
- }
- }
-
- return false;
- }
-
- @Override
- public boolean isUnitMode() {
- return this.defaultMQPushConsumer.isUnitMode();
- }
-
- @Override
- public ConsumerRunningInfo consumerRunningInfo() {
- ConsumerRunningInfo info = new ConsumerRunningInfo();
-
- Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer);
-
- prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly));
- prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize()));
- prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));
-
- info.setProperties(prop);
-
- Set<SubscriptionData> subSet = this.subscriptions();
- info.getSubscriptionSet().addAll(subSet);
-
- Iterator<Entry<MessageQueue, ProcessQueue>> it = this.rebalanceImpl.getProcessQueueTable().entrySet().iterator();
- while (it.hasNext()) {
- Entry<MessageQueue, ProcessQueue> next = it.next();
- MessageQueue mq = next.getKey();
- ProcessQueue pq = next.getValue();
-
- ProcessQueueInfo pqinfo = new ProcessQueueInfo();
- pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
- pq.fillProcessQueueInfo(pqinfo);
- info.getMqTable().put(mq, pqinfo);
- }
-
- for (SubscriptionData sd : subSet) {
- ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic());
- info.getStatusTable().put(sd.getTopic(), consumeStatus);
- }
-
- return info;
- }
-
- public MQClientInstance getmQClientFactory() {
- return mQClientFactory;
- }
-
- public void setmQClientFactory(MQClientInstance mQClientFactory) {
- this.mQClientFactory = mQClientFactory;
- }
-
- public ServiceState getServiceState() {
- return serviceState;
- }
-
- public void setServiceState(ServiceState serviceState) {
- this.serviceState = serviceState;
- }
-
- public void adjustThreadPool() {
- long computeAccTotal = this.computeAccumulationTotal();
- long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
-
- long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
-
- long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);
-
- if (computeAccTotal >= incThreshold) {
- this.consumeMessageService.incCorePoolSize();
- }
-
- if (computeAccTotal < decThreshold) {
- this.consumeMessageService.decCorePoolSize();
- }
- }
-
- private long computeAccumulationTotal() {
- long msgAccTotal = 0;
- ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
- Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<MessageQueue, ProcessQueue> next = it.next();
- ProcessQueue value = next.getValue();
- msgAccTotal += value.getMsgAccCnt();
- }
-
- return msgAccTotal;
- }
-
- public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic)
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
- List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>();
- TopicRouteData routeData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000);
- for (BrokerData brokerData : routeData.getBrokerDatas()) {
- String addr = brokerData.selectBrokerAddr();
- queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, groupName(), 3000));
- }
-
- return queueTimeSpan;
- }
-
-
- public ConsumeMessageService getConsumeMessageService() {
- return consumeMessageService;
- }
-
-
- public void setConsumeMessageService(ConsumeMessageService consumeMessageService) {
- this.consumeMessageService = consumeMessageService;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java
deleted file mode 100644
index 1ff430b..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-
-import java.util.Set;
-
-
-/**
- * Consumer inner interface
- *
- * @author shijia.wxr
- */
-public interface MQConsumerInner {
- String groupName();
-
-
- MessageModel messageModel();
-
-
- ConsumeType consumeType();
-
-
- ConsumeFromWhere consumeFromWhere();
-
-
- Set<SubscriptionData> subscriptions();
-
-
- void doRebalance();
-
-
- void persistConsumerOffset();
-
-
- void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info);
-
-
- boolean isSubscribeTopicNeedUpdate(final String topic);
-
-
- boolean isUnitMode();
-
-
- ConsumerRunningInfo consumerRunningInfo();
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java
deleted file mode 100644
index 9de7ac0..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Message lock,strictly ensure the single queue only one thread at a time consuming
- *
- * @author shijia.wxr
- */
-public class MessageQueueLock {
- private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
- new ConcurrentHashMap<MessageQueue, Object>();
-
-
- public Object fetchLockObject(final MessageQueue mq) {
- Object objLock = this.mqLockTable.get(mq);
- if (null == objLock) {
- objLock = new Object();
- Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
- if (prevLock != null) {
- objLock = prevLock;
- }
- }
-
- return objLock;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java
deleted file mode 100644
index 05ffeb7..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.message.MessageAccessor;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-
-/**
- * Queue consumption snapshot
- *
- * @author shijia.wxr
- */
-public class ProcessQueue {
- public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
- Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
- public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
- private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
- private final Logger log = ClientLogger.getLog();
- private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
- private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
- private final AtomicLong msgCount = new AtomicLong();
- private final Lock lockConsume = new ReentrantLock();
- private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
- private final AtomicLong tryUnlockTimes = new AtomicLong(0);
- private volatile long queueOffsetMax = 0L;
- private volatile boolean dropped = false;
- private volatile long lastPullTimestamp = System.currentTimeMillis();
- private volatile long lastConsumeTimestamp = System.currentTimeMillis();
- private volatile boolean locked = false;
- private volatile long lastLockTimestamp = System.currentTimeMillis();
- private volatile boolean consuming = false;
- private volatile long msgAccCnt = 0;
-
- public boolean isLockExpired() {
- boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
- return result;
- }
-
-
- public boolean isPullExpired() {
- boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
- return result;
- }
-
- /**
-
- *
- * @param pushConsumer
- */
- public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
- if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
- return;
- }
-
- int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
- for (int i = 0; i < loop; i++) {
- MessageExt msg = null;
- try {
- this.lockTreeMap.readLock().lockInterruptibly();
- try {
- if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
- msg = msgTreeMap.firstEntry().getValue();
- } else {
-
- break;
- }
- } finally {
- this.lockTreeMap.readLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("getExpiredMsg exception", e);
- }
-
- try {
-
- pushConsumer.sendMessageBack(msg, 3);
- log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- try {
- if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
- try {
- msgTreeMap.remove(msgTreeMap.firstKey());
- } catch (Exception e) {
- log.error("send expired msg exception", e);
- }
- }
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("getExpiredMsg exception", e);
- }
- } catch (Exception e) {
- log.error("send expired msg exception", e);
- }
- }
- }
-
-
- public boolean putMessage(final List<MessageExt> msgs) {
- boolean dispatchToConsume = false;
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- try {
- int validMsgCnt = 0;
- for (MessageExt msg : msgs) {
- MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
- if (null == old) {
- validMsgCnt++;
- this.queueOffsetMax = msg.getQueueOffset();
- }
- }
- msgCount.addAndGet(validMsgCnt);
-
- if (!msgTreeMap.isEmpty() && !this.consuming) {
- dispatchToConsume = true;
- this.consuming = true;
- }
-
- if (!msgs.isEmpty()) {
- MessageExt messageExt = msgs.get(msgs.size() - 1);
- String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
- if (property != null) {
- long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
- if (accTotal > 0) {
- this.msgAccCnt = accTotal;
- }
- }
- }
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("putMessage exception", e);
- }
-
- return dispatchToConsume;
- }
-
-
- public long getMaxSpan() {
- try {
- this.lockTreeMap.readLock().lockInterruptibly();
- try {
- if (!this.msgTreeMap.isEmpty()) {
- return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
- }
- } finally {
- this.lockTreeMap.readLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("getMaxSpan exception", e);
- }
-
- return 0;
- }
-
-
- public long removeMessage(final List<MessageExt> msgs) {
- long result = -1;
- final long now = System.currentTimeMillis();
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- this.lastConsumeTimestamp = now;
- try {
- if (!msgTreeMap.isEmpty()) {
- result = this.queueOffsetMax + 1;
- int removedCnt = 0;
- for (MessageExt msg : msgs) {
- MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
- if (prev != null) {
- removedCnt--;
- }
- }
- msgCount.addAndGet(removedCnt);
-
- if (!msgTreeMap.isEmpty()) {
- result = msgTreeMap.firstKey();
- }
- }
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch (Throwable t) {
- log.error("removeMessage exception", t);
- }
-
- return result;
- }
-
-
- public TreeMap<Long, MessageExt> getMsgTreeMap() {
- return msgTreeMap;
- }
-
-
- public AtomicLong getMsgCount() {
- return msgCount;
- }
-
-
- public boolean isDropped() {
- return dropped;
- }
-
-
- public void setDropped(boolean dropped) {
- this.dropped = dropped;
- }
-
- public boolean isLocked() {
- return locked;
- }
-
- public void setLocked(boolean locked) {
- this.locked = locked;
- }
-
- public void rollback() {
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- try {
- this.msgTreeMap.putAll(this.msgTreeMapTemp);
- this.msgTreeMapTemp.clear();
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("rollback exception", e);
- }
- }
-
-
- public long commit() {
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- try {
- Long offset = this.msgTreeMapTemp.lastKey();
- msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
- this.msgTreeMapTemp.clear();
- if (offset != null) {
- return offset + 1;
- }
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("commit exception", e);
- }
-
- return -1;
- }
-
-
- public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- try {
- for (MessageExt msg : msgs) {
- this.msgTreeMapTemp.remove(msg.getQueueOffset());
- this.msgTreeMap.put(msg.getQueueOffset(), msg);
- }
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("makeMessageToCosumeAgain exception", e);
- }
- }
-
-
- public List<MessageExt> takeMessags(final int batchSize) {
- List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
- final long now = System.currentTimeMillis();
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- this.lastConsumeTimestamp = now;
- try {
- if (!this.msgTreeMap.isEmpty()) {
- for (int i = 0; i < batchSize; i++) {
- Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
- if (entry != null) {
- result.add(entry.getValue());
- msgTreeMapTemp.put(entry.getKey(), entry.getValue());
- } else {
- break;
- }
- }
- }
-
- if (result.isEmpty()) {
- consuming = false;
- }
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("take Messages exception", e);
- }
-
- return result;
- }
-
-
- public boolean hasTempMessage() {
- try {
- this.lockTreeMap.readLock().lockInterruptibly();
- try {
- return !this.msgTreeMap.isEmpty();
- } finally {
- this.lockTreeMap.readLock().unlock();
- }
- } catch (InterruptedException e) {
- }
-
- return true;
- }
-
-
- public void clear() {
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- try {
- this.msgTreeMap.clear();
- this.msgTreeMapTemp.clear();
- this.msgCount.set(0);
- this.queueOffsetMax = 0L;
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch (InterruptedException e) {
- log.error("rollback exception", e);
- }
- }
-
-
- public long getLastLockTimestamp() {
- return lastLockTimestamp;
- }
-
-
- public void setLastLockTimestamp(long lastLockTimestamp) {
- this.lastLockTimestamp = lastLockTimestamp;
- }
-
-
- public Lock getLockConsume() {
- return lockConsume;
- }
-
-
- public long getLastPullTimestamp() {
- return lastPullTimestamp;
- }
-
-
- public void setLastPullTimestamp(long lastPullTimestamp) {
- this.lastPullTimestamp = lastPullTimestamp;
- }
-
-
- public long getMsgAccCnt() {
- return msgAccCnt;
- }
-
-
- public void setMsgAccCnt(long msgAccCnt) {
- this.msgAccCnt = msgAccCnt;
- }
-
-
- public long getTryUnlockTimes() {
- return this.tryUnlockTimes.get();
- }
-
-
- public void incTryUnlockTimes() {
- this.tryUnlockTimes.incrementAndGet();
- }
-
-
- public void fillProcessQueueInfo(final ProcessQueueInfo info) {
- try {
- this.lockTreeMap.readLock().lockInterruptibly();
-
- if (!this.msgTreeMap.isEmpty()) {
- info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
- info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
- info.setCachedMsgCount(this.msgTreeMap.size());
- }
-
- if (!this.msgTreeMapTemp.isEmpty()) {
- info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
- info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
- info.setTransactionMsgCount(this.msgTreeMapTemp.size());
- }
-
- info.setLocked(this.locked);
- info.setTryUnlockTimes(this.tryUnlockTimes.get());
- info.setLastLockTimestamp(this.lastLockTimestamp);
-
- info.setDroped(this.dropped);
- info.setLastPullTimestamp(this.lastPullTimestamp);
- info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
- } catch (Exception e) {
- } finally {
- this.lockTreeMap.readLock().unlock();
- }
- }
-
-
- public long getLastConsumeTimestamp() {
- return lastConsumeTimestamp;
- }
-
-
- public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
- this.lastConsumeTimestamp = lastConsumeTimestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java
deleted file mode 100644
index 730b090..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.PullCallback;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.PullStatus;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.hook.FilterMessageContext;
-import com.alibaba.rocketmq.client.hook.FilterMessageHook;
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.FindBrokerResult;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullAPIWrapper {
- private final Logger log = ClientLogger.getLog();
- private final MQClientInstance mQClientFactory;
- private final String consumerGroup;
- private final boolean unitMode;
- private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
- private volatile boolean connectBrokerByUser = false;
- private volatile long defaultBrokerId = MixAll.MASTER_ID;
- private Random random = new Random(System.currentTimeMillis());
- private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
-
- public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {
- this.mQClientFactory = mQClientFactory;
- this.consumerGroup = consumerGroup;
- this.unitMode = unitMode;
- }
-
- public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
- final SubscriptionData subscriptionData) {
- PullResultExt pullResultExt = (PullResultExt) pullResult;
-
- this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
- if (PullStatus.FOUND == pullResult.getPullStatus()) {
- ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
- List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
-
- List<MessageExt> msgListFilterAgain = msgList;
- if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
- msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
- for (MessageExt msg : msgList) {
- if (msg.getTags() != null) {
- if (subscriptionData.getTagsSet().contains(msg.getTags())) {
- msgListFilterAgain.add(msg);
- }
- }
- }
- }
-
- if (this.hasHook()) {
- FilterMessageContext filterMessageContext = new FilterMessageContext();
- filterMessageContext.setUnitMode(unitMode);
- filterMessageContext.setMsgList(msgListFilterAgain);
- this.executeHook(filterMessageContext);
- }
-
- for (MessageExt msg : msgListFilterAgain) {
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
- Long.toString(pullResult.getMinOffset()));
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
- Long.toString(pullResult.getMaxOffset()));
- }
-
- pullResultExt.setMsgFoundList(msgListFilterAgain);
- }
-
- pullResultExt.setMessageBinary(null);
-
- return pullResult;
- }
-
- public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
- AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
- if (null == suggest) {
- this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
- } else {
- suggest.set(brokerId);
- }
- }
-
- public boolean hasHook() {
- return !this.filterMessageHookList.isEmpty();
- }
-
- public void executeHook(final FilterMessageContext context) {
- if (!this.filterMessageHookList.isEmpty()) {
- for (FilterMessageHook hook : this.filterMessageHookList) {
- try {
- hook.filterMessage(context);
- } catch (Throwable e) {
- log.error("execute hook error. hookName={}", hook.hookName());
- }
- }
- }
- }
-
- public PullResult pullKernelImpl(
- final MessageQueue mq,
- final String subExpression,
- final long subVersion,
- final long offset,
- final int maxNums,
- final int sysFlag,
- final long commitOffset,
- final long brokerSuspendMaxTimeMillis,
- final long timeoutMillis,
- final CommunicationMode communicationMode,
- final PullCallback pullCallback
- ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- FindBrokerResult findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
- this.recalculatePullFromWhichNode(mq), false);
- if (null == findBrokerResult) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
- this.recalculatePullFromWhichNode(mq), false);
- }
-
- if (findBrokerResult != null) {
- int sysFlagInner = sysFlag;
-
- if (findBrokerResult.isSlave()) {
- sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
- }
-
- PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
- requestHeader.setConsumerGroup(this.consumerGroup);
- requestHeader.setTopic(mq.getTopic());
- requestHeader.setQueueId(mq.getQueueId());
- requestHeader.setQueueOffset(offset);
- requestHeader.setMaxMsgNums(maxNums);
- requestHeader.setSysFlag(sysFlagInner);
- requestHeader.setCommitOffset(commitOffset);
- requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
- requestHeader.setSubscription(subExpression);
- requestHeader.setSubVersion(subVersion);
-
- String brokerAddr = findBrokerResult.getBrokerAddr();
- if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
- brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
- }
-
- PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
- brokerAddr,
- requestHeader,
- timeoutMillis,
- communicationMode,
- pullCallback);
-
- return pullResult;
- }
-
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
-
- public long recalculatePullFromWhichNode(final MessageQueue mq) {
- if (this.isConnectBrokerByUser()) {
- return this.defaultBrokerId;
- }
-
- AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
- if (suggest != null) {
- return suggest.get();
- }
-
- return MixAll.MASTER_ID;
- }
-
- private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
- throws MQClientException {
- ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
- if (topicRouteTable != null) {
- TopicRouteData topicRouteData = topicRouteTable.get(topic);
- List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
-
- if (list != null && !list.isEmpty()) {
- return list.get(randomNum() % list.size());
- }
- }
-
- throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
- + topic, null);
- }
-
- public boolean isConnectBrokerByUser() {
- return connectBrokerByUser;
- }
-
- public int randomNum() {
- int value = random.nextInt();
- if (value < 0) {
- value = Math.abs(value);
- if (value < 0)
- value = 0;
- }
- return value;
- }
-
- public void setConnectBrokerByUser(boolean connectBrokerByUser) {
- this.connectBrokerByUser = connectBrokerByUser;
-
- }
-
- public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) {
- this.filterMessageHookList = filterMessageHookList;
- }
-
- public long getDefaultBrokerId() {
- return defaultBrokerId;
- }
-
- public void setDefaultBrokerId(long defaultBrokerId) {
- this.defaultBrokerId = defaultBrokerId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java
deleted file mode 100644
index 161a039..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.ServiceThread;
-import org.slf4j.Logger;
-
-import java.util.concurrent.*;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullMessageService extends ServiceThread {
- private final Logger log = ClientLogger.getLog();
- private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
- private final MQClientInstance mQClientFactory;
- private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "PullMessageServiceScheduledThread");
- }
- });
-
- public PullMessageService(MQClientInstance mQClientFactory) {
- this.mQClientFactory = mQClientFactory;
- }
-
- public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
- this.scheduledExecutorService.schedule(new Runnable() {
-
- @Override
- public void run() {
- PullMessageService.this.executePullRequestImmediately(pullRequest);
- }
- }, timeDelay, TimeUnit.MILLISECONDS);
- }
-
- public void executePullRequestImmediately(final PullRequest pullRequest) {
- try {
- this.pullRequestQueue.put(pullRequest);
- } catch (InterruptedException e) {
- log.error("executePullRequestImmediately pullRequestQueue.put", e);
- }
- }
-
- public void executeTaskLater(final Runnable r, final long timeDelay) {
- this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
- }
-
- public ScheduledExecutorService getScheduledExecutorService() {
- return scheduledExecutorService;
- }
-
- private void pullMessage(final PullRequest pullRequest) {
- final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
- if (consumer != null) {
- DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
- impl.pullMessage(pullRequest);
- } else {
- log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
- }
- }
-
-
- @Override
- public void run() {
- log.info(this.getServiceName() + " service started");
-
- while (!this.isStopped()) {
- try {
- PullRequest pullRequest = this.pullRequestQueue.take();
- if (pullRequest != null) {
- this.pullMessage(pullRequest);
- }
- } catch (InterruptedException e) {
- } catch (Exception e) {
- log.error("Pull Message Service Run Method exception", e);
- }
- }
-
- log.info(this.getServiceName() + " service end");
- }
-
-
- @Override
- public String getServiceName() {
- return PullMessageService.class.getSimpleName();
- }
-
-
-}