You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:16 UTC
[25/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
new file mode 100644
index 0000000..8700ef1
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.RPCHook;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Singleton registry of {@link MQClientInstance} objects keyed by client id.
+ * <p>
+ * Instances are stored in a {@link ConcurrentHashMap}; concurrent callers asking for the
+ * same client id all receive the instance that won the {@code putIfAbsent} race.
+ *
+ * @author shijia.wxr
+ */
+public class MQClientManager {
+    /** The single manager; final so the reference can never be replaced. */
+    private static final MQClientManager instance = new MQClientManager();
+    /** Supplies the index passed to each newly constructed client instance. */
+    private final AtomicInteger factoryIndexGenerator = new AtomicInteger();
+    private final ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
+        new ConcurrentHashMap<String, MQClientInstance>();
+
+
+    private MQClientManager() {
+
+    }
+
+
+    /**
+     * @return the shared manager
+     */
+    public static MQClientManager getInstance() {
+        return instance;
+    }
+
+    /**
+     * Same as {@link #getAndCreateMQClientInstance(ClientConfig, RPCHook)} with a {@code null} hook.
+     *
+     * @param clientConfig configuration used to build the client id and, if needed, the new instance
+     * @return the registered instance for the client id
+     */
+    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {
+        return getAndCreateMQClientInstance(clientConfig, null);
+    }
+
+    /**
+     * Returns the instance registered under the client id built from {@code clientConfig},
+     * creating and registering one from a clone of the config when none exists yet.
+     *
+     * @param clientConfig configuration used to build the client id and, if needed, the new instance
+     * @param rpcHook hook handed to a newly created instance; may be {@code null}
+     * @return the registered instance for the client id
+     */
+    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
+        final String clientId = clientConfig.buildMQClientId();
+        final MQClientInstance existing = this.factoryTable.get(clientId);
+        if (existing != null) {
+            return existing;
+        }
+
+        final MQClientInstance created =
+            new MQClientInstance(clientConfig.cloneClientConfig(),
+                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
+        // Another thread may have registered an instance in the meantime; prefer that one.
+        final MQClientInstance raced = this.factoryTable.putIfAbsent(clientId, created);
+        if (raced != null) {
+            return raced;
+        }
+        // TODO log
+        return created;
+    }
+
+    /**
+     * Removes the instance registered under {@code clientId}, if any.
+     *
+     * @param clientId key of the instance to drop
+     */
+    public void removeClientFactory(final String clientId) {
+        this.factoryTable.remove(clientId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
new file mode 100644
index 0000000..88fe25f
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.CMResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.slf4j.Logger;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
+ private static final Logger log = ClientLogger.getLog();
+ // Owning push-consumer implementation; source of the rebalance table, offset store, hooks and stats.
+ private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+ // Public consumer facade obtained from defaultMQPushConsumerImpl; read for configuration values.
+ private final DefaultMQPushConsumer defaultMQPushConsumer;
+ // User callback invoked for every batch of messages.
+ private final MessageListenerConcurrently messageListener;
+ // Work queue handed to consumeExecutor in the constructor.
+ private final BlockingQueue<Runnable> consumeRequestQueue;
+ // Thread pool that runs ConsumeRequest tasks.
+ private final ThreadPoolExecutor consumeExecutor;
+ private final String consumerGroup;
+
+ // Single-threaded scheduler used to resubmit consume requests after a delay.
+ private final ScheduledExecutorService scheduledExecutorService;
+ // Single-threaded scheduler that periodically runs cleanExpireMsg().
+ private final ScheduledExecutorService cleanExpireMsgExecutors;
+
+
+ public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
+ MessageListenerConcurrently messageListener) {
+ this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
+ this.messageListener = messageListener;
+
+ this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
+ this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
+ this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
+
+ this.consumeExecutor = new ThreadPoolExecutor(//
+ this.defaultMQPushConsumer.getConsumeThreadMin(), //
+ this.defaultMQPushConsumer.getConsumeThreadMax(), //
+ 1000 * 60, //
+ TimeUnit.MILLISECONDS, //
+ this.consumeRequestQueue, //
+ new ThreadFactoryImpl("ConsumeMessageThread_"));
+
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
+ this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
+ }
+
+
+ /**
+  * Schedules the periodic expired-message cleanup. Both the initial delay and the period are
+  * the consumer's consume timeout, interpreted in minutes.
+  */
+ public void start() {
+     final Runnable expiredMessageSweeper = new Runnable() {
+         @Override
+         public void run() {
+             cleanExpireMsg();
+         }
+     };
+
+     this.cleanExpireMsgExecutors.scheduleAtFixedRate(
+         expiredMessageSweeper,
+         this.defaultMQPushConsumer.getConsumeTimeout(),
+         this.defaultMQPushConsumer.getConsumeTimeout(),
+         TimeUnit.MINUTES);
+ }
+
+
+ /**
+  * Requests an orderly shutdown of the resubmission scheduler, the consume pool and the
+  * cleanup scheduler, in that order. Does not wait for running tasks to finish.
+  */
+ public void shutdown() {
+     scheduledExecutorService.shutdown();
+     consumeExecutor.shutdown();
+     cleanExpireMsgExecutors.shutdown();
+ }
+
+ /**
+  * Applies a new core pool size to the consume executor. The request is silently ignored unless
+  * the value is positive, at most {@code Short.MAX_VALUE}, and strictly below the configured
+  * maximum consume thread count.
+  *
+  * @param corePoolSize requested core pool size
+  */
+ @Override
+ public void updateCorePoolSize(int corePoolSize) {
+     final boolean outsideAbsoluteBounds = corePoolSize <= 0 || corePoolSize > Short.MAX_VALUE;
+     if (outsideAbsoluteBounds || corePoolSize >= this.defaultMQPushConsumer.getConsumeThreadMax()) {
+         return;
+     }
+
+     this.consumeExecutor.setCorePoolSize(corePoolSize);
+ }
+
+ /**
+  * Currently a no-op: the whole body is commented out, so the core pool size is left unchanged.
+  * The disabled code would have raised the core size by one while it was below the configured
+  * maximum thread count.
+  */
+ @Override
+ public void incCorePoolSize() {
+ // long corePoolSize = this.consumeExecutor.getCorePoolSize();
+ // if (corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax())
+ // {
+ // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
+ // + 1);
+ // }
+ //
+ // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
+ // {}", //
+ // corePoolSize,//
+ // this.consumeExecutor.getCorePoolSize(),//
+ // this.consumerGroup);
+ }
+
+ /**
+  * Currently a no-op: the whole body is commented out, so the core pool size is left unchanged.
+  * The disabled code would have lowered the core size by one while it was above the configured
+  * minimum thread count.
+  */
+ @Override
+ public void decCorePoolSize() {
+ // long corePoolSize = this.consumeExecutor.getCorePoolSize();
+ // if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin())
+ // {
+ // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
+ // - 1);
+ // }
+ //
+ // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
+ // {}", //
+ // corePoolSize,//
+ // this.consumeExecutor.getCorePoolSize(),//
+ // this.consumerGroup);
+ }
+
+ /**
+  * @return the current core pool size of the consume executor
+  */
+ @Override
+ public int getCorePoolSize() {
+     final int currentCoreSize = consumeExecutor.getCorePoolSize();
+     return currentCoreSize;
+ }
+
+ /**
+  * Runs the listener synchronously on the calling thread for a single message and reports the
+  * outcome, bypassing the consume thread pool.
+  *
+  * @param msg message to consume
+  * @param brokerName broker name recorded on the synthetic message queue passed to the listener
+  * @return result holding the mapped consume status, a remark when the listener threw, and the
+  *         elapsed time in milliseconds
+  */
+ @Override
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
+     final ConsumeMessageDirectlyResult outcome = new ConsumeMessageDirectlyResult();
+     outcome.setOrder(false);
+     outcome.setAutoCommit(true);
+
+     final List<MessageExt> single = new ArrayList<MessageExt>();
+     single.add(msg);
+
+     // The queue is built from the message's topic before any retry-topic rewrite below.
+     final MessageQueue queue = new MessageQueue();
+     queue.setBrokerName(brokerName);
+     queue.setTopic(msg.getTopic());
+     queue.setQueueId(msg.getQueueId());
+
+     final ConsumeConcurrentlyContext listenerContext = new ConsumeConcurrentlyContext(queue);
+
+     this.resetRetryTopic(single);
+
+     final long startedAt = System.currentTimeMillis();
+
+     log.info("consumeMessageDirectly receive new messge: {}", msg);
+
+     try {
+         final ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(single, listenerContext);
+         if (null == status) {
+             outcome.setConsumeResult(CMResult.CR_RETURN_NULL);
+         } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
+             outcome.setConsumeResult(CMResult.CR_SUCCESS);
+         } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
+             outcome.setConsumeResult(CMResult.CR_LATER);
+         }
+     } catch (Throwable e) {
+         outcome.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
+         outcome.setRemark(RemotingHelper.exceptionSimpleDesc(e));
+
+         final String detail = String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
+             RemotingHelper.exceptionSimpleDesc(e),
+             ConsumeMessageConcurrentlyService.this.consumerGroup,
+             single,
+             queue);
+         log.warn(detail, e);
+     }
+
+     outcome.setSpentTimeMills(System.currentTimeMillis() - startedAt);
+
+     log.info("consumeMessageDirectly Result: {}", outcome);
+
+     return outcome;
+ }
+
+ /**
+  * Hands messages to the consume pool, splitting them into chunks of at most the configured
+  * batch size. If the pool rejects a chunk, that chunk absorbs all messages not yet dispatched
+  * and is resubmitted later as a single request.
+  *
+  * @param msgs messages to consume
+  * @param processQueue process queue the messages belong to
+  * @param messageQueue message queue the messages were pulled from
+  * @param dispatchToConsume not used by this implementation
+  */
+ @Override
+ public void submitConsumeRequest(
+     final List<MessageExt> msgs,
+     final ProcessQueue processQueue,
+     final MessageQueue messageQueue,
+     final boolean dispatchToConsume) {
+     final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
+
+     // Everything fits into one request.
+     if (msgs.size() <= consumeBatchSize) {
+         final ConsumeRequest whole = new ConsumeRequest(msgs, processQueue, messageQueue);
+         try {
+             this.consumeExecutor.submit(whole);
+         } catch (RejectedExecutionException e) {
+             this.submitConsumeRequestLater(whole);
+         }
+         return;
+     }
+
+     int cursor = 0;
+     while (cursor < msgs.size()) {
+         final List<MessageExt> chunk = new ArrayList<MessageExt>(consumeBatchSize);
+         for (int taken = 0; taken < consumeBatchSize && cursor < msgs.size(); taken++, cursor++) {
+             chunk.add(msgs.get(cursor));
+         }
+
+         final ConsumeRequest part = new ConsumeRequest(chunk, processQueue, messageQueue);
+         try {
+             this.consumeExecutor.submit(part);
+         } catch (RejectedExecutionException e) {
+             // Fold the rest into this chunk; advancing the cursor to the end also ends the outer loop.
+             while (cursor < msgs.size()) {
+                 chunk.add(msgs.get(cursor));
+                 cursor++;
+             }
+
+             this.submitConsumeRequestLater(part);
+         }
+     }
+ }
+
+ /**
+  * Restores the original topic on messages that arrived through this group's retry topic:
+  * when a message carries the retry-topic property and its current topic equals the group's
+  * retry topic, the topic is replaced with the property value.
+  *
+  * @param msgs messages to inspect; modified in place
+  */
+ public void resetRetryTopic(final List<MessageExt> msgs) {
+     final String groupRetryTopic = MixAll.getRetryTopic(consumerGroup);
+     for (MessageExt each : msgs) {
+         final String originalTopic = each.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
+         if (null == originalTopic || !groupRetryTopic.equals(each.getTopic())) {
+             continue;
+         }
+         each.setTopic(originalTopic);
+     }
+ }
+
+ /**
+  * Asks every process queue in the rebalance table to clean its expired messages.
+  */
+ private void cleanExpireMsg() {
+     for (Map.Entry<MessageQueue, ProcessQueue> entry
+         : this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet()) {
+         entry.getValue().cleanExpiredMsg(this.defaultMQPushConsumer);
+     }
+ }
+
+ /**
+  * Applies the listener's verdict for one consume request: updates OK/failed statistics,
+  * handles the messages after the acknowledged index according to the message model, removes
+  * the remaining messages from the process queue and, when possible, advances the offset.
+  *
+  * @param status verdict returned by the listener; must not be null (it is switched on)
+  * @param context listener context supplying the ack index and the delay level for send-back
+  * @param consumeRequest request whose messages were consumed; its message list may be shrunk
+  */
+ public void processConsumeResult(//
+ final ConsumeConcurrentlyStatus status, //
+ final ConsumeConcurrentlyContext context, //
+ final ConsumeRequest consumeRequest//
+ ) {
+ int ackIndex = context.getAckIndex();
+
+ if (consumeRequest.getMsgs().isEmpty())
+ return;
+
+ switch (status) {
+ case CONSUME_SUCCESS:
+ // Clamp the ack index to the last message so "ok" never exceeds the batch size.
+ if (ackIndex >= consumeRequest.getMsgs().size()) {
+ ackIndex = consumeRequest.getMsgs().size() - 1;
+ }
+ // Messages up to and including ackIndex count as OK, the rest as failed.
+ int ok = ackIndex + 1;
+ int failed = consumeRequest.getMsgs().size() - ok;
+ this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
+ this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
+ break;
+ case RECONSUME_LATER:
+ // -1 makes the loops below start at index 0, i.e. treat every message as failed.
+ ackIndex = -1;
+ this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
+ consumeRequest.getMsgs().size());
+ break;
+ default:
+ break;
+ }
+
+ switch (this.defaultMQPushConsumer.getMessageModel()) {
+ case BROADCASTING:
+ // Failed messages are only logged in broadcasting mode; they are not retried here.
+ for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
+ MessageExt msg = consumeRequest.getMsgs().get(i);
+ log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
+ }
+ break;
+ case CLUSTERING:
+ // Try to send each failed message back; collect the ones whose send-back failed.
+ List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
+ for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
+ MessageExt msg = consumeRequest.getMsgs().get(i);
+ boolean result = this.sendMessageBack(msg, context);
+ if (!result) {
+ msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
+ msgBackFailed.add(msg);
+ }
+ }
+
+ if (!msgBackFailed.isEmpty()) {
+ // Keep these out of the removal below and consume them again locally after a delay.
+ consumeRequest.getMsgs().removeAll(msgBackFailed);
+
+ this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
+ }
+ break;
+ default:
+ break;
+ }
+
+ // Remove what is left in the request from the process queue; a non-negative result is stored
+ // as the new offset unless the process queue has been dropped.
+ long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
+ if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
+ this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
+ }
+ }
+
+ /**
+  * @return the statistics manager of the owning push-consumer implementation
+  */
+ public ConsumerStatsManager getConsumerStatsManager() {
+     final ConsumerStatsManager statsManager = defaultMQPushConsumerImpl.getConsumerStatsManager();
+     return statsManager;
+ }
+
+ /**
+  * Sends a message back through the owning consumer, using the delay level carried by the
+  * context and the broker name of the context's message queue.
+  *
+  * @param msg message to send back
+  * @param context listener context supplying the delay level and the message queue
+  * @return {@code true} if the send-back completed, {@code false} if it threw an exception
+  *         (which is logged)
+  */
+ public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
+     final int delayLevel = context.getDelayLevelWhenNextConsume();
+
+     boolean sent;
+     try {
+         this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
+         sent = true;
+     } catch (Exception e) {
+         log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
+         sent = false;
+     }
+
+     return sent;
+ }
+
+ private void submitConsumeRequestLater(//
+ final List<MessageExt> msgs, //
+ final ProcessQueue processQueue, //
+ final MessageQueue messageQueue//
+ ) {
+
+ this.scheduledExecutorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
+ }
+ }, 5000, TimeUnit.MILLISECONDS);
+ }
+
+ private void submitConsumeRequestLater(final ConsumeRequest consumeRequest//
+ ) {
+
+ this.scheduledExecutorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
+ }
+ }, 5000, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Task run on the consume pool: invokes the listener for one batch of messages, runs the
+  * before/after consume hooks when hooks are registered, records the consume round-trip time
+  * and passes the verdict to {@code processConsumeResult}.
+  */
+ class ConsumeRequest implements Runnable {
+     private final List<MessageExt> msgs;
+     private final ProcessQueue processQueue;
+     private final MessageQueue messageQueue;
+
+
+     /**
+      * @param msgs batch of messages to hand to the listener
+      * @param processQueue process queue the messages belong to
+      * @param messageQueue message queue the messages were pulled from
+      */
+     public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
+         this.msgs = msgs;
+         this.processQueue = processQueue;
+         this.messageQueue = messageQueue;
+     }
+
+     public List<MessageExt> getMsgs() {
+         return msgs;
+     }
+
+     public ProcessQueue getProcessQueue() {
+         return processQueue;
+     }
+
+     /**
+      * Consumes the batch. Returns immediately when the process queue is already dropped. A
+      * listener exception or a {@code null} status is treated as {@code RECONSUME_LATER}.
+      */
+     @Override
+     public void run() {
+         if (this.processQueue.isDropped()) {
+             log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
+             return;
+         }
+
+         MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
+         ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
+         ConsumeConcurrentlyStatus status = null;
+
+         // Stays null when no hook is registered; every later use must be null-guarded.
+         ConsumeMessageContext consumeMessageContext = null;
+         if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
+             consumeMessageContext = new ConsumeMessageContext();
+             consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
+             consumeMessageContext.setProps(new HashMap<String, String>());
+             consumeMessageContext.setMq(messageQueue);
+             consumeMessageContext.setMsgList(msgs);
+             consumeMessageContext.setSuccess(false);
+             ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
+         }
+
+         long beginTimestamp = System.currentTimeMillis();
+         boolean hasException = false;
+         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
+         try {
+             ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
+             if (msgs != null && !msgs.isEmpty()) {
+                 for (MessageExt msg : msgs) {
+                     MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
+                 }
+             }
+             // The listener only gets a read-only view of the batch.
+             status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
+         } catch (Throwable e) {
+             log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
+                 RemotingHelper.exceptionSimpleDesc(e), //
+                 ConsumeMessageConcurrentlyService.this.consumerGroup,
+                 msgs,
+                 messageQueue);
+             hasException = true;
+         }
+         long consumeRT = System.currentTimeMillis() - beginTimestamp;
+         if (null == status) {
+             if (hasException) {
+                 returnType = ConsumeReturnType.EXCEPTION;
+             } else {
+                 returnType = ConsumeReturnType.RETURNNULL;
+             }
+         } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
+             // The consume timeout is configured in minutes; compare in milliseconds.
+             returnType = ConsumeReturnType.TIME_OUT;
+         } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
+             returnType = ConsumeReturnType.FAILED;
+         } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
+             returnType = ConsumeReturnType.SUCCESS;
+         }
+
+         // Fix: the context exists only when a hook was registered at the start of run();
+         // dereferencing it unconditionally threw a NullPointerException on every consume
+         // without hooks.
+         if (consumeMessageContext != null) {
+             consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
+         }
+
+         if (null == status) {
+             log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
+                 ConsumeMessageConcurrentlyService.this.consumerGroup,
+                 msgs,
+                 messageQueue);
+             status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
+         }
+
+         // The null check also covers a hook being registered while the listener was running.
+         if (consumeMessageContext != null
+             && ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
+             consumeMessageContext.setStatus(status.toString());
+             consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
+             ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
+         }
+
+         ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
+             .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
+
+         // Skip result processing if the queue was dropped while the listener ran.
+         if (!processQueue.isDropped()) {
+             ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
+         } else {
+             log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
+         }
+     }
+
+     public MessageQueue getMessageQueue() {
+         return messageQueue;
+     }
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
new file mode 100644
index 0000000..010fd2f
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.protocol.body.CMResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumeMessageOrderlyService implements ConsumeMessageService {
+ private static final Logger log = ClientLogger.getLog();
+ // Read once from the system property "rocketmq.client.maxTimeConsumeContinuously" (default "60000").
+ // NOTE(review): presumably a duration in milliseconds; its use is not visible in this section — confirm.
+ private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
+ Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
+ // Owning push-consumer implementation; source of the rebalance logic, offset store and stats.
+ private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+ // Public consumer facade obtained from defaultMQPushConsumerImpl; read for configuration values.
+ private final DefaultMQPushConsumer defaultMQPushConsumer;
+ // User callback invoked for ordered consumption.
+ private final MessageListenerOrderly messageListener;
+ // Work queue handed to consumeExecutor in the constructor.
+ private final BlockingQueue<Runnable> consumeRequestQueue;
+ // Thread pool that runs ConsumeRequest tasks.
+ private final ThreadPoolExecutor consumeExecutor;
+ private final String consumerGroup;
+ private final MessageQueueLock messageQueueLock = new MessageQueueLock();
+ // Single-threaded scheduler for periodic lock renewal and delayed resubmission.
+ private final ScheduledExecutorService scheduledExecutorService;
+ // Set by shutdown(); once true, lockMQPeriodically() and lockOneMQ() stop requesting locks.
+ private volatile boolean stopped = false;
+
+
+ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
+ this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
+ this.messageListener = messageListener;
+
+ this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
+ this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
+ this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
+
+ this.consumeExecutor = new ThreadPoolExecutor(//
+ this.defaultMQPushConsumer.getConsumeThreadMin(), //
+ this.defaultMQPushConsumer.getConsumeThreadMax(), //
+ 1000 * 60, //
+ TimeUnit.MILLISECONDS, //
+ this.consumeRequestQueue, //
+ new ThreadFactoryImpl("ConsumeMessageThread_"));
+
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
+ }
+
+
+ /**
+  * In clustering mode, schedules periodic locking of all message queues: first run after one
+  * second, then every {@code ProcessQueue.REBALANCE_LOCK_INTERVAL} milliseconds. Does nothing
+  * for any other message model.
+  */
+ public void start() {
+     if (!MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
+         return;
+     }
+
+     final Runnable lockRenewal = new Runnable() {
+         @Override
+         public void run() {
+             ConsumeMessageOrderlyService.this.lockMQPeriodically();
+         }
+     };
+     this.scheduledExecutorService.scheduleAtFixedRate(
+         lockRenewal, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+
+ /**
+  * Marks the service as stopped, requests shutdown of the scheduler and the consume pool, and
+  * in clustering mode releases all message-queue locks.
+  */
+ public void shutdown() {
+     this.stopped = true;
+     this.scheduledExecutorService.shutdown();
+     this.consumeExecutor.shutdown();
+
+     final boolean clustering = MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel());
+     if (clustering) {
+         this.unlockAllMQ();
+     }
+ }
+
+
+ /**
+  * Asks the rebalance implementation to unlock all message queues, passing {@code false} as
+  * its flag argument. Synchronized on this service, like the lock methods.
+  */
+ public synchronized void unlockAllMQ() {
+ this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
+ }
+
+ /**
+  * Applies a new core pool size to the consume executor. The request is silently ignored unless
+  * the value is positive, at most {@code Short.MAX_VALUE}, and strictly below the configured
+  * maximum consume thread count.
+  *
+  * @param corePoolSize requested core pool size
+  */
+ @Override
+ public void updateCorePoolSize(int corePoolSize) {
+     final boolean outsideAbsoluteBounds = corePoolSize <= 0 || corePoolSize > Short.MAX_VALUE;
+     if (outsideAbsoluteBounds || corePoolSize >= this.defaultMQPushConsumer.getConsumeThreadMax()) {
+         return;
+     }
+
+     this.consumeExecutor.setCorePoolSize(corePoolSize);
+ }
+
+ /**
+  * No-op in this implementation: the body is empty, so the core pool size is left unchanged.
+  */
+ @Override
+ public void incCorePoolSize() {
+ }
+
+ /**
+  * No-op in this implementation: the body is empty, so the core pool size is left unchanged.
+  */
+ @Override
+ public void decCorePoolSize() {
+ }
+
+ /**
+  * @return the current core pool size of the consume executor
+  */
+ @Override
+ public int getCorePoolSize() {
+     final int currentCoreSize = consumeExecutor.getCorePoolSize();
+     return currentCoreSize;
+ }
+
+ /**
+  * Runs the orderly listener synchronously on the calling thread for a single message and
+  * reports the outcome, bypassing the consume thread pool.
+  *
+  * @param msg message to consume
+  * @param brokerName broker name recorded on the synthetic message queue passed to the listener
+  * @return result holding the mapped consume status, a remark when the listener threw, the
+  *         context's auto-commit flag and the elapsed time in milliseconds
+  */
+ @Override
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
+     final ConsumeMessageDirectlyResult outcome = new ConsumeMessageDirectlyResult();
+     outcome.setOrder(true);
+
+     final List<MessageExt> single = new ArrayList<MessageExt>();
+     single.add(msg);
+
+     final MessageQueue queue = new MessageQueue();
+     queue.setBrokerName(brokerName);
+     queue.setTopic(msg.getTopic());
+     queue.setQueueId(msg.getQueueId());
+
+     final ConsumeOrderlyContext listenerContext = new ConsumeOrderlyContext(queue);
+
+     final long startedAt = System.currentTimeMillis();
+
+     log.info("consumeMessageDirectly receive new messge: {}", msg);
+
+     try {
+         final ConsumeOrderlyStatus status = this.messageListener.consumeMessage(single, listenerContext);
+         if (null == status) {
+             outcome.setConsumeResult(CMResult.CR_RETURN_NULL);
+         } else if (ConsumeOrderlyStatus.COMMIT == status) {
+             outcome.setConsumeResult(CMResult.CR_COMMIT);
+         } else if (ConsumeOrderlyStatus.ROLLBACK == status) {
+             outcome.setConsumeResult(CMResult.CR_ROLLBACK);
+         } else if (ConsumeOrderlyStatus.SUCCESS == status) {
+             outcome.setConsumeResult(CMResult.CR_SUCCESS);
+         } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
+             outcome.setConsumeResult(CMResult.CR_LATER);
+         }
+     } catch (Throwable e) {
+         outcome.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
+         outcome.setRemark(RemotingHelper.exceptionSimpleDesc(e));
+
+         final String detail = String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
+             RemotingHelper.exceptionSimpleDesc(e),
+             ConsumeMessageOrderlyService.this.consumerGroup,
+             single,
+             queue);
+         log.warn(detail, e);
+     }
+
+     // The listener may have changed the auto-commit flag on the context.
+     outcome.setAutoCommit(listenerContext.isAutoCommit());
+     outcome.setSpentTimeMills(System.currentTimeMillis() - startedAt);
+
+     log.info("consumeMessageDirectly Result: {}", outcome);
+
+     return outcome;
+ }
+
+ /**
+  * Submits a consume request for the given queue to the consume pool, but only when
+  * {@code dispathToConsume} is true. The message list itself is not used here.
+  *
+  * @param msgs not used by this implementation
+  * @param processQueue process queue to consume from
+  * @param messageQueue message queue being consumed
+  * @param dispathToConsume whether a request should be submitted at all
+  */
+ @Override
+ public void submitConsumeRequest(
+     final List<MessageExt> msgs,
+     final ProcessQueue processQueue,
+     final MessageQueue messageQueue,
+     final boolean dispathToConsume) {
+     if (!dispathToConsume) {
+         return;
+     }
+
+     this.consumeExecutor.submit(new ConsumeRequest(processQueue, messageQueue));
+ }
+
+ /**
+  * Asks the rebalance implementation to lock all message queues, unless the service has been
+  * stopped.
+  */
+ public synchronized void lockMQPeriodically() {
+     if (this.stopped) {
+         return;
+     }
+
+     this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
+ }
+
+ /**
+  * After {@code delayMills} milliseconds, tries to lock the message queue and then resubmits a
+  * consume request for it: 10 ms later if the lock succeeded, 3000 ms later otherwise.
+  *
+  * @param mq message queue to lock
+  * @param processQueue process queue to resubmit
+  * @param delayMills delay before the lock attempt, in milliseconds
+  */
+ public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, final long delayMills) {
+     final Runnable lockThenResubmit = new Runnable() {
+         @Override
+         public void run() {
+             final long resubmitDelay = ConsumeMessageOrderlyService.this.lockOneMQ(mq) ? 10 : 3000;
+             ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, resubmitDelay);
+         }
+     };
+
+     this.scheduledExecutorService.schedule(lockThenResubmit, delayMills, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Tries to lock a single message queue through the rebalance implementation.
+  *
+  * @param mq message queue to lock
+  * @return the rebalance implementation's lock result, or {@code false} without attempting a
+  *         lock once the service has been stopped
+  */
+ public synchronized boolean lockOneMQ(final MessageQueue mq) {
+     if (this.stopped) {
+         return false;
+     }
+
+     return this.defaultMQPushConsumerImpl.getRebalanceImpl().lock(mq);
+ }
+
+ private void submitConsumeRequestLater(//
+ final ProcessQueue processQueue, //
+ final MessageQueue messageQueue, //
+ final long suspendTimeMillis//
+ ) {
+ long timeMillis = suspendTimeMillis;
+ if (timeMillis == -1) {
+ timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
+ }
+
+ if (timeMillis < 10) {
+ timeMillis = 10;
+ } else if (timeMillis > 30000) {
+ timeMillis = 30000;
+ }
+
+ this.scheduledExecutorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
+ }
+ }, timeMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Applies the orderly listener's verdict for one batch: updates statistics, commits or rolls
+  * back the process queue, schedules a later retry where needed and, when a commit produced a
+  * non-negative offset and the process queue is not dropped, stores that offset.
+  *
+  * @param msgs messages that were handed to the listener
+  * @param status verdict returned by the listener; must not be null (it is switched on)
+  * @param context listener context supplying the auto-commit flag and the suspend time
+  * @param consumeRequest request that owns the process queue and message queue
+  * @return {@code true} if the caller may keep consuming, {@code false} if a delayed
+  *         resubmission was scheduled instead
+  */
+ public boolean processConsumeResult(//
+ final List<MessageExt> msgs, //
+ final ConsumeOrderlyStatus status, //
+ final ConsumeOrderlyContext context, //
+ final ConsumeRequest consumeRequest//
+ ) {
+ boolean continueConsume = true;
+ long commitOffset = -1L;
+ if (context.isAutoCommit()) {
+ switch (status) {
+ case COMMIT:
+ case ROLLBACK:
+ // COMMIT/ROLLBACK are not expected in auto-commit mode: warn, then deliberately fall
+ // through and handle them like SUCCESS.
+ log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
+ consumeRequest.getMessageQueue());
+ case SUCCESS:
+ commitOffset = consumeRequest.getProcessQueue().commit();
+ this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
+ break;
+ case SUSPEND_CURRENT_QUEUE_A_MOMENT:
+ this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
+ if (checkReconsumeTimes(msgs)) {
+ // At least one message still has to be retried locally: put the batch back and retry later.
+ consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
+ this.submitConsumeRequestLater(//
+ consumeRequest.getProcessQueue(), //
+ consumeRequest.getMessageQueue(), //
+ context.getSuspendCurrentQueueTimeMillis());
+ continueConsume = false;
+ } else {
+ // No message needs a local retry, so the batch is committed.
+ commitOffset = consumeRequest.getProcessQueue().commit();
+ }
+ break;
+ default:
+ break;
+ }
+ } else {
+ switch (status) {
+ case SUCCESS:
+ // Manual-commit mode: SUCCESS only updates statistics; nothing is committed.
+ this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
+ break;
+ case COMMIT:
+ commitOffset = consumeRequest.getProcessQueue().commit();
+ break;
+ case ROLLBACK:
+ consumeRequest.getProcessQueue().rollback();
+ this.submitConsumeRequestLater(//
+ consumeRequest.getProcessQueue(), //
+ consumeRequest.getMessageQueue(), //
+ context.getSuspendCurrentQueueTimeMillis());
+ continueConsume = false;
+ break;
+ case SUSPEND_CURRENT_QUEUE_A_MOMENT:
+ this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
+ if (checkReconsumeTimes(msgs)) {
+ consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
+ this.submitConsumeRequestLater(//
+ consumeRequest.getProcessQueue(), //
+ consumeRequest.getMessageQueue(), //
+ context.getSuspendCurrentQueueTimeMillis());
+ continueConsume = false;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ // commitOffset stays -1 unless one of the branches above called commit().
+ if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
+ this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
+ }
+
+ return continueConsume;
+ }
+
+ /**
+  * Returns the statistics manager exposed by the wrapped push consumer implementation.
+  *
+  * @return whatever {@code defaultMQPushConsumerImpl.getConsumerStatsManager()} yields
+  */
+ public ConsumerStatsManager getConsumerStatsManager() {
+     final ConsumerStatsManager statsManager = this.defaultMQPushConsumerImpl.getConsumerStatsManager();
+     return statsManager;
+ }
+
+ private int getMaxReconsumeTimes() {
+ // default reconsume times: Integer.MAX_VALUE
+ if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
+ return Integer.MAX_VALUE;
+ } else {
+ return this.defaultMQPushConsumer.getMaxReconsumeTimes();
+ }
+ }
+
+ /**
+  * Decides whether the queue has to be suspended so the given batch can be retried locally.
+  * <p>
+  * A message that already reached the retry limit is handed to {@link #sendMessageBack(MessageExt)};
+  * when that succeeds the message no longer forces a suspend. Every other message (below the
+  * limit, or whose send-back failed) gets its reconsume counter incremented and forces a suspend.
+  *
+  * @param msgs the batch that was not consumed successfully; may be null or empty
+  * @return true if at least one message still has to be re-consumed from this queue
+  */
+ private boolean checkReconsumeTimes(List<MessageExt> msgs) {
+     if (msgs == null || msgs.isEmpty()) {
+         return false;
+     }
+
+     boolean mustSuspend = false;
+     for (MessageExt current : msgs) {
+         if (current.getReconsumeTimes() >= getMaxReconsumeTimes()) {
+             MessageAccessor.setReconsumeTime(current, String.valueOf(current.getReconsumeTimes()));
+             if (sendMessageBack(current)) {
+                 // handed off successfully, nothing left to retry locally for this message
+                 continue;
+             }
+         }
+         mustSuspend = true;
+         current.setReconsumeTimes(current.getReconsumeTimes() + 1);
+     }
+     return mustSuspend;
+ }
+
+ public boolean sendMessageBack(final MessageExt msg) {
+ try {
+ // max reconsume times exceeded then send to dead letter queue.
+ Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
+ String originMsgId = MessageAccessor.getOriginMessageId(msg);
+ MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
+ newMsg.setFlag(msg.getFlag());
+ MessageAccessor.setProperties(newMsg, msg.getProperties());
+ MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+ MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
+ MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
+ newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+
+ this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
+ return true;
+ } catch (Exception e) {
+ log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
+ }
+
+ return false;
+ }
+
+ /**
+  * Task that consumes the messages buffered in one {@link ProcessQueue} in order.
+  * <p>
+  * While running it holds the per-queue lock object obtained from {@code messageQueueLock} and
+  * keeps taking batches from the process queue until the queue is empty, the queue is dropped,
+  * the queue lock is missing or expired (clustering mode), the continuous-consume time budget
+  * is exceeded, or {@code processConsumeResult} asks to stop.
+  */
+ class ConsumeRequest implements Runnable {
+     private final ProcessQueue processQueue;
+     private final MessageQueue messageQueue;
+
+
+     public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
+         this.processQueue = processQueue;
+         this.messageQueue = messageQueue;
+     }
+
+     public ProcessQueue getProcessQueue() {
+         return processQueue;
+     }
+
+     public MessageQueue getMessageQueue() {
+         return messageQueue;
+     }
+
+     /**
+      * Consumes batches from the process queue until one of the stop conditions is met.
+      * Listener exceptions are caught and logged; a null listener result or an exception is
+      * treated as {@code SUSPEND_CURRENT_QUEUE_A_MOMENT}.
+      */
+     @Override
+     public void run() {
+         if (this.processQueue.isDropped()) {
+             log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
+             return;
+         }
+
+         final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
+         synchronized (objLock) {
+             if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
+                 || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
+                 final long beginTime = System.currentTimeMillis();
+                 for (boolean continueConsume = true; continueConsume; ) {
+                     if (this.processQueue.isDropped()) {
+                         log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
+                         break;
+                     }
+
+                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
+                         && !this.processQueue.isLocked()) {
+                         log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
+                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
+                         break;
+                     }
+
+                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
+                         && this.processQueue.isLockExpired()) {
+                         log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
+                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
+                         break;
+                     }
+
+                     // Do not monopolise the worker thread: once the time budget is used up,
+                     // re-submit this queue and let the current run end.
+                     long interval = System.currentTimeMillis() - beginTime;
+                     if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
+                         ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
+                         break;
+                     }
+
+                     final int consumeBatchSize =
+                         ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
+
+                     List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
+                     if (!msgs.isEmpty()) {
+                         final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
+
+                         ConsumeOrderlyStatus status = null;
+
+                         // Only created when a hook is registered; stays null otherwise.
+                         ConsumeMessageContext consumeMessageContext = null;
+                         if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
+                             consumeMessageContext = new ConsumeMessageContext();
+                             consumeMessageContext
+                                 .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
+                             consumeMessageContext.setMq(messageQueue);
+                             consumeMessageContext.setMsgList(msgs);
+                             consumeMessageContext.setSuccess(false);
+                             // init the consume context type
+                             consumeMessageContext.setProps(new HashMap<String, String>());
+                             ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
+                         }
+
+                         long beginTimestamp = System.currentTimeMillis();
+                         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
+                         boolean hasException = false;
+                         try {
+                             this.processQueue.getLockConsume().lock();
+                             if (this.processQueue.isDropped()) {
+                                 log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
+                                     this.messageQueue);
+                                 // the finally block below still releases the consume lock
+                                 break;
+                             }
+
+                             status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
+                         } catch (Throwable e) {
+                             log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
+                                 RemotingHelper.exceptionSimpleDesc(e), //
+                                 ConsumeMessageOrderlyService.this.consumerGroup, //
+                                 msgs, //
+                                 messageQueue);
+                             hasException = true;
+                         } finally {
+                             this.processQueue.getLockConsume().unlock();
+                         }
+
+                         if (null == status //
+                             || ConsumeOrderlyStatus.ROLLBACK == status//
+                             || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
+                             log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
+                                 ConsumeMessageOrderlyService.this.consumerGroup, //
+                                 msgs, //
+                                 messageQueue);
+                         }
+
+                         long consumeRT = System.currentTimeMillis() - beginTimestamp;
+                         if (null == status) {
+                             if (hasException) {
+                                 returnType = ConsumeReturnType.EXCEPTION;
+                             } else {
+                                 returnType = ConsumeReturnType.RETURNNULL;
+                             }
+                         } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
+                             returnType = ConsumeReturnType.TIME_OUT;
+                         } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
+                             returnType = ConsumeReturnType.FAILED;
+                         } else if (ConsumeOrderlyStatus.SUCCESS == status) {
+                             returnType = ConsumeReturnType.SUCCESS;
+                         }
+
+                         if (null == status) {
+                             status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                         }
+
+                         // Fix: the context exists only when a hook was registered before the listener
+                         // ran. Touching it unconditionally threw NullPointerException for every batch
+                         // when no hook is installed, so all hook bookkeeping is guarded on the context.
+                         if (null != consumeMessageContext) {
+                             consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
+                             consumeMessageContext.setStatus(status.toString());
+                             consumeMessageContext
+                                 .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
+                             ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
+                         }
+
+                         ConsumeMessageOrderlyService.this.getConsumerStatsManager()
+                             .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
+
+                         continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
+                     } else {
+                         continueConsume = false;
+                     }
+                 }
+             } else {
+                 if (this.processQueue.isDropped()) {
+                     log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
+                     return;
+                 }
+
+                 ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
+             }
+         }
+     }
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
new file mode 100644
index 0000000..86529ee
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+/**
+ * Contract for the component that receives pulled messages and runs their consumption.
+ * NOTE(review): the semantics below are inferred from the signatures only; confirm
+ * against the implementing classes.
+ */
+public interface ConsumeMessageService {
+ /** Starts the service. */
+ void start();
+
+
+ /** Shuts the service down. */
+ void shutdown();
+
+
+ /**
+  * Sets the core pool size of the consuming thread pool.
+  *
+  * @param corePoolSize the new core pool size (presumably must be positive; verify in implementations)
+  */
+ void updateCorePoolSize(int corePoolSize);
+
+
+ /** Increases the core pool size; the step is implementation-defined. */
+ void incCorePoolSize();
+
+
+ /** Decreases the core pool size; the step is implementation-defined. */
+ void decCorePoolSize();
+
+
+ /** @return the current core pool size */
+ int getCorePoolSize();
+
+
+ /**
+  * Consumes a single message directly and reports the outcome.
+  *
+  * @param msg the message to consume
+  * @param brokerName name of the broker associated with the message
+  * @return the result of the direct consumption
+  */
+ ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
+
+
+ /**
+  * Submits messages of one queue for consumption.
+  *
+  * @param msgs the messages to consume
+  * @param processQueue the local process queue the messages belong to
+  * @param messageQueue the message queue the messages were pulled from
+  * @param dispathToConsume (sic) presumably whether a consume task should be dispatched
+  *        for this submission; TODO confirm in implementations
+  */
+ void submitConsumeRequest(//
+ final List<MessageExt> msgs, //
+ final ProcessQueue processQueue, //
+ final MessageQueue messageQueue, //
+ final boolean dispathToConsume);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
new file mode 100644
index 0000000..affb652
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -0,0 +1,706 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQPullConsumerImpl implements MQConsumerInner {
+ // Client logger used by every method of this class.
+ private final Logger log = ClientLogger.getLog();
+ // The public facade whose configuration (group, timeouts, message model, ...) is read here.
+ private final DefaultMQPullConsumer defaultMQPullConsumer;
+ // Captured at construction; reported by consumerRunningInfo().
+ private final long consumerStartTimestamp = System.currentTimeMillis();
+ // Passed through to the MQClientInstance created in start().
+ private final RPCHook rpcHook;
+ // Hooks invoked by executeHookBefore()/executeHookAfter().
+ private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
+ // Handed to the PullAPIWrapper in start().
+ private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
+ // Lifecycle state; changed by start() and shutdown(), checked by makeSureStateOK().
+ private ServiceState serviceState = ServiceState.CREATE_JUST;
+ // The three fields below are assigned in start() and are null before it runs.
+ private MQClientInstance mQClientFactory;
+ private PullAPIWrapper pullAPIWrapper;
+ private OffsetStore offsetStore;
+ // Holds the subscription table and process queue table used throughout this class.
+ private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
+
+
+ /**
+  * Creates the implementation behind a pull consumer facade. Only stores the arguments.
+  *
+  * @param defaultMQPullConsumer the facade whose configuration is read by this class
+  * @param rpcHook hook passed on when the client instance is created in {@code start()}
+  */
+ public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
+     this.rpcHook = rpcHook;
+     this.defaultMQPullConsumer = defaultMQPullConsumer;
+ }
+
+ /**
+  * Appends a consume hook to the hook list and logs its name.
+  *
+  * @param hook the hook to register
+  */
+ public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
+     this.consumeMessageHookList.add(hook);
+     final String registeredName = hook.hookName();
+     log.info("register consumeMessageHook Hook, {}", registeredName);
+ }
+
+ /**
+  * Creates a topic with a topic system flag of 0.
+  *
+  * @param key forwarded unchanged to the four-argument overload
+  * @param newTopic name of the topic to create
+  * @param queueNum number of queues
+  * @throws MQClientException if the consumer is not running or creation fails
+  */
+ public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+     final int noTopicSysFlag = 0;
+     this.createTopic(key, newTopic, queueNum, noTopicSysFlag);
+ }
+
+ /**
+  * Creates a topic through the client's admin implementation after checking the service state.
+  *
+  * @param key forwarded unchanged to the admin implementation
+  * @param newTopic name of the topic to create
+  * @param queueNum number of queues
+  * @param topicSysFlag system flag for the topic
+  * @throws MQClientException if the consumer is not running or creation fails
+  */
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+     makeSureStateOK();
+     mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+ }
+
+ private void makeSureStateOK() throws MQClientException {
+ if (this.serviceState != ServiceState.RUNNING) {
+ throw new MQClientException("The consumer service state not OK, "//
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
+ }
+ }
+
+ /**
+  * Reads the consume offset of a queue from the offset store.
+  *
+  * @param mq the queue to look up
+  * @param fromStore true to read with READ_FROM_STORE, false to read with MEMORY_FIRST_THEN_STORE
+  * @return the offset reported by the offset store
+  * @throws MQClientException if the consumer is not running
+  */
+ public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
+     this.makeSureStateOK();
+
+     final ReadOffsetType readType;
+     if (fromStore) {
+         readType = ReadOffsetType.READ_FROM_STORE;
+     } else {
+         readType = ReadOffsetType.MEMORY_FIRST_THEN_STORE;
+     }
+     return this.offsetStore.readOffset(mq, readType);
+ }
+
+ /**
+  * Collects the queues of a topic that are present in the rebalance process queue table.
+  *
+  * @param topic the topic to filter by; must not be null
+  * @return a new set with the matching queues (possibly empty)
+  * @throws MQClientException if the consumer is not running
+  * @throws IllegalArgumentException if {@code topic} is null
+  */
+ public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
+     this.makeSureStateOK();
+     if (topic == null) {
+         throw new IllegalArgumentException("topic is null");
+     }
+
+     final Set<MessageQueue> matched = new HashSet<MessageQueue>();
+     for (MessageQueue candidate : this.rebalanceImpl.getProcessQueueTable().keySet()) {
+         if (!candidate.getTopic().equals(topic)) {
+             continue;
+         }
+         matched.add(candidate);
+     }
+     return matched;
+ }
+
+ /**
+  * Asks the admin implementation for the publish queues of a topic.
+  *
+  * @param topic the topic to look up
+  * @return the queues returned by the admin implementation
+  * @throws MQClientException if the consumer is not running or the lookup fails
+  */
+ public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
+     makeSureStateOK();
+     final List<MessageQueue> publishQueues = mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
+     return publishQueues;
+ }
+
+ /**
+  * Asks the admin implementation for the subscribe queues of a topic.
+  *
+  * @param topic the topic to look up
+  * @return the queues returned by the admin implementation
+  * @throws MQClientException if the consumer is not running or the lookup fails
+  */
+ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
+     makeSureStateOK();
+     final Set<MessageQueue> subscribeQueues = mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
+     return subscribeQueues;
+ }
+
+ /**
+  * Delegates to the admin implementation's {@code earliestMsgStoreTime} for the given queue.
+  *
+  * @param mq the queue to inspect
+  * @return the value returned by the admin implementation
+  * @throws MQClientException if the consumer is not running or the lookup fails
+  */
+ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+     makeSureStateOK();
+     final long storeTime = mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
+     return storeTime;
+ }
+
+ /**
+  * Delegates to the admin implementation's {@code maxOffset} for the given queue.
+  *
+  * @param mq the queue to inspect
+  * @return the value returned by the admin implementation
+  * @throws MQClientException if the consumer is not running or the lookup fails
+  */
+ public long maxOffset(MessageQueue mq) throws MQClientException {
+     makeSureStateOK();
+     final long highest = mQClientFactory.getMQAdminImpl().maxOffset(mq);
+     return highest;
+ }
+
+ /**
+  * Delegates to the admin implementation's {@code minOffset} for the given queue.
+  *
+  * @param mq the queue to inspect
+  * @return the value returned by the admin implementation
+  * @throws MQClientException if the consumer is not running or the lookup fails
+  */
+ public long minOffset(MessageQueue mq) throws MQClientException {
+     makeSureStateOK();
+     final long lowest = mQClientFactory.getMQAdminImpl().minOffset(mq);
+     return lowest;
+ }
+
+ /**
+  * Synchronous pull using the facade's configured pull timeout.
+  *
+  * @param mq the queue to pull from
+  * @param subExpression subscription expression used to build the subscription data
+  * @param offset offset to start pulling from
+  * @param maxNums maximum number of messages to pull
+  * @return the pull result
+  */
+ public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
+     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     final long configuredTimeout = this.defaultMQPullConsumer.getConsumerPullTimeoutMillis();
+     return this.pull(mq, subExpression, offset, maxNums, configuredTimeout);
+ }
+
+ /**
+  * Synchronous, non-blocking pull with an explicit timeout.
+  *
+  * @param mq the queue to pull from
+  * @param subExpression subscription expression used to build the subscription data
+  * @param offset offset to start pulling from
+  * @param maxNums maximum number of messages to pull
+  * @param timeout timeout passed to the pull call
+  * @return the pull result
+  */
+ public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
+     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     final boolean blockIfNotFound = false;
+     return pullSyncImpl(mq, subExpression, offset, maxNums, blockIfNotFound, timeout);
+ }
+
+ private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ this.makeSureStateOK();
+
+ if (null == mq) {
+ throw new MQClientException("mq is null", null);
+
+ }
+
+ if (offset < 0) {
+ throw new MQClientException("offset < 0", null);
+ }
+
+ if (maxNums <= 0) {
+ throw new MQClientException("maxNums <= 0", null);
+ }
+
+ this.subscriptionAutomatically(mq.getTopic());
+
+ int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
+
+ SubscriptionData subscriptionData;
+ try {
+ subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+ mq.getTopic(), subExpression);
+ } catch (Exception e) {
+ throw new MQClientException("parse subscription error", e);
+ }
+
+ long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
+
+ PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
+ mq, // 1
+ subscriptionData.getSubString(), // 2
+ 0L, // 3
+ offset, // 4
+ maxNums, // 5
+ sysFlag, // 6
+ 0, // 7
+ this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
+ timeoutMillis, // 9
+ CommunicationMode.SYNC, // 10
+ null// 11
+ );
+ this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
+ if (!this.consumeMessageHookList.isEmpty()) {
+ ConsumeMessageContext consumeMessageContext = null;
+ consumeMessageContext = new ConsumeMessageContext();
+ consumeMessageContext.setConsumerGroup(this.groupName());
+ consumeMessageContext.setMq(mq);
+ consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
+ consumeMessageContext.setSuccess(false);
+ this.executeHookBefore(consumeMessageContext);
+ consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
+ consumeMessageContext.setSuccess(true);
+ this.executeHookAfter(consumeMessageContext);
+ }
+ return pullResult;
+ }
+
+ /**
+  * Ensures the rebalance subscription table has an entry for the topic, adding a
+  * subscribe-all entry when it is missing.
+  * <p>
+  * This stays best-effort: a failure to build the subscription data does not propagate, but
+  * it is now logged instead of being silently discarded.
+  *
+  * @param topic the topic that is about to be pulled
+  */
+ public void subscriptionAutomatically(final String topic) {
+     if (this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
+         return;
+     }
+
+     try {
+         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+             topic, SubscriptionData.SUB_ALL);
+         this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
+     } catch (Exception e) {
+         // Previously swallowed without a trace; keep going but leave a diagnostic.
+         log.warn("subscriptionAutomatically exception, topic: " + topic, e);
+     }
+ }
+
+ /**
+  * Removes the topic's entry from the rebalance subscription table.
+  *
+  * @param topic the topic to drop
+  */
+ public void unsubscribe(String topic) {
+     rebalanceImpl.getSubscriptionInner().remove(topic);
+ }
+
+ /** @return the consumer group configured on the pull consumer facade */
+ @Override
+ public String groupName() {
+     final String configuredGroup = this.defaultMQPullConsumer.getConsumerGroup();
+     return configuredGroup;
+ }
+
+ /**
+  * Invokes {@code consumeMessageBefore} on every registered consume hook.
+  * <p>
+  * A failing hook never prevents the remaining hooks from running; the failure is now logged
+  * instead of being silently discarded.
+  *
+  * @param context the context handed to each hook
+  */
+ public void executeHookBefore(final ConsumeMessageContext context) {
+     for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+         try {
+             hook.consumeMessageBefore(context);
+         } catch (Throwable e) {
+             log.warn("consumeMessageHook consumeMessageBefore exception", e);
+         }
+     }
+ }
+
+ /**
+  * Invokes {@code consumeMessageAfter} on every registered consume hook.
+  * <p>
+  * A failing hook never prevents the remaining hooks from running; the failure is now logged
+  * instead of being silently discarded.
+  *
+  * @param context the context handed to each hook
+  */
+ public void executeHookAfter(final ConsumeMessageContext context) {
+     for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+         try {
+             hook.consumeMessageAfter(context);
+         } catch (Throwable e) {
+             log.warn("consumeMessageHook consumeMessageAfter exception", e);
+         }
+     }
+ }
+
+ /** @return the message model configured on the pull consumer facade */
+ @Override
+ public MessageModel messageModel() {
+     final MessageModel configuredModel = this.defaultMQPullConsumer.getMessageModel();
+     return configuredModel;
+ }
+
+ /** @return always {@code CONSUME_ACTIVELY} for this pull consumer implementation */
+ @Override
+ public ConsumeType consumeType() {
+     final ConsumeType pullStyle = ConsumeType.CONSUME_ACTIVELY;
+     return pullStyle;
+ }
+
+ /** @return always {@code CONSUME_FROM_LAST_OFFSET} for this pull consumer implementation */
+ @Override
+ public ConsumeFromWhere consumeFromWhere() {
+     final ConsumeFromWhere startPoint = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+     return startPoint;
+ }
+
+ /**
+  * Builds a subscribe-all {@link SubscriptionData} (sub version 0) for every topic registered
+  * on the pull consumer facade.
+  * <p>
+  * A topic whose subscription data cannot be built is logged and skipped. Previously the
+  * failure left the local variable null and the following {@code setSubVersion} call threw a
+  * NullPointerException.
+  *
+  * @return a new set with one entry per successfully parsed registered topic (possibly empty)
+  */
+ @Override
+ public Set<SubscriptionData> subscriptions() {
+     Set<SubscriptionData> result = new HashSet<SubscriptionData>();
+
+     Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();
+     if (topics != null) {
+         // iterate under the set's own monitor, as the original code did
+         synchronized (topics) {
+             for (String t : topics) {
+                 SubscriptionData ms;
+                 try {
+                     ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);
+                 } catch (Exception e) {
+                     log.error("parse subscription error", e);
+                     continue;
+                 }
+                 ms.setSubVersion(0L);
+                 result.add(ms);
+             }
+         }
+     }
+
+     return result;
+ }
+
+ /** Triggers a rebalance (with argument {@code false}) when a rebalance implementation is present. */
+ @Override
+ public void doRebalance() {
+     if (this.rebalanceImpl == null) {
+         return;
+     }
+     this.rebalanceImpl.doRebalance(false);
+ }
+
+ /**
+  * Persists the offsets of all queues currently in the rebalance process queue table.
+  * Any exception, including the service-state check failing, is logged and not rethrown.
+  */
+ @Override
+ public void persistConsumerOffset() {
+     try {
+         this.makeSureStateOK();
+
+         final Set<MessageQueue> assigned = this.rebalanceImpl.getProcessQueueTable().keySet();
+         // copy the keys so the store works on a stable snapshot
+         final Set<MessageQueue> snapshot = new HashSet<MessageQueue>();
+         if (assigned != null) {
+             snapshot.addAll(assigned);
+         }
+         this.offsetStore.persistAll(snapshot);
+     } catch (Exception e) {
+         log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
+     }
+ }
+
+ /**
+  * Stores the queue set of a topic in the rebalance topic table, but only for topics that
+  * have an entry in the subscription table.
+  *
+  * @param topic the topic whose queues changed
+  * @param info the current queues of the topic
+  */
+ @Override
+ public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
+     final Map<String, SubscriptionData> subscribed = this.rebalanceImpl.getSubscriptionInner();
+     if (subscribed == null || !subscribed.containsKey(topic)) {
+         return;
+     }
+     this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
+ }
+
+ /**
+  * Tells whether a subscribed topic still lacks an entry in the topic subscribe info table.
+  *
+  * @param topic the topic to check
+  * @return true when the topic is subscribed and has no entry in the table yet; false otherwise
+  */
+ @Override
+ public boolean isSubscribeTopicNeedUpdate(String topic) {
+     final Map<String, SubscriptionData> subscribed = this.rebalanceImpl.getSubscriptionInner();
+     final boolean isSubscribed = subscribed != null && subscribed.containsKey(topic);
+     return isSubscribed && !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
+ }
+
+ /** @return the unit-mode flag configured on the pull consumer facade */
+ @Override
+ public boolean isUnitMode() {
+     final boolean unitMode = this.defaultMQPullConsumer.isUnitMode();
+     return unitMode;
+ }
+
+ /**
+  * Assembles a running-info snapshot: the facade's properties plus the consumer start
+  * timestamp, and the current subscription set.
+  *
+  * @return a newly created {@link ConsumerRunningInfo}
+  */
+ @Override
+ public ConsumerRunningInfo consumerRunningInfo() {
+     final Properties facadeProps = MixAll.object2Properties(this.defaultMQPullConsumer);
+     facadeProps.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));
+
+     final ConsumerRunningInfo snapshot = new ConsumerRunningInfo();
+     snapshot.setProperties(facadeProps);
+     snapshot.getSubscriptionSet().addAll(this.subscriptions());
+     return snapshot;
+ }
+
+ /**
+  * Asynchronous pull using the facade's configured pull timeout.
+  *
+  * @param mq the queue to pull from
+  * @param subExpression subscription expression used to build the subscription data
+  * @param offset offset to start pulling from
+  * @param maxNums maximum number of messages to pull
+  * @param pullCallback receives the result or the failure
+  */
+ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
+     throws MQClientException, RemotingException, InterruptedException {
+     final long configuredTimeout = this.defaultMQPullConsumer.getConsumerPullTimeoutMillis();
+     this.pull(mq, subExpression, offset, maxNums, pullCallback, configuredTimeout);
+ }
+
+ /**
+  * Asynchronous, non-blocking pull with an explicit timeout.
+  *
+  * @param mq the queue to pull from
+  * @param subExpression subscription expression used to build the subscription data
+  * @param offset offset to start pulling from
+  * @param maxNums maximum number of messages to pull
+  * @param pullCallback receives the result or the failure
+  * @param timeout timeout passed to the pull call
+  */
+ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
+     throws MQClientException, RemotingException, InterruptedException {
+     final boolean blockIfNotFound = false;
+     pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, blockIfNotFound, timeout);
+ }
+
+ private void pullAsyncImpl(//
+ final MessageQueue mq, //
+ final String subExpression, //
+ final long offset, //
+ final int maxNums, //
+ final PullCallback pullCallback, //
+ final boolean block, //
+ final long timeout) throws MQClientException, RemotingException, InterruptedException {
+ this.makeSureStateOK();
+
+ if (null == mq) {
+ throw new MQClientException("mq is null", null);
+ }
+
+ if (offset < 0) {
+ throw new MQClientException("offset < 0", null);
+ }
+
+ if (maxNums <= 0) {
+ throw new MQClientException("maxNums <= 0", null);
+ }
+
+ if (null == pullCallback) {
+ throw new MQClientException("pullCallback is null", null);
+ }
+
+ this.subscriptionAutomatically(mq.getTopic());
+
+ try {
+ int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
+
+ final SubscriptionData subscriptionData;
+ try {
+ subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+ mq.getTopic(), subExpression);
+ } catch (Exception e) {
+ throw new MQClientException("parse subscription error", e);
+ }
+
+ long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
+
+ this.pullAPIWrapper.pullKernelImpl(//
+ mq, // 1
+ subscriptionData.getSubString(), // 2
+ 0L, // 3
+ offset, // 4
+ maxNums, // 5
+ sysFlag, // 6
+ 0, // 7
+ this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
+ timeoutMillis, // 9
+ CommunicationMode.ASYNC, // 10
+ new PullCallback() {
+
+ @Override
+ public void onSuccess(PullResult pullResult) {
+ pullCallback
+ .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ pullCallback.onException(e);
+ }
+ });
+ } catch (MQBrokerException e) {
+ throw new MQClientException("pullAsync unknow exception", e);
+ }
+ }
+
+ /**
+  * Synchronous pull with the blocking flag set.
+  *
+  * @param mq the queue to pull from
+  * @param subExpression subscription expression used to build the subscription data
+  * @param offset offset to start pulling from
+  * @param maxNums maximum number of messages to pull
+  * @return the pull result
+  */
+ public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
+     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     final long configuredTimeout = this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis();
+     return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, configuredTimeout);
+ }
+
+ /** @return the pull consumer facade this implementation was created for */
+ public DefaultMQPullConsumer getDefaultMQPullConsumer() {
+     return this.defaultMQPullConsumer;
+ }
+
+ /**
+  * Asynchronous pull with the blocking flag set.
+  *
+  * @param mq the queue to pull from
+  * @param subExpression subscription expression used to build the subscription data
+  * @param offset offset to start pulling from
+  * @param maxNums maximum number of messages to pull
+  * @param pullCallback receives the result or the failure
+  */
+ public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
+     throws MQClientException, RemotingException, InterruptedException {
+     final long configuredTimeout = this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis();
+     this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true, configuredTimeout);
+ }
+
+ /**
+  * Delegates a message query to the admin implementation after checking the service state.
+  *
+  * @param topic topic to search
+  * @param key key to search for
+  * @param maxNum maximum number of results
+  * @param begin lower bound passed to the admin implementation
+  * @param end upper bound passed to the admin implementation
+  * @return the query result
+  */
+ public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+     throws MQClientException, InterruptedException {
+     makeSureStateOK();
+     final QueryResult found = mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
+     return found;
+ }
+
+ /**
+  * Delegates a unique-key lookup to the admin implementation after checking the service state.
+  *
+  * @param topic topic to search
+  * @param uniqKey unique key of the message
+  * @return the message returned by the admin implementation
+  */
+ public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
+     throws MQClientException, InterruptedException {
+     makeSureStateOK();
+     final MessageExt found = mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
+     return found;
+ }
+
+ /**
+  * Delegates an offset-by-timestamp search to the admin implementation.
+  *
+  * @param mq the queue to search
+  * @param timestamp the timestamp to search for
+  * @return the offset returned by the admin implementation
+  * @throws MQClientException if the consumer is not running or the lookup fails
+  */
+ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+     makeSureStateOK();
+     final long located = mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+     return located;
+ }
+
+ /**
+  * Sends a message back using this consumer's own group.
+  *
+  * @param msg the message to send back
+  * @param delayLevel delay level passed to the broker
+  * @param brokerName broker to send to; may be null (see the four-argument overload)
+  */
+ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     final String ownGroup = this.defaultMQPullConsumer.getConsumerGroup();
+     this.sendMessageBack(msg, delayLevel, brokerName, ownGroup);
+ }
+
+ public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException {
+ this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
+ }
+
+ /**
+  * Sends a message back to the broker so it can be delivered again.
+  * <p>
+  * The broker address is looked up by {@code brokerName} when given, otherwise derived from the
+  * message's store host. If the send-back call fails for any reason, the failure is logged and
+  * the message is re-published to this consumer group's retry topic through the default
+  * producer; exceptions from that fallback propagate to the caller.
+  *
+  * @param msg the message to send back
+  * @param delayLevel delay level passed to the broker
+  * @param brokerName broker to send to; when null the message's store host is used
+  * @param consumerGroup group to send back for; when blank this consumer's own group is used
+  */
+ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     try {
+         final String targetAddr;
+         if (brokerName != null) {
+             targetAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
+         } else {
+             targetAddr = RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+         }
+
+         String effectiveGroup = consumerGroup;
+         if (UtilAll.isBlank(effectiveGroup)) {
+             effectiveGroup = this.defaultMQPullConsumer.getConsumerGroup();
+         }
+
+         this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(targetAddr, msg, effectiveGroup, delayLevel, 3000,
+             this.defaultMQPullConsumer.getMaxReconsumeTimes());
+     } catch (Exception e) {
+         log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
+
+         // Fallback: publish a copy to the group's retry topic.
+         final String retryTopic = MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup());
+         final Message resend = new Message(retryTopic, msg.getBody());
+
+         final String knownOriginId = MessageAccessor.getOriginMessageId(msg);
+         final String originId = UtilAll.isBlank(knownOriginId) ? msg.getMsgId() : knownOriginId;
+         MessageAccessor.setOriginMessageId(resend, originId);
+
+         resend.setFlag(msg.getFlag());
+         MessageAccessor.setProperties(resend, msg.getProperties());
+         MessageAccessor.putProperty(resend, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+         MessageAccessor.setReconsumeTime(resend, String.valueOf(msg.getReconsumeTimes() + 1));
+         MessageAccessor.setMaxReconsumeTimes(resend, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));
+         resend.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+         this.mQClientFactory.getDefaultMQProducer().send(resend);
+     }
+ }
+
+ /**
+  * Shuts the consumer down. Only a RUNNING consumer does any work: offsets are persisted, the
+  * consumer is unregistered from the client instance, the client instance is shut down and the
+  * state becomes SHUTDOWN_ALREADY. In every other state the call is a no-op.
+  */
+ public void shutdown() {
+     if (ServiceState.RUNNING != this.serviceState) {
+         // CREATE_JUST, SHUTDOWN_ALREADY and any other state: nothing to release
+         return;
+     }
+
+     this.persistConsumerOffset();
+     this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
+     this.mQClientFactory.shutdown();
+     log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
+     this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+ }
+
+ /**
+  * Starts the consumer. Only allowed from the CREATE_JUST state; in RUNNING, START_FAILED or
+  * SHUTDOWN_ALREADY an MQClientException is thrown.
+  * <p>
+  * On the CREATE_JUST path the configuration is checked, the client instance, rebalance
+  * implementation, pull wrapper and offset store are wired up, the consumer is registered with
+  * the client instance and the client instance is started.
+  *
+  * @throws MQClientException if the state does not allow starting, the configuration is
+  *         invalid, or the consumer group is already registered on the client instance
+  */
+ public void start() throws MQClientException {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ // Mark as failed up front: the state only becomes RUNNING at the very end, so an
+ // exception anywhere below leaves the consumer in START_FAILED.
+ this.serviceState = ServiceState.START_FAILED;
+
+ this.checkConfig();
+
+ this.copySubscription();
+
+ if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
+ this.defaultMQPullConsumer.changeInstanceNameToPID();
+ }
+
+ this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
+
+ // Configure the rebalance implementation from the facade's settings.
+ this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
+ this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
+ this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
+ this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
+
+ this.pullAPIWrapper = new PullAPIWrapper(//
+ mQClientFactory, //
+ this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
+ this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
+
+ // A user-supplied offset store wins; otherwise the store is chosen by message model:
+ // local file for BROADCASTING, remote broker for CLUSTERING.
+ if (this.defaultMQPullConsumer.getOffsetStore() != null) {
+ this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
+ } else {
+ switch (this.defaultMQPullConsumer.getMessageModel()) {
+ case BROADCASTING:
+ this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
+ break;
+ case CLUSTERING:
+ this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
+ break;
+ default:
+ break;
+ }
+ }
+
+ this.offsetStore.load();
+
+ boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
+ if (!registerOK) {
+ // Registration was refused: reset the state to CREATE_JUST before reporting the error.
+ this.serviceState = ServiceState.CREATE_JUST;
+
+ throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
+ + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+ null);
+ }
+
+ mQClientFactory.start();
+ log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
+ this.serviceState = ServiceState.RUNNING;
+ break;
+ // Every state other than CREATE_JUST is rejected.
+ case RUNNING:
+ case START_FAILED:
+ case SHUTDOWN_ALREADY:
+ throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
+ default:
+ break;
+ }
+ }
+
+ /**
+  * Validates the settings of the wrapped pull consumer; called by {@code start()} before
+  * any resources are created.
+  * <p>
+  * Checks, in order: the consumer group is not null, passes {@code Validators.checkGroup}
+  * and is not {@code MixAll.DEFAULT_CONSUMER_GROUP}; the message model and the allocation
+  * strategy are not null; and {@code consumerTimeoutMillisWhenSuspend} is not smaller than
+  * {@code brokerSuspendMaxTimeMillis}.
+  *
+  * @throws MQClientException on the first setting that fails its check
+  */
+ private void checkConfig() throws MQClientException {
+     // consumerGroup: the null guard must come first so that it is reached
+     // before the value is handed to any other validation
+     if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
+         throw new MQClientException(
+             "consumerGroup is null" //
+                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+             null);
+     }
+
+     // consumerGroup: delegate the remaining group-name checks
+     Validators.checkGroup(this.defaultMQPullConsumer.getConsumerGroup());
+
+     // consumerGroup: the built-in default group name is not allowed
+     if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
+         throw new MQClientException(
+             "consumerGroup can not equal "//
+                 + MixAll.DEFAULT_CONSUMER_GROUP //
+                 + ", please specify another one."//
+                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+             null);
+     }
+
+     // messageModel
+     if (null == this.defaultMQPullConsumer.getMessageModel()) {
+         throw new MQClientException(
+             "messageModel is null" //
+                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+             null);
+     }
+
+     // allocateMessageQueueStrategy
+     if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
+         throw new MQClientException(
+             "allocateMessageQueueStrategy is null" //
+                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+             null);
+     }
+
+     // long-polling timeouts: consumerTimeoutMillisWhenSuspend vs. brokerSuspendMaxTimeMillis
+     if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {
+         throw new MQClientException(
+             "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
+                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+             null);
+     }
+ }
+
+ private void copySubscription() throws MQClientException {
+ try {
+ Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
+ if (registerTopics != null) {
+ for (final String topic : registerTopics) {
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+ topic, SubscriptionData.SUB_ALL);
+ this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+ }
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscription exception", e);
+ }
+ }
+
+ /**
+  * Passes a new consume offset for the given queue to the offset store, after
+  * {@code makeSureStateOK()} has accepted the current service state.
+  *
+  * @param mq     the message queue whose offset is updated
+  * @param offset the offset value handed to the offset store
+  * @throws MQClientException as declared; presumably raised by {@code makeSureStateOK()}
+  *         when the consumer is not in a usable state (confirm against that method)
+  */
+ public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
+     makeSureStateOK();
+     offsetStore.updateOffset(mq, offset, false);
+ }
+
+ /**
+  * Looks up a message by id through the admin implementation of the client instance,
+  * after {@code makeSureStateOK()} has accepted the current service state.
+  *
+  * @param msgId the id of the message to look up
+  * @return the message returned by the admin implementation
+  */
+ public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     makeSureStateOK();
+     return mQClientFactory.getMQAdminImpl().viewMessage(msgId);
+ }
+
+ /**
+  * Appends a filter-message hook to this consumer's hook list and logs its name.
+  * {@code start()} hands that list to the pull API wrapper.
+  *
+  * @param hook the hook to register; must not be null
+  * @throws NullPointerException if {@code hook} is null. The check happens before the list
+  *         is touched, so a rejected call leaves no null entry behind.
+  */
+ public void registerFilterMessageHook(final FilterMessageHook hook) {
+     if (hook == null) {
+         // Previously the null was added first and the failure only surfaced in the log call.
+         throw new NullPointerException("hook");
+     }
+     this.filterMessageHookList.add(hook);
+     log.info("register FilterMessageHook Hook, {}", hook.hookName());
+ }
+
+ /**
+  * @return the offset store currently held by this instance
+  */
+ public OffsetStore getOffsetStore() {
+     return this.offsetStore;
+ }
+
+ /**
+  * Replaces the offset store held by this instance.
+  *
+  * @param offsetStore the offset store to use from now on
+  */
+ public void setOffsetStore(final OffsetStore offsetStore) {
+     this.offsetStore = offsetStore;
+ }
+
+ /**
+  * @return the pull API wrapper held by this instance; {@code start()} assigns it
+  */
+ public PullAPIWrapper getPullAPIWrapper() {
+     return this.pullAPIWrapper;
+ }
+
+ /**
+  * Replaces the pull API wrapper held by this instance.
+  *
+  * @param pullAPIWrapper the wrapper to use from now on
+  */
+ public void setPullAPIWrapper(final PullAPIWrapper pullAPIWrapper) {
+     this.pullAPIWrapper = pullAPIWrapper;
+ }
+
+ /**
+  * @return the current service state; {@code start()} branches on this value
+  */
+ public ServiceState getServiceState() {
+     return this.serviceState;
+ }
+
+ /**
+  * Overwrites the service state without any transition check.
+  *
+  * @param serviceState the state to record
+  */
+ public void setServiceState(final ServiceState serviceState) {
+     this.serviceState = serviceState;
+ }
+
+ /**
+  * @return the value of the {@code consumerStartTimestamp} field
+  *         (presumably epoch milliseconds; confirm where the field is initialised)
+  */
+ public long getConsumerStartTimestamp() {
+     return this.consumerStartTimestamp;
+ }
+
+
+ /**
+  * @return the rebalance implementation used by this consumer
+  */
+ public RebalanceImpl getRebalanceImpl() {
+     return this.rebalanceImpl;
+ }
+}