You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:15 UTC
[24/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
new file mode 100644
index 0000000..664b9fb
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -0,0 +1,1071 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.listener.MessageListener;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQPushConsumerImpl implements MQConsumerInner {
+ /**
+ * Delay some time when exception occur
+ */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
+ /**
+ * Flow control interval
+ */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+ /**
+ * Delay some time when suspend pull service
+ */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
+ // Passed to pullKernelImpl together with the "suspend" sys flag; presumably the longest
+ // time the broker may hold a pull request open - TODO confirm against PullAPIWrapper.
+ private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
+ // Passed to pullKernelImpl right after the broker suspend time; presumably the client-side
+ // timeout of a suspended pull - TODO confirm against PullAPIWrapper.
+ private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
+ private final Logger log = ClientLogger.getLog();
+ // The consumer facade whose configuration (group, message model, thresholds, ...) is read here.
+ private final DefaultMQPushConsumer defaultMQPushConsumer;
+ private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
+ // Hooks collected before start(); handed to the PullAPIWrapper when it is created in start().
+ private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
+ // Captured when this object is constructed, not when start() is called.
+ private final long consumerStartTimestamp = System.currentTimeMillis();
+ // Hooks run by executeHookBefore / executeHookAfter.
+ private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
+ // Forwarded to MQClientManager when the client instance is obtained in start().
+ private final RPCHook rpcHook;
+ // Lifecycle state; pullMessage only proceeds while this is RUNNING (see makeSureStateOK).
+ private ServiceState serviceState = ServiceState.CREATE_JUST;
+ // Assigned in start(); null until then.
+ private MQClientInstance mQClientFactory;
+ // Assigned in start(); null until then.
+ private PullAPIWrapper pullAPIWrapper;
+ // While true, pullMessage re-queues the request instead of pulling.
+ private volatile boolean pause = false;
+ // Set in start() according to the listener type; selects the flow-control branch in pullMessage.
+ private boolean consumeOrderly = false;
+ // Listener actually used; falls back to the facade's listener in copySubscription().
+ private MessageListener messageListenerInner;
+ // Assigned in start() (or via setOffsetStore); null until then.
+ private OffsetStore offsetStore;
+ // Assigned in start(); null until then.
+ private ConsumeMessageService consumeMessageService;
+ // Number of pulls postponed because the cached message count exceeded pullThresholdForQueue;
+ // also used to emit the flow-control warning only once per 1000 occurrences.
+ private long flowControlTimes1 = 0;
+ // Same as above for the "span too long" flow control of concurrent consumption.
+ private long flowControlTimes2 = 0;
+
+
+ /**
+ * Creates the implementation object behind a push consumer.
+ *
+ * @param defaultMQPushConsumer consumer whose configuration this implementation reads
+ * @param rpcHook hook that is only stored here and later passed on when the client
+ * instance is obtained in {@link #start()}
+ */
+ public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
+ this.rpcHook = rpcHook;
+ this.defaultMQPushConsumer = defaultMQPushConsumer;
+ }
+
+ /**
+ * Adds a filter-message hook to the list that is handed to the pull wrapper in
+ * {@link #start()} and logs the hook's name.
+ *
+ * @param hook hook to register
+ */
+ public void registerFilterMessageHook(final FilterMessageHook hook) {
+ final ArrayList<FilterMessageHook> hooks = this.filterMessageHookList;
+ hooks.add(hook);
+ final String hookName = hook.hookName();
+ log.info("register FilterMessageHook Hook, {}", hookName);
+ }
+
+ /**
+ * @return {@code true} when at least one consume-message hook has been registered
+ */
+ public boolean hasHook() {
+ final boolean noHooks = this.consumeMessageHookList.isEmpty();
+ return !noHooks;
+ }
+
+ /**
+ * Adds a consume-message hook that {@link #executeHookBefore} and
+ * {@link #executeHookAfter} will invoke, and logs the hook's name.
+ *
+ * @param hook hook to register
+ */
+ public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
+ final ArrayList<ConsumeMessageHook> hooks = this.consumeMessageHookList;
+ hooks.add(hook);
+ final String hookName = hook.hookName();
+ log.info("register consumeMessageHook Hook, {}", hookName);
+ }
+
+ /**
+ * Invokes {@code consumeMessageBefore} on every registered consume-message hook.
+ * <p>
+ * Hooks are best effort: a failing hook never interrupts consumption and does not stop
+ * the remaining hooks from running. The failure is logged instead of being silently
+ * discarded so that a broken hook can be diagnosed.
+ *
+ * @param context context passed unchanged to each hook
+ */
+ public void executeHookBefore(final ConsumeMessageContext context) {
+ if (this.consumeMessageHookList.isEmpty()) {
+ return;
+ }
+
+ for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+ try {
+ hook.consumeMessageBefore(context);
+ } catch (Throwable e) {
+ // Deliberately not rethrown; only made visible in the client log.
+ log.warn("consumeMessageHook executeHookBefore exception", e);
+ }
+ }
+ }
+
+ /**
+ * Invokes {@code consumeMessageAfter} on every registered consume-message hook.
+ * <p>
+ * Hooks are best effort: a failing hook never interrupts consumption and does not stop
+ * the remaining hooks from running. The failure is logged instead of being silently
+ * discarded so that a broken hook can be diagnosed.
+ *
+ * @param context context passed unchanged to each hook
+ */
+ public void executeHookAfter(final ConsumeMessageContext context) {
+ if (this.consumeMessageHookList.isEmpty()) {
+ return;
+ }
+
+ for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+ try {
+ hook.consumeMessageAfter(context);
+ } catch (Throwable e) {
+ // Deliberately not rethrown; only made visible in the client log.
+ log.warn("consumeMessageHook executeHookAfter exception", e);
+ }
+ }
+ }
+
+ /**
+ * Creates a topic with a system flag of {@code 0}; see the four-argument overload.
+ *
+ * @param key forwarded unchanged to the four-argument overload
+ * @param newTopic name of the topic to create
+ * @param queueNum number of queues for the new topic
+ * @throws MQClientException if the underlying admin call fails
+ */
+ public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+ final int defaultTopicSysFlag = 0;
+ this.createTopic(key, newTopic, queueNum, defaultTopicSysFlag);
+ }
+
+ /**
+ * Delegates topic creation to the admin implementation of the client instance.
+ *
+ * @param key forwarded unchanged to the admin implementation
+ * @param newTopic name of the topic to create
+ * @param queueNum number of queues for the new topic
+ * @param topicSysFlag system flag for the new topic
+ * @throws MQClientException if the underlying admin call fails
+ */
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+ final MQClientInstance clientInstance = this.mQClientFactory;
+ clientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+ }
+
+ /**
+ * Returns the message queues known for a topic, refreshing the topic route from the
+ * name server once when nothing is cached yet.
+ *
+ * @param topic topic to look up
+ * @return the queue set held in the rebalance component's subscribe-info table
+ * @throws MQClientException if the topic is still unknown after the refresh
+ */
+ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
+ Set<MessageQueue> queues = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
+ if (queues != null) {
+ return queues;
+ }
+
+ // Nothing cached: refresh the route once and look again.
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+ queues = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
+ if (queues == null) {
+ throw new MQClientException("The topic[" + topic + "] not exist", null);
+ }
+
+ return queues;
+ }
+
+ /**
+ * @return the push consumer this implementation was constructed with
+ */
+ public DefaultMQPushConsumer getDefaultMQPushConsumer() {
+ return this.defaultMQPushConsumer;
+ }
+
+ /**
+ * Delegates to the admin implementation of the client instance.
+ *
+ * @param mq queue to query
+ * @return the value returned by the admin implementation's {@code earliestMsgStoreTime}
+ * @throws MQClientException if the admin call fails
+ */
+ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+ final long earliestStoreTime = this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
+ return earliestStoreTime;
+ }
+
+ /**
+ * Delegates to the admin implementation of the client instance.
+ *
+ * @param mq queue to query
+ * @return the value returned by the admin implementation's {@code maxOffset}
+ * @throws MQClientException if the admin call fails
+ */
+ public long maxOffset(MessageQueue mq) throws MQClientException {
+ final long max = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+ return max;
+ }
+
+ /**
+ * Delegates to the admin implementation of the client instance.
+ *
+ * @param mq queue to query
+ * @return the value returned by the admin implementation's {@code minOffset}
+ * @throws MQClientException if the admin call fails
+ */
+ public long minOffset(MessageQueue mq) throws MQClientException {
+ final long min = this.mQClientFactory.getMQAdminImpl().minOffset(mq);
+ return min;
+ }
+
+ /**
+ * @return the offset store in use; {@code null} until one is set or {@link #start()} picks one
+ */
+ public OffsetStore getOffsetStore() {
+ return this.offsetStore;
+ }
+
+ /**
+ * Replaces the offset store used by this consumer.
+ *
+ * @param offsetStore the store to use from now on
+ */
+ public void setOffsetStore(OffsetStore offsetStore) {
+ final OffsetStore replacement = offsetStore;
+ this.offsetStore = replacement;
+ }
+
+ /**
+ * Executes one pull request: runs a series of pre-checks (dropped queue, service state,
+ * pause flag, flow control, ordered-consumption lock, subscription) and, when they all
+ * pass, issues an asynchronous pull whose callback dispatches the result and schedules
+ * the next pull.
+ * <p>
+ * Whenever a pre-check fails (other than a dropped process queue, which simply returns)
+ * the same request is re-queued with a delay instead of being executed.
+ *
+ * @param pullRequest the request describing the queue, its process queue and next offset
+ */
+ public void pullMessage(final PullRequest pullRequest) {
+ final ProcessQueue processQueue = pullRequest.getProcessQueue();
+ // A dropped process queue ends the pull loop for this request: nothing is re-queued.
+ if (processQueue.isDropped()) {
+ log.info("the pull request[{}] is dropped.", pullRequest.toString());
+ return;
+ }
+
+ pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
+
+ // Consumer not RUNNING: retry later.
+ try {
+ this.makeSureStateOK();
+ } catch (MQClientException e) {
+ log.warn("pullMessage exception, consumer state not ok", e);
+ this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ return;
+ }
+
+ // Consumer suspended: keep the request alive but do not pull.
+ if (this.isPause()) {
+ log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
+ this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
+ return;
+ }
+
+ // Flow control 1: too many messages cached for this queue.
+ long size = processQueue.getMsgCount().get();
+ if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
+ this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+ // Warn only on every 1000th occurrence to avoid flooding the log.
+ if ((flowControlTimes1++ % 1000) == 0) {
+ log.warn(
+ "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
+ processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
+ }
+ return;
+ }
+
+ if (!this.consumeOrderly) {
+ // Flow control 2 (concurrent consumption only): span reported by the process queue is too large.
+ if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
+ this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+ if ((flowControlTimes2++ % 1000) == 0) {
+ log.warn(
+ "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
+ processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
+ pullRequest, flowControlTimes2);
+ }
+ return;
+ }
+ } else {
+ // Ordered consumption: pull only while the process queue is locked.
+ if (processQueue.isLocked()) {
+ // First pull after the lock was obtained: recompute the start offset once.
+ if (!pullRequest.isLockedFirst()) {
+ final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
+ boolean brokerBusy = offset < pullRequest.getNextOffset();
+ log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
+ pullRequest, offset, brokerBusy);
+ if (brokerBusy) {
+ log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
+ pullRequest, offset);
+ }
+
+ pullRequest.setLockedFirst(true);
+ pullRequest.setNextOffset(offset);
+ }
+ } else {
+ this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ log.info("pull message later because not locked in broker, {}", pullRequest);
+ return;
+ }
+ }
+
+ // Without a subscription for the topic there is nothing to pull with; retry later.
+ final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
+ if (null == subscriptionData) {
+ this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ log.warn("find the consumer's subscription failed, {}", pullRequest);
+ return;
+ }
+
+ // Start of the round trip, used for the pull response-time statistic.
+ final long beginTimestamp = System.currentTimeMillis();
+
+ PullCallback pullCallback = new PullCallback() {
+ @Override
+ public void onSuccess(PullResult pullResult) {
+ // NOTE(review): a null pullResult is ignored and the request is not re-queued here - confirm intended.
+ if (pullResult != null) {
+ pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
+ subscriptionData);
+
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ // Remember the offset that was requested so the sanity check below can compare against it.
+ long prevRequestOffset = pullRequest.getNextOffset();
+ pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+ long pullRT = System.currentTimeMillis() - beginTimestamp;
+ DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
+ pullRequest.getMessageQueue().getTopic(), pullRT);
+
+ long firstMsgOffset = Long.MAX_VALUE;
+ // FOUND with an empty list: nothing to dispatch, pull again right away.
+ if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
+ DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+ } else {
+ firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
+
+ DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
+ pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
+
+ // Cache the messages in the process queue, then hand them to the consume service.
+ boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
+ DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
+ pullResult.getMsgFoundList(), //
+ processQueue, //
+ pullRequest.getMessageQueue(), //
+ dispathToConsume);
+
+ // A positive pullInterval delays the next pull; otherwise pull again immediately.
+ if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
+ DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
+ DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
+ } else {
+ DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+ }
+ }
+
+ // Sanity check: offsets returned must not be lower than the requested offset.
+ if (pullResult.getNextBeginOffset() < prevRequestOffset//
+ || firstMsgOffset < prevRequestOffset) {
+ log.warn(
+ "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
+ pullResult.getNextBeginOffset(), //
+ firstMsgOffset, //
+ prevRequestOffset);
+ }
+
+ break;
+ case NO_NEW_MSG:
+ pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+
+ DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+
+ DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+ break;
+ case NO_MATCHED_MSG:
+ pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+
+ DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+
+ DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+ break;
+ case OFFSET_ILLEGAL:
+ log.warn("the pull request offset illegal, {} {}", //
+ pullRequest.toString(), pullResult.toString());
+ pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+
+ // Mark the queue dropped now; the offset fix and the removal run 10 seconds later.
+ pullRequest.getProcessQueue().setDropped(true);
+ DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
+ pullRequest.getNextOffset(), false);
+
+ DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
+
+ DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
+
+ log.warn("fix the pull request offset, {}", pullRequest);
+ } catch (Throwable e) {
+ log.error("executeTaskLater Exception", e);
+ }
+ }
+ }, 10000);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+
+ @Override
+ public void onException(Throwable e) {
+ // Failures on retry topics are not logged; every failure re-queues the request with a delay.
+ if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ log.warn("execute the pull request exception", e);
+ }
+
+ DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ }
+ };
+
+ // In CLUSTERING mode the in-memory consume offset is sent along with the pull when it is positive.
+ boolean commitOffsetEnable = false;
+ long commitOffsetValue = 0L;
+ if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
+ commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
+ if (commitOffsetValue > 0) {
+ commitOffsetEnable = true;
+ }
+ }
+
+ // The subscription expression is sent only when postSubscriptionWhenPull is on and
+ // the subscription is not in class-filter mode.
+ String subExpression = null;
+ boolean classFilter = false;
+ SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
+ if (sd != null) {
+ if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
+ subExpression = sd.getSubString();
+ }
+
+ classFilter = sd.isClassFilterMode();
+ }
+
+ int sysFlag = PullSysFlag.buildSysFlag(//
+ commitOffsetEnable, // commitOffset
+ true, // suspend
+ subExpression != null, // subscription
+ classFilter // class filter
+ );
+ // Issue the asynchronous pull; a failure to even send it re-queues the request with a delay.
+ try {
+ this.pullAPIWrapper.pullKernelImpl(//
+ pullRequest.getMessageQueue(), // 1
+ subExpression, // 2
+ subscriptionData.getSubVersion(), // 3
+ pullRequest.getNextOffset(), // 4
+ this.defaultMQPushConsumer.getPullBatchSize(), // 5
+ sysFlag, // 6
+ commitOffsetValue, // 7
+ BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
+ CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
+ CommunicationMode.ASYNC, // 10
+ pullCallback// 11
+ );
+ } catch (Exception e) {
+ log.error("pullKernelImpl exception", e);
+ this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ }
+ }
+
+ private void makeSureStateOK() throws MQClientException {
+ if (this.serviceState != ServiceState.RUNNING) {
+ throw new MQClientException("The consumer service state not OK, "//
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
+ }
+ }
+
+ private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
+ this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
+ }
+
+ /**
+ * @return {@code true} while pulling is suspended
+ */
+ public boolean isPause() {
+ return this.pause;
+ }
+
+ /**
+ * Sets the pause flag checked by {@link #pullMessage(PullRequest)}.
+ *
+ * @param pause {@code true} to make pull requests be postponed instead of executed
+ */
+ public void setPause(boolean pause) {
+ final boolean requested = pause;
+ this.pause = requested;
+ }
+
+ /**
+ * @return the statistics manager owned by the client instance
+ */
+ public ConsumerStatsManager getConsumerStatsManager() {
+ final ConsumerStatsManager statsManager = this.mQClientFactory.getConsumerStatsManager();
+ return statsManager;
+ }
+
+ /**
+ * Hands the request to the pull-message service for execution without delay.
+ *
+ * @param pullRequest request to queue
+ */
+ public void executePullRequestImmediately(final PullRequest pullRequest) {
+ final MQClientInstance clientInstance = this.mQClientFactory;
+ clientInstance.getPullMessageService().executePullRequestImmediately(pullRequest);
+ }
+
+ private void correctTagsOffset(final PullRequest pullRequest) {
+ if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
+ this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
+ }
+ }
+
+ /**
+ * Schedules a task on the pull-message service after a delay.
+ *
+ * @param r task to run
+ * @param timeDelay delay passed on to the pull-message service
+ */
+ public void executeTaskLater(final Runnable r, final long timeDelay) {
+ final MQClientInstance clientInstance = this.mQClientFactory;
+ clientInstance.getPullMessageService().executeTaskLater(r, timeDelay);
+ }
+
+ /**
+ * Delegates a message query to the admin implementation of the client instance.
+ *
+ * @param topic topic to search
+ * @param key key to search for
+ * @param maxNum maximum number of results, as interpreted by the admin implementation
+ * @param begin lower bound passed to the admin implementation
+ * @param end upper bound passed to the admin implementation
+ * @return the query result produced by the admin implementation
+ * @throws MQClientException if the query fails
+ * @throws InterruptedException if the calling thread is interrupted
+ */
+ public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+ throws MQClientException, InterruptedException {
+ final QueryResult found = this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
+ return found;
+ }
+
+ /**
+ * Delegates a unique-key lookup to the admin implementation of the client instance.
+ *
+ * @param topic topic to search
+ * @param uniqKey unique key of the message
+ * @return the message returned by the admin implementation
+ * @throws MQClientException if the lookup fails
+ * @throws InterruptedException if the calling thread is interrupted
+ */
+ public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException,
+ InterruptedException {
+ final MessageExt found = this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
+ return found;
+ }
+
+
+ /**
+ * Stores the listener that {@link #start()} uses to choose the consume service.
+ *
+ * @param messageListener listener to use
+ */
+ public void registerMessageListener(MessageListener messageListener) {
+ final MessageListener listener = messageListener;
+ this.messageListenerInner = listener;
+ }
+
+ /**
+ * Clears the pause flag, triggers a rebalance and logs the consumer group.
+ */
+ public void resume() {
+ this.pause = false;
+ this.doRebalance();
+ final String group = this.defaultMQPushConsumer.getConsumerGroup();
+ log.info("resume this consumer, {}", group);
+ }
+
+ /**
+ * Sends a message back so that it can be consumed again later.
+ * <p>
+ * The primary path asks a broker to take the message back. If that fails for any reason,
+ * the message is re-published to this group's retry topic through the client's default
+ * producer.
+ *
+ * @param msg message to send back
+ * @param delayLevel delay level forwarded to the broker on the primary path
+ * @param brokerName broker to send to; when {@code null} the address is derived from the
+ * message's store host
+ * @throws RemotingException may be thrown by the fallback send
+ * @throws MQBrokerException may be thrown by the fallback send
+ * @throws InterruptedException may be thrown by the fallback send
+ * @throws MQClientException may be thrown by the fallback send
+ */
+ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ try {
+ String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+ : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+ this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
+ this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
+ } catch (Exception e) {
+ // Fallback: rebuild the message and publish it to the retry topic of this group.
+ log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
+
+ Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
+
+ // Keep the first message id across repeated retries.
+ String originMsgId = MessageAccessor.getOriginMessageId(msg);
+ MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
+
+ newMsg.setFlag(msg.getFlag());
+ // Properties are copied first; the individual properties below are then set on top of the copy.
+ MessageAccessor.setProperties(newMsg, msg.getProperties());
+ MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+ MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
+ MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
+ // NOTE(review): the fallback derives the delay level from the reconsume count and
+ // does not use the delayLevel argument - confirm this is intended.
+ newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+
+ this.mQClientFactory.getDefaultMQProducer().send(newMsg);
+ }
+ }
+
+ private int getMaxReconsumeTimes() {
+ // default reconsume times: 16
+ if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
+ return 16;
+ } else {
+ return this.defaultMQPushConsumer.getMaxReconsumeTimes();
+ }
+ }
+
+ /**
+ * Shuts the consumer down when it is running; in every other state this is a no-op.
+ * <p>
+ * Order of the shutdown steps: consume service, offset persistence, consumer
+ * unregistration, client instance shutdown, rebalance cleanup, state change.
+ */
+ public void shutdown() {
+ switch (this.serviceState) {
+ case RUNNING:
+ this.consumeMessageService.shutdown();
+ this.persistConsumerOffset();
+ this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
+ this.mQClientFactory.shutdown();
+ log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
+ this.rebalanceImpl.destroy();
+ this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+ break;
+ default:
+ // CREATE_JUST, SHUTDOWN_ALREADY and any other state: nothing to release.
+ break;
+ }
+ }
+
+ /**
+ * Starts the consumer. Only a consumer in state {@code CREATE_JUST} can be started; in
+ * states {@code RUNNING}, {@code START_FAILED} and {@code SHUTDOWN_ALREADY} an exception
+ * is thrown.
+ * <p>
+ * Startup validates the configuration, copies the subscriptions, obtains the client
+ * instance, wires the rebalance component, pull wrapper, offset store and consume service,
+ * registers this consumer and starts the client instance. Afterwards topic routes are
+ * refreshed, a heartbeat is sent and a rebalance is triggered.
+ *
+ * @throws MQClientException if the configuration is invalid, the consumer group is already
+ * registered on the client instance, or the consumer is not in state {@code CREATE_JUST}
+ */
+ public void start() throws MQClientException {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
+ this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
+ // Pessimistically mark the start as failed; an exception below leaves the state this way.
+ this.serviceState = ServiceState.START_FAILED;
+
+ this.checkConfig();
+
+ this.copySubscription();
+
+ if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
+ this.defaultMQPushConsumer.changeInstanceNameToPID();
+ }
+
+ this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
+
+ this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
+ this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
+ this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
+ this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
+
+ this.pullAPIWrapper = new PullAPIWrapper(
+ mQClientFactory,
+ this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
+ this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
+
+ // A store supplied on the consumer wins; otherwise pick one by message model:
+ // a local file store for BROADCASTING, a remote broker store for CLUSTERING.
+ if (this.defaultMQPushConsumer.getOffsetStore() != null) {
+ this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
+ } else {
+ switch (this.defaultMQPushConsumer.getMessageModel()) {
+ case BROADCASTING:
+ this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
+ break;
+ case CLUSTERING:
+ this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
+ break;
+ default:
+ break;
+ }
+ }
+ this.offsetStore.load();
+
+ // The listener type decides between ordered and concurrent consumption.
+ if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
+ this.consumeOrderly = true;
+ this.consumeMessageService =
+ new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
+ } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
+ this.consumeOrderly = false;
+ this.consumeMessageService =
+ new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
+ }
+
+ this.consumeMessageService.start();
+
+ boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
+ if (!registerOK) {
+ // Registration failed: undo the consume service start and allow another start attempt.
+ this.serviceState = ServiceState.CREATE_JUST;
+ this.consumeMessageService.shutdown();
+ throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+ null);
+ }
+
+ mQClientFactory.start();
+ log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
+ this.serviceState = ServiceState.RUNNING;
+ break;
+ case RUNNING:
+ case START_FAILED:
+ case SHUTDOWN_ALREADY:
+ throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
+ default:
+ break;
+ }
+
+ // Post-start work: refresh routes of subscribed topics, announce this consumer, then rebalance.
+ this.updateTopicSubscribeInfoWhenSubscriptionChanged();
+
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+
+ this.mQClientFactory.rebalanceImmediately();
+ }
+
+ /**
+ * Validates the consumer configuration before start. Checks run in the order written and
+ * the first violation is reported, so the order determines which error a caller sees.
+ *
+ * @throws MQClientException describing the first invalid setting
+ */
+ private void checkConfig() throws MQClientException {
+ Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
+
+ // NOTE(review): this null check comes after Validators.checkGroup, which may already
+ // reject a null group - confirm whether this branch is reachable.
+ if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
+ throw new MQClientException(
+ "consumerGroup is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // The default group name is not accepted as a consumer group.
+ if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
+ throw new MQClientException(
+ "consumerGroup can not equal "
+ + MixAll.DEFAULT_CONSUMER_GROUP
+ + ", please specify another one."
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ if (null == this.defaultMQPushConsumer.getMessageModel()) {
+ throw new MQClientException(
+ "messageModel is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
+ throw new MQClientException(
+ "consumeFromWhere is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // consumeTimestamp must be parseable with the YYYY_MMDD_HHMMSS pattern.
+ Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS);
+ if (null == dt) {
+ throw new MQClientException(
+ "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // allocateMessageQueueStrategy
+ if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
+ throw new MQClientException(
+ "allocateMessageQueueStrategy is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // subscription
+ if (null == this.defaultMQPushConsumer.getSubscription()) {
+ throw new MQClientException(
+ "subscription is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // messageListener
+ if (null == this.defaultMQPushConsumer.getMessageListener()) {
+ throw new MQClientException(
+ "messageListener is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // The listener must be one of the two supported kinds.
+ boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
+ boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
+ if (!orderly && !concurrently) {
+ throw new MQClientException(
+ "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // consumeThreadMin
+ // NOTE(review): min > max is also reported as "Out of range [1, 1000]", which does
+ // not name the real problem - consider a dedicated message.
+ if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
+ || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
+ || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
+ throw new MQClientException(
+ "consumeThreadMin Out of range [1, 1000]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // consumeThreadMax
+ if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
+ throw new MQClientException(
+ "consumeThreadMax Out of range [1, 1000]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // consumeConcurrentlyMaxSpan
+ if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
+ || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
+ throw new MQClientException(
+ "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // pullThresholdForQueue
+ if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
+ throw new MQClientException(
+ "pullThresholdForQueue Out of range [1, 65535]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // pullInterval
+ if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
+ throw new MQClientException(
+ "pullInterval Out of range [0, 65535]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // consumeMessageBatchMaxSize
+ if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
+ || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
+ throw new MQClientException(
+ "consumeMessageBatchMaxSize Out of range [1, 1024]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // pullBatchSize
+ if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
+ throw new MQClientException(
+ "pullBatchSize Out of range [1, 1024]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+ }
+
+ /**
+ * Copies the subscriptions configured on the consumer into the rebalance component,
+ * adopts the consumer's listener when none was registered here, and in CLUSTERING mode
+ * additionally subscribes to this group's retry topic.
+ *
+ * @throws MQClientException wrapping any exception raised while building the subscriptions
+ */
+ private void copySubscription() throws MQClientException {
+ try {
+ final Map<String, String> configured = this.defaultMQPushConsumer.getSubscription();
+ if (configured != null) {
+ for (final Map.Entry<String, String> item : configured.entrySet()) {
+ final String topic = item.getKey();
+ final SubscriptionData data = FilterAPI.buildSubscriptionData(
+ this.defaultMQPushConsumer.getConsumerGroup(), topic, item.getValue());
+ this.rebalanceImpl.getSubscriptionInner().put(topic, data);
+ }
+ }
+
+ // Fall back to the listener configured on the consumer.
+ if (this.messageListenerInner == null) {
+ this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
+ }
+
+ switch (this.defaultMQPushConsumer.getMessageModel()) {
+ case CLUSTERING:
+ // The retry topic is subscribed with the "all" expression.
+ final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
+ final SubscriptionData retryData = FilterAPI.buildSubscriptionData(
+ this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL);
+ this.rebalanceImpl.getSubscriptionInner().put(retryTopic, retryData);
+ break;
+ default:
+ // BROADCASTING and anything else: no retry-topic subscription.
+ break;
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscription exception", e);
+ }
+ }
+
+ /**
+ * @return the listener currently held by this implementation; may be {@code null} before start
+ */
+ public MessageListener getMessageListenerInner() {
+ return this.messageListenerInner;
+ }
+
+ /**
+ * Refreshes the route information from the name server for every subscribed topic.
+ */
+ private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+ final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
+ if (subscriptions == null) {
+ return;
+ }
+
+ // Only the topic names are needed, so iterate the key view.
+ for (final String topic : subscriptions.keySet()) {
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+ }
+ }
+
+ /**
+ * @return the topic-to-subscription table held by the rebalance component
+ */
+ public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
+ final ConcurrentHashMap<String, SubscriptionData> table = this.rebalanceImpl.getSubscriptionInner();
+ return table;
+ }
+
+ /**
+ * Subscribes to a topic with a filter expression and, when the client instance already
+ * exists, sends a heartbeat to all brokers.
+ *
+ * @param topic topic to subscribe to
+ * @param subExpression filter expression handed to {@code FilterAPI.buildSubscriptionData}
+ * @throws MQClientException wrapping any exception raised while subscribing
+ */
+ public void subscribe(String topic, String subExpression) throws MQClientException {
+ try {
+ final String group = this.defaultMQPushConsumer.getConsumerGroup();
+ final SubscriptionData data = FilterAPI.buildSubscriptionData(group, topic, subExpression);
+ this.rebalanceImpl.getSubscriptionInner().put(topic, data);
+
+ final MQClientInstance clientInstance = this.mQClientFactory;
+ if (clientInstance != null) {
+ clientInstance.sendHeartbeatToAllBrokerWithLock();
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscription exception", e);
+ }
+ }
+
+ /**
+ * Subscribes to a topic in class-filter mode and, when the client instance already
+ * exists, sends a heartbeat to all brokers.
+ *
+ * @param topic topic to subscribe to
+ * @param fullClassName stored as the subscription's sub string
+ * @param filterClassSource stored as the subscription's filter class source
+ * @throws MQClientException wrapping any exception raised while subscribing
+ */
+ public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
+ try {
+ final String group = this.defaultMQPushConsumer.getConsumerGroup();
+ // Built with "*" first, then switched to class-filter mode.
+ final SubscriptionData data = FilterAPI.buildSubscriptionData(group, topic, "*");
+ data.setSubString(fullClassName);
+ data.setClassFilterMode(true);
+ data.setFilterClassSource(filterClassSource);
+ this.rebalanceImpl.getSubscriptionInner().put(topic, data);
+
+ final MQClientInstance clientInstance = this.mQClientFactory;
+ if (clientInstance != null) {
+ clientInstance.sendHeartbeatToAllBrokerWithLock();
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscription exception", e);
+ }
+ }
+
+ /**
+  * Sets the {@code pause} flag and logs the consumer group being suspended.
+  * While the flag is set, {@code doRebalance()} does nothing.
+  */
+ public void suspend() {
+     this.pause = true;
+     final String group = this.defaultMQPushConsumer.getConsumerGroup();
+     log.info("suspend this consumer, {}", group);
+ }
+
+ /**
+  * Removes the subscription entry for {@code topic} from the subscription table.
+  *
+  * @param topic topic whose subscription is dropped
+  */
+ public void unsubscribe(String topic) {
+     final ConcurrentHashMap<String, SubscriptionData> table = this.rebalanceImpl.getSubscriptionInner();
+     table.remove(topic);
+ }
+
+ /**
+  * Forwards an offset update for {@code mq} to the offset store, passing
+  * {@code false} as the store's third argument.
+  *
+  * @param mq     message queue whose offset is updated
+  * @param offset offset value handed to the store
+  */
+ public void updateConsumeOffset(MessageQueue mq, long offset) {
+ this.offsetStore.updateOffset(mq, offset, false);
+ }
+
+ /**
+  * Delegates a core pool size change to the consume message service.
+  *
+  * @param corePoolSize value forwarded to {@code consumeMessageService.updateCorePoolSize}
+  */
+ public void updateCorePoolSize(int corePoolSize) {
+ this.consumeMessageService.updateCorePoolSize(corePoolSize);
+ }
+
+ /**
+  * Looks up a message by id through the client factory's admin implementation.
+  *
+  * @param msgId id of the message to look up
+  * @return whatever the admin implementation returns for {@code msgId}
+  */
+ public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     final MessageExt message = this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
+     return message;
+ }
+
+ /**
+  * @return the rebalance implementation used by this consumer
+  */
+ public RebalanceImpl getRebalanceImpl() {
+     return this.rebalanceImpl;
+ }
+
+ /**
+  * @return the current value of the {@code consumeOrderly} flag
+  */
+ public boolean isConsumeOrderly() {
+     return this.consumeOrderly;
+ }
+
+ /**
+  * Sets the {@code consumeOrderly} flag, which is later passed to the rebalance
+  * implementation by {@code doRebalance()}.
+  *
+  * @param consumeOrderly new flag value
+  */
+ public void setConsumeOrderly(boolean consumeOrderly) {
+ this.consumeOrderly = consumeOrderly;
+ }
+
+ /**
+  * For every subscribed topic that has known message queues, looks up the offset
+  * matching {@code timeStamp} on each queue and asks the client factory to reset
+  * this group's offsets to those values. Topics without queue information are skipped.
+  *
+  * @param timeStamp timestamp passed to {@link #searchOffset(MessageQueue, long)} for each queue
+  */
+ public void resetOffsetByTimeStamp(long timeStamp)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) {
+         final Set<MessageQueue> queues = rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
+         if (queues == null) {
+             continue;
+         }
+
+         final Map<MessageQueue, Long> offsets = new HashMap<MessageQueue, Long>();
+         for (MessageQueue queue : queues) {
+             offsets.put(queue, searchOffset(queue, timeStamp));
+         }
+         this.mQClientFactory.resetOffset(topic, groupName(), offsets);
+     }
+ }
+
+ /**
+  * Delegates an offset lookup by timestamp to the client factory's admin implementation.
+  *
+  * @param mq        queue to search
+  * @param timestamp timestamp to search for
+  * @return the offset reported by the admin implementation
+  */
+ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+     final long offset = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+     return offset;
+ }
+
+ /**
+  * @return the consumer group configured on the wrapped {@code DefaultMQPushConsumer}
+  */
+ @Override
+ public String groupName() {
+     final String group = this.defaultMQPushConsumer.getConsumerGroup();
+     return group;
+ }
+
+ /**
+  * @return the message model configured on the wrapped {@code DefaultMQPushConsumer}
+  */
+ @Override
+ public MessageModel messageModel() {
+     final MessageModel model = this.defaultMQPushConsumer.getMessageModel();
+     return model;
+ }
+
+ /**
+  * @return always {@code ConsumeType.CONSUME_PASSIVELY}
+  */
+ @Override
+ public ConsumeType consumeType() {
+ return ConsumeType.CONSUME_PASSIVELY;
+ }
+
+ /**
+  * @return the consume-from-where setting configured on the wrapped {@code DefaultMQPushConsumer}
+  */
+ @Override
+ public ConsumeFromWhere consumeFromWhere() {
+     final ConsumeFromWhere startPoint = this.defaultMQPushConsumer.getConsumeFromWhere();
+     return startPoint;
+ }
+
+ /**
+  * Copies the values of the subscription table into a new set.
+  *
+  * @return a fresh {@code HashSet}; changes to it do not affect the subscription table
+  */
+ @Override
+ public Set<SubscriptionData> subscriptions() {
+     return new HashSet<SubscriptionData>(this.rebalanceImpl.getSubscriptionInner().values());
+ }
+
+ /**
+  * Triggers a rebalance, passing the current {@code consumeOrderly} flag.
+  * Does nothing when no rebalance implementation is set or the consumer is paused.
+  */
+ @Override
+ public void doRebalance() {
+     if (this.rebalanceImpl == null || this.pause) {
+         return;
+     }
+
+     this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
+ }
+
+ /**
+  * Persists offsets for the queues currently present in the process-queue table.
+  * Any exception, including one from the state check, is logged and not rethrown.
+  */
+ @Override
+ public void persistConsumerOffset() {
+     try {
+         this.makeSureStateOK();
+
+         final Set<MessageQueue> snapshot = new HashSet<MessageQueue>();
+         final Set<MessageQueue> assigned = this.rebalanceImpl.getProcessQueueTable().keySet();
+         if (null != assigned) {
+             snapshot.addAll(assigned);
+         }
+
+         this.offsetStore.persistAll(snapshot);
+     } catch (Exception e) {
+         log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
+     }
+ }
+
+ /**
+  * Records the queue set for {@code topic} in the rebalance implementation's
+  * topic-subscribe table, but only when the topic is currently subscribed.
+  *
+  * @param topic topic the queue information belongs to
+  * @param info  queue set stored for the topic
+  */
+ @Override
+ public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
+     final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
+     if (subscriptions != null && subscriptions.containsKey(topic)) {
+         this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);
+     }
+ }
+
+ /**
+  * Tells whether queue information is still missing for a subscribed topic.
+  *
+  * @param topic topic to check
+  * @return {@code true} when {@code topic} is subscribed and has no entry in the
+  *         topic-subscribe table; {@code false} otherwise, including when not subscribed
+  */
+ @Override
+ public boolean isSubscribeTopicNeedUpdate(String topic) {
+     final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
+     if (subscriptions == null || !subscriptions.containsKey(topic)) {
+         return false;
+     }
+
+     return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
+ }
+
+ /**
+  * @return the unit-mode flag configured on the wrapped {@code DefaultMQPushConsumer}
+  */
+ @Override
+ public boolean isUnitMode() {
+     final boolean unitMode = this.defaultMQPushConsumer.isUnitMode();
+     return unitMode;
+ }
+
+ /**
+  * Builds a snapshot describing this consumer: its configuration properties
+  * (plus orderly flag, core pool size and start timestamp), its subscriptions,
+  * per-queue process information and per-topic consume status.
+  *
+  * @return a newly created {@code ConsumerRunningInfo}
+  */
+ @Override
+ public ConsumerRunningInfo consumerRunningInfo() {
+     final ConsumerRunningInfo runningInfo = new ConsumerRunningInfo();
+
+     // Consumer configuration, extended with a few runtime values.
+     final Properties props = MixAll.object2Properties(this.defaultMQPushConsumer);
+     props.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly));
+     props.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize()));
+     props.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));
+     runningInfo.setProperties(props);
+
+     final Set<SubscriptionData> subscribed = this.subscriptions();
+     runningInfo.getSubscriptionSet().addAll(subscribed);
+
+     // One ProcessQueueInfo per queue in the process-queue table.
+     for (Entry<MessageQueue, ProcessQueue> assigned : this.rebalanceImpl.getProcessQueueTable().entrySet()) {
+         final MessageQueue queue = assigned.getKey();
+         final ProcessQueue processQueue = assigned.getValue();
+
+         final ProcessQueueInfo queueInfo = new ProcessQueueInfo();
+         queueInfo.setCommitOffset(this.offsetStore.readOffset(queue, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
+         processQueue.fillProcessQueueInfo(queueInfo);
+         runningInfo.getMqTable().put(queue, queueInfo);
+     }
+
+     // Consume statistics per subscribed topic.
+     for (SubscriptionData subscription : subscribed) {
+         final ConsumeStatus status = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), subscription.getTopic());
+         runningInfo.getStatusTable().put(subscription.getTopic(), status);
+     }
+
+     return runningInfo;
+ }
+
+ /**
+  * @return the client instance currently attached to this consumer
+  */
+ public MQClientInstance getmQClientFactory() {
+     return this.mQClientFactory;
+ }
+
+ /**
+  * Replaces the client instance used by this consumer.
+  *
+  * @param mQClientFactory the client instance to store
+  */
+ public void setmQClientFactory(MQClientInstance mQClientFactory) {
+ this.mQClientFactory = mQClientFactory;
+ }
+
+ /**
+  * @return the current value of the {@code serviceState} field
+  */
+ public ServiceState getServiceState() {
+     return this.serviceState;
+ }
+
+ /**
+  * Overwrites the {@code serviceState} field.
+  *
+  * @param serviceState the state to store
+  */
+ public void setServiceState(ServiceState serviceState) {
+ this.serviceState = serviceState;
+ }
+
+ /**
+  * Compares the total message accumulation across all process queues with the
+  * configured threshold. At or above 100% of the threshold the consume service
+  * is asked to increase its core pool size; below 80% it is asked to decrease it.
+  */
+ public void adjustThreadPool() {
+     final long accumulated = this.computeAccumulationTotal();
+     final long threshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
+
+     final long growAt = (long) (threshold * 1.0);
+     final long shrinkBelow = (long) (threshold * 0.8);
+
+     if (accumulated >= growAt) {
+         this.consumeMessageService.incCorePoolSize();
+     }
+
+     if (accumulated < shrinkBelow) {
+         this.consumeMessageService.decCorePoolSize();
+     }
+ }
+
+ /**
+  * Sums {@code getMsgAccCnt()} over every process queue in the process-queue table.
+  *
+  * @return the summed accumulation count, or 0 when the table is empty
+  */
+ private long computeAccumulationTotal() {
+     long total = 0;
+     for (ProcessQueue processQueue : this.rebalanceImpl.getProcessQueueTable().values()) {
+         total += processQueue.getMsgAccCnt();
+     }
+
+     return total;
+ }
+
+ /**
+  * Fetches the route of {@code topic} from the name server, then queries each
+  * broker in that route for this group's consume time spans and concatenates
+  * the results. Both remote calls use a 3000 ms timeout.
+  *
+  * @param topic topic to query
+  * @return the combined time spans from all brokers in the route
+  */
+ public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic)
+     throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+     final List<QueueTimeSpan> spans = new ArrayList<QueueTimeSpan>();
+     final TopicRouteData route = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000);
+
+     for (BrokerData broker : route.getBrokerDatas()) {
+         final String brokerAddr = broker.selectBrokerAddr();
+         spans.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(brokerAddr, topic, groupName(), 3000));
+     }
+
+     return spans;
+ }
+
+
+ /**
+  * @return the consume message service currently used by this consumer
+  */
+ public ConsumeMessageService getConsumeMessageService() {
+     return this.consumeMessageService;
+ }
+
+
+ /**
+  * Replaces the consume message service used by this consumer.
+  *
+  * @param consumeMessageService the service to store
+  */
+ public void setConsumeMessageService(ConsumeMessageService consumeMessageService) {
+     this.consumeMessageService = consumeMessageService;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
new file mode 100644
index 0000000..c1abd2f
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+import java.util.Set;
+
+
+/**
+ * Consumer inner interface
+ *
+ * @author shijia.wxr
+ */
+public interface MQConsumerInner {
+ /**
+  * @return the name of the consumer group (the push consumer implementation returns its configured consumer group)
+  */
+ String groupName();
+
+
+ /**
+  * @return the message model of this consumer
+  */
+ MessageModel messageModel();
+
+
+ /**
+  * @return the consume type of this consumer (the push consumer implementation returns {@code CONSUME_PASSIVELY})
+  */
+ ConsumeType consumeType();
+
+
+ /**
+  * @return the consume-from-where setting of this consumer
+  */
+ ConsumeFromWhere consumeFromWhere();
+
+
+ /**
+  * @return the subscriptions of this consumer
+  */
+ Set<SubscriptionData> subscriptions();
+
+
+ /**
+  * Triggers a rebalance for this consumer.
+  */
+ void doRebalance();
+
+
+ /**
+  * Persists the consume offsets held by this consumer.
+  */
+ void persistConsumerOffset();
+
+
+ /**
+  * Hands the consumer the message queues known for a topic.
+  *
+  * @param topic topic the queue information belongs to
+  * @param info  message queues of that topic
+  */
+ void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info);
+
+
+ /**
+  * @param topic topic to check
+  * @return whether the consumer still needs queue information for {@code topic}
+  */
+ boolean isSubscribeTopicNeedUpdate(final String topic);
+
+
+ /**
+  * @return the unit-mode flag of this consumer
+  */
+ boolean isUnitMode();
+
+
+ /**
+  * @return a description of the consumer's current running state
+  */
+ ConsumerRunningInfo consumerRunningInfo();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
new file mode 100644
index 0000000..0849b5e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Message queue lock; strictly ensures that a single queue is consumed by only one thread at a time
+ *
+ * @author shijia.wxr
+ */
+public class MessageQueueLock {
+    /** One lock object per message queue, created lazily on first request. */
+    private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
+        new ConcurrentHashMap<MessageQueue, Object>();
+
+
+    /**
+     * Returns the lock object associated with {@code mq}, creating and registering
+     * one if none exists yet. When several threads race to create it, all of them
+     * receive the instance that was registered first.
+     *
+     * @param mq message queue whose lock object is requested
+     * @return the lock object registered for {@code mq}
+     */
+    public Object fetchLockObject(final MessageQueue mq) {
+        final Object existing = this.mqLockTable.get(mq);
+        if (existing != null) {
+            return existing;
+        }
+
+        final Object candidate = new Object();
+        final Object winner = this.mqLockTable.putIfAbsent(mq, candidate);
+        return winner != null ? winner : candidate;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
new file mode 100644
index 0000000..adca859
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * Queue consumption snapshot
+ *
+ * @author shijia.wxr
+ */
+public class ProcessQueue {
+    /** Age in milliseconds after which {@link #isLockExpired()} reports the lock as expired; overridable via system property. */
+    public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
+        Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
+    /** Lock interval in milliseconds; overridable via system property. Not used inside this class. */
+    public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
+    /** Idle time in milliseconds after which {@link #isPullExpired()} reports the pull as expired; overridable via system property. */
+    private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
+    private final Logger log = ClientLogger.getLog();
+    /** Guards {@link #msgTreeMap} and {@link #msgTreeMapTemp}. */
+    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
+    /** Cached messages keyed by queue offset. */
+    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
+    /** Number of messages accounted to this queue; adjusted by put/remove/commit/clean. */
+    private final AtomicLong msgCount = new AtomicLong();
+    private final Lock lockConsume = new ReentrantLock();
+    /** Messages handed out by {@link #takeMessags(int)} and awaiting {@link #commit()} or {@link #rollback()}. */
+    private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
+    private final AtomicLong tryUnlockTimes = new AtomicLong(0);
+    private volatile long queueOffsetMax = 0L;
+    private volatile boolean dropped = false;
+    private volatile long lastPullTimestamp = System.currentTimeMillis();
+    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
+    private volatile boolean locked = false;
+    private volatile long lastLockTimestamp = System.currentTimeMillis();
+    private volatile boolean consuming = false;
+    private volatile long msgAccCnt = 0;
+
+    /**
+     * @return {@code true} when more than {@link #REBALANCE_LOCK_MAX_LIVE_TIME} ms have passed since the last lock timestamp
+     */
+    public boolean isLockExpired() {
+        boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
+        return result;
+    }
+
+
+    /**
+     * @return {@code true} when more than {@code PULL_MAX_IDLE_TIME} ms have passed since the last pull timestamp
+     */
+    public boolean isPullExpired() {
+        boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
+        return result;
+    }
+
+    /**
+     * Sends back messages at the head of the cache whose consume start timestamp is older
+     * than the consumer's consume timeout (the timeout value is multiplied by 60 * 1000),
+     * then removes them from the cache. At most 16 messages are handled per call, and
+     * nothing is done for orderly consumers.
+     *
+     * @param pushConsumer consumer used to read the timeout and to send expired messages back
+     */
+    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
+        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
+            return;
+        }
+
+        int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
+        for (int i = 0; i < loop; i++) {
+            MessageExt msg = null;
+            try {
+                this.lockTreeMap.readLock().lockInterruptibly();
+                try {
+                    if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
+                        msg = msgTreeMap.firstEntry().getValue();
+                    } else {
+                        // The head is not expired (or the cache is empty), so nothing more to clean.
+                        break;
+                    }
+                } finally {
+                    this.lockTreeMap.readLock().unlock();
+                }
+            } catch (InterruptedException e) {
+                log.error("getExpiredMsg exception", e);
+            }
+
+            if (msg == null) {
+                // Lock acquisition was interrupted, so no candidate was picked.
+                // Skip instead of calling sendMessageBack with a null message.
+                continue;
+            }
+
+            try {
+                pushConsumer.sendMessageBack(msg, 3);
+                log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+                try {
+                    this.lockTreeMap.writeLock().lockInterruptibly();
+                    try {
+                        // Only drop the message if it is still the head of the cache.
+                        if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
+                            // Keep msgCount in step with the cache; a later removeMessage()
+                            // for this message will find nothing and will not decrement.
+                            if (msgTreeMap.remove(msgTreeMap.firstKey()) != null) {
+                                msgCount.decrementAndGet();
+                            }
+                        }
+                    } finally {
+                        this.lockTreeMap.writeLock().unlock();
+                    }
+                } catch (InterruptedException e) {
+                    log.error("getExpiredMsg exception", e);
+                }
+            } catch (Exception e) {
+                log.error("send expired msg exception", e);
+            }
+        }
+    }
+
+
+    /**
+     * Adds messages to the cache, keyed by queue offset. Messages whose offset is already
+     * cached replace the old entry and are not counted again.
+     *
+     * @param msgs messages to cache
+     * @return {@code true} when the cache is non-empty and the {@code consuming} flag was not
+     *         yet set (the flag is set by this call); {@code false} otherwise or when interrupted
+     */
+    public boolean putMessage(final List<MessageExt> msgs) {
+        boolean dispatchToConsume = false;
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                int validMsgCnt = 0;
+                for (MessageExt msg : msgs) {
+                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
+                    if (null == old) {
+                        validMsgCnt++;
+                        this.queueOffsetMax = msg.getQueueOffset();
+                    }
+                }
+                msgCount.addAndGet(validMsgCnt);
+
+                if (!msgTreeMap.isEmpty() && !this.consuming) {
+                    dispatchToConsume = true;
+                    this.consuming = true;
+                }
+
+                if (!msgs.isEmpty()) {
+                    // Accumulation = max offset property of the last message minus its own offset.
+                    MessageExt messageExt = msgs.get(msgs.size() - 1);
+                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
+                    if (property != null) {
+                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
+                        if (accTotal > 0) {
+                            this.msgAccCnt = accTotal;
+                        }
+                    }
+                }
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("putMessage exception", e);
+        }
+
+        return dispatchToConsume;
+    }
+
+
+    /**
+     * @return the difference between the largest and smallest cached offset, or 0 when the
+     *         cache is empty or the lock acquisition was interrupted
+     */
+    public long getMaxSpan() {
+        try {
+            this.lockTreeMap.readLock().lockInterruptibly();
+            try {
+                if (!this.msgTreeMap.isEmpty()) {
+                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
+                }
+            } finally {
+                this.lockTreeMap.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("getMaxSpan exception", e);
+        }
+
+        return 0;
+    }
+
+
+    /**
+     * Removes the given messages from the cache and updates the message count.
+     *
+     * @param msgs messages to remove
+     * @return -1 when the cache was already empty or an error occurred; otherwise the smallest
+     *         offset still cached, or {@code queueOffsetMax + 1} when the cache became empty
+     */
+    public long removeMessage(final List<MessageExt> msgs) {
+        long result = -1;
+        final long now = System.currentTimeMillis();
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.lastConsumeTimestamp = now;
+            try {
+                if (!msgTreeMap.isEmpty()) {
+                    result = this.queueOffsetMax + 1;
+                    // Counts downwards so it can be added to msgCount directly.
+                    int removedCnt = 0;
+                    for (MessageExt msg : msgs) {
+                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
+                        if (prev != null) {
+                            removedCnt--;
+                        }
+                    }
+                    msgCount.addAndGet(removedCnt);
+
+                    if (!msgTreeMap.isEmpty()) {
+                        result = msgTreeMap.firstKey();
+                    }
+                }
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (Throwable t) {
+            log.error("removeMessage exception", t);
+        }
+
+        return result;
+    }
+
+
+    /**
+     * @return the live message cache; callers get the internal map, not a copy
+     */
+    public TreeMap<Long, MessageExt> getMsgTreeMap() {
+        return msgTreeMap;
+    }
+
+
+    public AtomicLong getMsgCount() {
+        return msgCount;
+    }
+
+
+    public boolean isDropped() {
+        return dropped;
+    }
+
+
+    public void setDropped(boolean dropped) {
+        this.dropped = dropped;
+    }
+
+    public boolean isLocked() {
+        return locked;
+    }
+
+    public void setLocked(boolean locked) {
+        this.locked = locked;
+    }
+
+    /**
+     * Moves every message held in the temporary map back into the cache.
+     */
+    public void rollback() {
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                this.msgTreeMap.putAll(this.msgTreeMapTemp);
+                this.msgTreeMapTemp.clear();
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("rollback exception", e);
+        }
+    }
+
+
+    /**
+     * Discards the messages held in the temporary map and subtracts them from the message count.
+     *
+     * @return the largest offset in the temporary map plus one, or -1 when the temporary map
+     *         is empty or the lock acquisition was interrupted
+     */
+    public long commit() {
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                // TreeMap.lastKey() throws NoSuchElementException on an empty map,
+                // so the "nothing to commit" case is handled explicitly.
+                if (this.msgTreeMapTemp.isEmpty()) {
+                    return -1;
+                }
+
+                final long offset = this.msgTreeMapTemp.lastKey();
+                msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
+                this.msgTreeMapTemp.clear();
+                return offset + 1;
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("commit exception", e);
+        }
+
+        return -1;
+    }
+
+
+    /**
+     * Moves the given messages from the temporary map back into the cache.
+     *
+     * @param msgs messages to make available for consumption again
+     */
+    public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                for (MessageExt msg : msgs) {
+                    this.msgTreeMapTemp.remove(msg.getQueueOffset());
+                    this.msgTreeMap.put(msg.getQueueOffset(), msg);
+                }
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("makeMessageToCosumeAgain exception", e);
+        }
+    }
+
+
+    /**
+     * Moves up to {@code batchSize} messages with the smallest offsets from the cache into
+     * the temporary map and returns them. When nothing could be taken, the {@code consuming}
+     * flag is cleared.
+     *
+     * @param batchSize maximum number of messages to take
+     * @return the taken messages in ascending offset order; empty when none were available
+     *         or the lock acquisition was interrupted
+     */
+    public List<MessageExt> takeMessags(final int batchSize) {
+        List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
+        final long now = System.currentTimeMillis();
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.lastConsumeTimestamp = now;
+            try {
+                if (!this.msgTreeMap.isEmpty()) {
+                    for (int i = 0; i < batchSize; i++) {
+                        Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
+                        if (entry != null) {
+                            result.add(entry.getValue());
+                            msgTreeMapTemp.put(entry.getKey(), entry.getValue());
+                        } else {
+                            break;
+                        }
+                    }
+                }
+
+                if (result.isEmpty()) {
+                    consuming = false;
+                }
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("take Messages exception", e);
+        }
+
+        return result;
+    }
+
+
+    /**
+     * NOTE(review): despite its name this inspects {@code msgTreeMap}, not
+     * {@code msgTreeMapTemp}; confirm with callers before changing.
+     *
+     * @return {@code true} when the cache is non-empty, and also when the lock acquisition
+     *         was interrupted
+     */
+    public boolean hasTempMessage() {
+        try {
+            this.lockTreeMap.readLock().lockInterruptibly();
+            try {
+                return !this.msgTreeMap.isEmpty();
+            } finally {
+                this.lockTreeMap.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            // Deliberately ignored: an interrupted check falls through to the "true" answer below.
+        }
+
+        return true;
+    }
+
+
+    /**
+     * Empties both maps and resets the message count and the maximum queue offset to 0.
+     */
+    public void clear() {
+        try {
+            this.lockTreeMap.writeLock().lockInterruptibly();
+            try {
+                this.msgTreeMap.clear();
+                this.msgTreeMapTemp.clear();
+                this.msgCount.set(0);
+                this.queueOffsetMax = 0L;
+            } finally {
+                this.lockTreeMap.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("clear exception", e);
+        }
+    }
+
+
+    public long getLastLockTimestamp() {
+        return lastLockTimestamp;
+    }
+
+
+    public void setLastLockTimestamp(long lastLockTimestamp) {
+        this.lastLockTimestamp = lastLockTimestamp;
+    }
+
+
+    public Lock getLockConsume() {
+        return lockConsume;
+    }
+
+
+    public long getLastPullTimestamp() {
+        return lastPullTimestamp;
+    }
+
+
+    public void setLastPullTimestamp(long lastPullTimestamp) {
+        this.lastPullTimestamp = lastPullTimestamp;
+    }
+
+
+    public long getMsgAccCnt() {
+        return msgAccCnt;
+    }
+
+
+    public void setMsgAccCnt(long msgAccCnt) {
+        this.msgAccCnt = msgAccCnt;
+    }
+
+
+    public long getTryUnlockTimes() {
+        return this.tryUnlockTimes.get();
+    }
+
+
+    public void incTryUnlockTimes() {
+        this.tryUnlockTimes.incrementAndGet();
+    }
+
+
+    /**
+     * Copies the current state of this queue into {@code info}: offset range and size of both
+     * maps (when non-empty), lock state, dropped flag and the last pull/consume timestamps.
+     * Failures, including an interrupted lock acquisition, are logged and leave {@code info}
+     * partially filled.
+     *
+     * @param info target object to fill
+     */
+    public void fillProcessQueueInfo(final ProcessQueueInfo info) {
+        try {
+            // Acquire outside the inner try so that unlock() only runs when the lock is held.
+            this.lockTreeMap.readLock().lockInterruptibly();
+            try {
+                if (!this.msgTreeMap.isEmpty()) {
+                    info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
+                    info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
+                    info.setCachedMsgCount(this.msgTreeMap.size());
+                }
+
+                if (!this.msgTreeMapTemp.isEmpty()) {
+                    info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
+                    info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
+                    info.setTransactionMsgCount(this.msgTreeMapTemp.size());
+                }
+
+                info.setLocked(this.locked);
+                info.setTryUnlockTimes(this.tryUnlockTimes.get());
+                info.setLastLockTimestamp(this.lastLockTimestamp);
+
+                info.setDroped(this.dropped);
+                info.setLastPullTimestamp(this.lastPullTimestamp);
+                info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
+            } finally {
+                this.lockTreeMap.readLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("fillProcessQueueInfo exception", e);
+        }
+    }
+
+
+    public long getLastConsumeTimestamp() {
+        return lastConsumeTimestamp;
+    }
+
+
+    public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
+        this.lastConsumeTimestamp = lastConsumeTimestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
new file mode 100644
index 0000000..05aa8d1
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.FilterMessageContext;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullAPIWrapper {
+ private final Logger log = ClientLogger.getLog();
+ private final MQClientInstance mQClientFactory;
+ private final String consumerGroup;
+ private final boolean unitMode;
+ private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
+ new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
+ private volatile boolean connectBrokerByUser = false;
+ private volatile long defaultBrokerId = MixAll.MASTER_ID;
+ private Random random = new Random(System.currentTimeMillis());
+ private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
+
+ public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {
+ this.mQClientFactory = mQClientFactory;
+ this.consumerGroup = consumerGroup;
+ this.unitMode = unitMode;
+ }
+
+ public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
+ final SubscriptionData subscriptionData) {
+ PullResultExt pullResultExt = (PullResultExt) pullResult;
+
+ this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
+ if (PullStatus.FOUND == pullResult.getPullStatus()) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
+ List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
+
+ List<MessageExt> msgListFilterAgain = msgList;
+ if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
+ msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
+ for (MessageExt msg : msgList) {
+ if (msg.getTags() != null) {
+ if (subscriptionData.getTagsSet().contains(msg.getTags())) {
+ msgListFilterAgain.add(msg);
+ }
+ }
+ }
+ }
+
+ if (this.hasHook()) {
+ FilterMessageContext filterMessageContext = new FilterMessageContext();
+ filterMessageContext.setUnitMode(unitMode);
+ filterMessageContext.setMsgList(msgListFilterAgain);
+ this.executeHook(filterMessageContext);
+ }
+
+ for (MessageExt msg : msgListFilterAgain) {
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
+ Long.toString(pullResult.getMinOffset()));
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
+ Long.toString(pullResult.getMaxOffset()));
+ }
+
+ pullResultExt.setMsgFoundList(msgListFilterAgain);
+ }
+
+ pullResultExt.setMessageBinary(null);
+
+ return pullResult;
+ }
+
+ public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
+ AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
+ if (null == suggest) {
+ this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
+ } else {
+ suggest.set(brokerId);
+ }
+ }
+
+ public boolean hasHook() {
+ return !this.filterMessageHookList.isEmpty();
+ }
+
+ public void executeHook(final FilterMessageContext context) {
+ if (!this.filterMessageHookList.isEmpty()) {
+ for (FilterMessageHook hook : this.filterMessageHookList) {
+ try {
+ hook.filterMessage(context);
+ } catch (Throwable e) {
+ log.error("execute hook error. hookName={}", hook.hookName());
+ }
+ }
+ }
+ }
+
+ public PullResult pullKernelImpl(
+ final MessageQueue mq,
+ final String subExpression,
+ final long subVersion,
+ final long offset,
+ final int maxNums,
+ final int sysFlag,
+ final long commitOffset,
+ final long brokerSuspendMaxTimeMillis,
+ final long timeoutMillis,
+ final CommunicationMode communicationMode,
+ final PullCallback pullCallback
+ ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ FindBrokerResult findBrokerResult =
+ this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+ this.recalculatePullFromWhichNode(mq), false);
+ if (null == findBrokerResult) {
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+ findBrokerResult =
+ this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+ this.recalculatePullFromWhichNode(mq), false);
+ }
+
+ if (findBrokerResult != null) {
+ int sysFlagInner = sysFlag;
+
+ if (findBrokerResult.isSlave()) {
+ sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
+ }
+
+ PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+ requestHeader.setConsumerGroup(this.consumerGroup);
+ requestHeader.setTopic(mq.getTopic());
+ requestHeader.setQueueId(mq.getQueueId());
+ requestHeader.setQueueOffset(offset);
+ requestHeader.setMaxMsgNums(maxNums);
+ requestHeader.setSysFlag(sysFlagInner);
+ requestHeader.setCommitOffset(commitOffset);
+ requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
+ requestHeader.setSubscription(subExpression);
+ requestHeader.setSubVersion(subVersion);
+
+ String brokerAddr = findBrokerResult.getBrokerAddr();
+ if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
+ brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
+ }
+
+ PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
+ brokerAddr,
+ requestHeader,
+ timeoutMillis,
+ communicationMode,
+ pullCallback);
+
+ return pullResult;
+ }
+
+ throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+ }
+
+ public long recalculatePullFromWhichNode(final MessageQueue mq) {
+ if (this.isConnectBrokerByUser()) {
+ return this.defaultBrokerId;
+ }
+
+ AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
+ if (suggest != null) {
+ return suggest.get();
+ }
+
+ return MixAll.MASTER_ID;
+ }
+
+ private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
+ throws MQClientException {
+ ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
+ if (topicRouteTable != null) {
+ TopicRouteData topicRouteData = topicRouteTable.get(topic);
+ List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
+
+ if (list != null && !list.isEmpty()) {
+ return list.get(randomNum() % list.size());
+ }
+ }
+
+ throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
+ + topic, null);
+ }
+
+ public boolean isConnectBrokerByUser() {
+ return connectBrokerByUser;
+ }
+
+ public int randomNum() {
+ int value = random.nextInt();
+ if (value < 0) {
+ value = Math.abs(value);
+ if (value < 0)
+ value = 0;
+ }
+ return value;
+ }
+
+ public void setConnectBrokerByUser(boolean connectBrokerByUser) {
+ this.connectBrokerByUser = connectBrokerByUser;
+
+ }
+
+ public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) {
+ this.filterMessageHookList = filterMessageHookList;
+ }
+
+ public long getDefaultBrokerId() {
+ return defaultBrokerId;
+ }
+
+ public void setDefaultBrokerId(long defaultBrokerId) {
+ this.defaultBrokerId = defaultBrokerId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
new file mode 100644
index 0000000..9f79543
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceThread;
+import org.slf4j.Logger;
+
+import java.util.concurrent.*;
+
+
+/**
+ * Service thread that takes queued {@link PullRequest}s one at a time and hands each of them to
+ * the consumer registered for the request's consumer group.
+ * <p>
+ * Requests are enqueued either immediately or, through a single-threaded scheduler, after a delay.
+ *
+ * @author shijia.wxr
+ */
+public class PullMessageService extends ServiceThread {
+    private final Logger log = ClientLogger.getLog();
+    /** Pending pull requests, consumed by {@link #run()}. */
+    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
+    private final MQClientInstance mQClientFactory;
+    /** Single-threaded scheduler used for delayed requests and delayed tasks. */
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+        .newSingleThreadScheduledExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "PullMessageServiceScheduledThread");
+            }
+        });
+
+    /**
+     * @param mQClientFactory client instance used to look up the consumer for a pull request
+     */
+    public PullMessageService(MQClientInstance mQClientFactory) {
+        this.mQClientFactory = mQClientFactory;
+    }
+
+    /**
+     * Enqueues the pull request after the given delay.
+     *
+     * @param pullRequest request to enqueue
+     * @param timeDelay delay in milliseconds before the request is enqueued
+     */
+    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
+        this.scheduledExecutorService.schedule(new Runnable() {
+
+            @Override
+            public void run() {
+                PullMessageService.this.executePullRequestImmediately(pullRequest);
+            }
+        }, timeDelay, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Enqueues the pull request right away. If the calling thread is interrupted while enqueueing,
+     * the failure is logged and the thread's interrupt status is restored.
+     *
+     * @param pullRequest request to enqueue
+     */
+    public void executePullRequestImmediately(final PullRequest pullRequest) {
+        try {
+            this.pullRequestQueue.put(pullRequest);
+        } catch (InterruptedException e) {
+            log.error("executePullRequestImmediately pullRequestQueue.put", e);
+            // Catching InterruptedException clears the flag; set it again so code further up the
+            // call stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Runs an arbitrary task on the scheduler thread after the given delay.
+     *
+     * @param r task to run
+     * @param timeDelay delay in milliseconds
+     */
+    public void executeTaskLater(final Runnable r, final long timeDelay) {
+        this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @return the scheduler used for delayed requests and tasks
+     */
+    public ScheduledExecutorService getScheduledExecutorService() {
+        return scheduledExecutorService;
+    }
+
+    /**
+     * Looks up the consumer for the request's consumer group and delegates the pull to it;
+     * the request is dropped with a warning when no consumer is found.
+     */
+    private void pullMessage(final PullRequest pullRequest) {
+        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
+        if (consumer != null) {
+            // NOTE(review): assumes the selected consumer is always a DefaultMQPushConsumerImpl;
+            // any other implementation fails the cast and is logged by run().
+            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
+            impl.pullMessage(pullRequest);
+        } else {
+            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
+        }
+    }
+
+
+    /**
+     * Takes pull requests from the queue and processes them until the service is stopped.
+     */
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+
+        while (!this.isStopped()) {
+            try {
+                PullRequest pullRequest = this.pullRequestQueue.take();
+                if (pullRequest != null) {
+                    this.pullMessage(pullRequest);
+                }
+            } catch (InterruptedException ignored) {
+                // Deliberately not re-interrupting: the loop condition re-checks isStopped(), and a
+                // restored flag would make the next take() fail immediately and spin.
+            } catch (Exception e) {
+                log.error("Pull Message Service Run Method exception", e);
+            }
+        }
+
+        log.info("{} service end", this.getServiceName());
+    }
+
+
+    @Override
+    public String getServiceName() {
+        return PullMessageService.class.getSimpleName();
+    }
+
+
+}