You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:26 UTC
[35/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java
deleted file mode 100644
index 19016ca..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.remoting.RPCHook;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/**
- * Process-wide registry of {@link MQClientInstance} objects keyed by client id.
- * <p>
- * A single eagerly-created instance is exposed through {@link #getInstance()}.
- * Client instances are created lazily and at most one instance per client id is
- * ever handed out, even when several threads race on the same id.
- *
- * @author shijia.wxr
- */
-public class MQClientManager {
-    /** The one and only manager; created when the class is initialized. */
-    private static final MQClientManager instance = new MQClientManager();
-
-    /** Source of the index passed to every newly constructed client instance. */
-    private final AtomicInteger factoryIndexGenerator = new AtomicInteger();
-
-    /** Registered client instances, keyed by client id. */
-    private final ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
-        new ConcurrentHashMap<String, MQClientInstance>();
-
-
-    private MQClientManager() {
-    }
-
-
-    /**
-     * @return the shared manager instance
-     */
-    public static MQClientManager getInstance() {
-        return instance;
-    }
-
-    /**
-     * Same as {@link #getAndCreateMQClientInstance(ClientConfig, RPCHook)} with a
-     * {@code null} RPC hook.
-     *
-     * @param clientConfig configuration used to derive the client id
-     * @return the registered client instance for that id
-     */
-    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {
-        return getAndCreateMQClientInstance(clientConfig, null);
-    }
-
-    /**
-     * Returns the client instance registered under the id built from
-     * {@code clientConfig}, creating and registering one if none exists yet.
-     *
-     * @param clientConfig configuration used to derive the client id; a clone of it
-     *                     is handed to a newly created instance
-     * @param rpcHook      hook passed to a newly created instance, may be {@code null}
-     * @return the instance that is registered for the client id
-     */
-    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
-        final String clientId = clientConfig.buildMQClientId();
-
-        final MQClientInstance existing = this.factoryTable.get(clientId);
-        if (existing != null) {
-            return existing;
-        }
-
-        final MQClientInstance created =
-            new MQClientInstance(clientConfig.cloneClientConfig(),
-                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
-
-        // putIfAbsent decides the race: if another thread registered first, its
-        // instance wins and the one created here is simply discarded.
-        final MQClientInstance raced = this.factoryTable.putIfAbsent(clientId, created);
-        if (raced != null) {
-            return raced;
-        }
-
-        // TODO log the creation of a new client instance
-        return created;
-    }
-
-    /**
-     * Removes the client instance registered under {@code clientId}, if any.
-     *
-     * @param clientId id of the client instance to unregister
-     */
-    public void removeClientFactory(final String clientId) {
-        this.factoryTable.remove(clientId);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
deleted file mode 100644
index 4dee764..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeReturnType;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.message.MessageAccessor;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.body.CMResult;
-import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import org.slf4j.Logger;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-
-/**
- * @author shijia.wxr
- */
-public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
- private static final Logger log = ClientLogger.getLog();
- private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
- private final DefaultMQPushConsumer defaultMQPushConsumer;
- private final MessageListenerConcurrently messageListener;
- private final BlockingQueue<Runnable> consumeRequestQueue;
- private final ThreadPoolExecutor consumeExecutor;
- private final String consumerGroup;
-
- private final ScheduledExecutorService scheduledExecutorService;
- private final ScheduledExecutorService cleanExpireMsgExecutors;
-
-
- public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
- MessageListenerConcurrently messageListener) {
- this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
- this.messageListener = messageListener;
-
- this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
- this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
- this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
-
- this.consumeExecutor = new ThreadPoolExecutor(//
- this.defaultMQPushConsumer.getConsumeThreadMin(), //
- this.defaultMQPushConsumer.getConsumeThreadMax(), //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.consumeRequestQueue, //
- new ThreadFactoryImpl("ConsumeMessageThread_"));
-
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
- this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
- }
-
-
- /**
-  * Starts the periodic sweep that calls {@code cleanExpireMsg()}.
-  * <p>
-  * Both the initial delay and the period are the consumer's consume timeout,
-  * interpreted in minutes.
-  */
- public void start() {
-     final Runnable expiredMessageSweep = new Runnable() {
-         @Override
-         public void run() {
-             cleanExpireMsg();
-         }
-     };
-
-     final long initialDelayMinutes = this.defaultMQPushConsumer.getConsumeTimeout();
-     final long periodMinutes = this.defaultMQPushConsumer.getConsumeTimeout();
-     this.cleanExpireMsgExecutors.scheduleAtFixedRate(
-         expiredMessageSweep, initialDelayMinutes, periodMinutes, TimeUnit.MINUTES);
- }
-
-
- /**
-  * Initiates shutdown of the re-submission scheduler, the consume thread pool and
-  * the expired-message scheduler, in that order. Does not wait for termination.
-  */
- public void shutdown() {
-     scheduledExecutorService.shutdown();
-     consumeExecutor.shutdown();
-     cleanExpireMsgExecutors.shutdown();
- }
-
- /**
-  * Applies a new core pool size to the consume executor.
-  * <p>
-  * The request is silently ignored unless {@code corePoolSize} is positive, at most
-  * {@link Short#MAX_VALUE}, and strictly below the consumer's consumeThreadMax.
-  *
-  * @param corePoolSize requested core pool size
-  */
- @Override
- public void updateCorePoolSize(int corePoolSize) {
-     final boolean rejected = corePoolSize <= 0
-         || corePoolSize > Short.MAX_VALUE
-         || corePoolSize >= this.defaultMQPushConsumer.getConsumeThreadMax();
-     if (rejected) {
-         return;
-     }
-
-     this.consumeExecutor.setCorePoolSize(corePoolSize);
- }
-
- /**
- * Currently a no-op.
- * <p>
- * The body holds only a disabled (commented-out) implementation that would raise
- * the executor's core pool size by one while it is below consumeThreadMax and log
- * the change.
- */
- @Override
- public void incCorePoolSize() {
- // long corePoolSize = this.consumeExecutor.getCorePoolSize();
- // if (corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax())
- // {
- // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
- // + 1);
- // }
- //
- // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}", //
- // corePoolSize,//
- // this.consumeExecutor.getCorePoolSize(),//
- // this.consumerGroup);
- }
-
- /**
- * Currently a no-op.
- * <p>
- * The body holds only a disabled (commented-out) implementation that would lower
- * the executor's core pool size by one while it is above consumeThreadMin and log
- * the change.
- */
- @Override
- public void decCorePoolSize() {
- // long corePoolSize = this.consumeExecutor.getCorePoolSize();
- // if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin())
- // {
- // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
- // - 1);
- // }
- //
- // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}", //
- // corePoolSize,//
- // this.consumeExecutor.getCorePoolSize(),//
- // this.consumerGroup);
- }
-
- /**
-  * @return the core pool size currently configured on the consume executor
-  */
- @Override
- public int getCorePoolSize() {
-     return consumeExecutor.getCorePoolSize();
- }
-
- /**
-  * Synchronously hands a single message to the listener on the calling thread and
-  * reports the outcome.
-  * <p>
-  * The result is always marked as non-ordered and auto-commit. A {@code null}
-  * listener status and a thrown {@link Throwable} are both reported through the
-  * result rather than propagated.
-  *
-  * @param msg        message to consume
-  * @param brokerName broker name recorded on the synthetic message queue
-  * @return outcome of the listener call including the time spent
-  */
- @Override
- public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
-     final ConsumeMessageDirectlyResult directResult = new ConsumeMessageDirectlyResult();
-     directResult.setOrder(false);
-     directResult.setAutoCommit(true);
-
-     final List<MessageExt> singleBatch = new ArrayList<MessageExt>();
-     singleBatch.add(msg);
-
-     // The queue is built before the retry topic is reset, so it carries the
-     // topic the message arrived with.
-     final MessageQueue queue = new MessageQueue();
-     queue.setBrokerName(brokerName);
-     queue.setTopic(msg.getTopic());
-     queue.setQueueId(msg.getQueueId());
-
-     final ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(queue);
-
-     this.resetRetryTopic(singleBatch);
-
-     final long startedAt = System.currentTimeMillis();
-
-     log.info("consumeMessageDirectly receive new messge: {}", msg);
-
-     try {
-         final ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(singleBatch, context);
-         if (status == null) {
-             directResult.setConsumeResult(CMResult.CR_RETURN_NULL);
-         } else if (status == ConsumeConcurrentlyStatus.CONSUME_SUCCESS) {
-             directResult.setConsumeResult(CMResult.CR_SUCCESS);
-         } else if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
-             directResult.setConsumeResult(CMResult.CR_LATER);
-         }
-     } catch (Throwable e) {
-         directResult.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
-         directResult.setRemark(RemotingHelper.exceptionSimpleDesc(e));
-
-         final String description = String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
-             RemotingHelper.exceptionSimpleDesc(e),
-             ConsumeMessageConcurrentlyService.this.consumerGroup,
-             singleBatch,
-             queue);
-         log.warn(description, e);
-     }
-
-     directResult.setSpentTimeMills(System.currentTimeMillis() - startedAt);
-
-     log.info("consumeMessageDirectly Result: {}", directResult);
-
-     return directResult;
- }
-
- /**
-  * Submits messages to the consume thread pool, split into batches of at most the
-  * consumer's consumeMessageBatchMaxSize.
-  * <p>
-  * If the pool rejects a request, that request is retried later. On the splitting
-  * path, all messages not yet batched are first appended to the rejected batch so
-  * that nothing is left behind.
-  *
-  * @param msgs              messages to consume
-  * @param processQueue      process queue the messages belong to
-  * @param messageQueue      message queue the messages were pulled from
-  * @param dispatchToConsume not used by this implementation
-  */
- @Override
- public void submitConsumeRequest(
-     final List<MessageExt> msgs,
-     final ProcessQueue processQueue,
-     final MessageQueue messageQueue,
-     final boolean dispatchToConsume) {
-     final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
-
-     if (msgs.size() > consumeBatchSize) {
-         int cursor = 0;
-         while (cursor < msgs.size()) {
-             final List<MessageExt> batch = new ArrayList<MessageExt>(consumeBatchSize);
-             for (int taken = 0; taken < consumeBatchSize && cursor < msgs.size(); taken++, cursor++) {
-                 batch.add(msgs.get(cursor));
-             }
-
-             final ConsumeRequest batchRequest = new ConsumeRequest(batch, processQueue, messageQueue);
-             try {
-                 this.consumeExecutor.submit(batchRequest);
-             } catch (RejectedExecutionException e) {
-                 // Fold every remaining message into the rejected batch; this also
-                 // advances the cursor to the end and so terminates the outer loop.
-                 while (cursor < msgs.size()) {
-                     batch.add(msgs.get(cursor));
-                     cursor++;
-                 }
-
-                 this.submitConsumeRequestLater(batchRequest);
-             }
-         }
-         return;
-     }
-
-     // Everything fits into one batch: hand the caller's list over as-is.
-     final ConsumeRequest singleRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
-     try {
-         this.consumeExecutor.submit(singleRequest);
-     } catch (RejectedExecutionException e) {
-         this.submitConsumeRequestLater(singleRequest);
-     }
- }
-
- /**
-  * Restores the original topic on messages that were delivered through this
-  * consumer group's retry topic.
-  * <p>
-  * A message is rewritten only when it carries the
-  * {@code MessageConst.PROPERTY_RETRY_TOPIC} property and its current topic equals
-  * the group's retry topic.
-  *
-  * @param msgs messages to inspect; modified in place
-  */
- public void resetRetryTopic(final List<MessageExt> msgs) {
-     final String groupRetryTopic = MixAll.getRetryTopic(consumerGroup);
-     for (MessageExt each : msgs) {
-         final String originalTopic = each.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
-         if (originalTopic == null) {
-             continue;
-         }
-         if (groupRetryTopic.equals(each.getTopic())) {
-             each.setTopic(originalTopic);
-         }
-     }
- }
-
- private void cleanExpireMsg() {
- Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
- this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<MessageQueue, ProcessQueue> next = it.next();
- ProcessQueue pq = next.getValue();
- pq.cleanExpiredMsg(this.defaultMQPushConsumer);
- }
- }
-
- /**
- * Post-processes a finished consume request: updates the OK/failed statistics,
- * deals with the messages that were not acknowledged, removes the handled
- * messages from the process queue and advances the stored offset.
- * <p>
- * Messages at indexes greater than the context's ack index are treated as failed.
- *
- * @param status         status returned by the listener; must not be {@code null}
- *                       because it is switched on directly
- * @param context        context that was passed to the listener (ack index, delay level)
- * @param consumeRequest the request whose messages were consumed
- */
- public void processConsumeResult(//
- final ConsumeConcurrentlyStatus status, //
- final ConsumeConcurrentlyContext context, //
- final ConsumeRequest consumeRequest//
- ) {
- int ackIndex = context.getAckIndex();
-
- if (consumeRequest.getMsgs().isEmpty())
- return;
-
- switch (status) {
- case CONSUME_SUCCESS:
- // Clamp the ack index to the last message so that "ok" never exceeds the batch size.
- // NOTE(review): there is no lower clamp; an ack index below -1 would make the
- // loops further down start at a negative list index - confirm that callers
- // never set such a value.
- if (ackIndex >= consumeRequest.getMsgs().size()) {
- ackIndex = consumeRequest.getMsgs().size() - 1;
- }
- int ok = ackIndex + 1;
- int failed = consumeRequest.getMsgs().size() - ok;
- this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
- break;
- case RECONSUME_LATER:
- // -1 makes the loops below start at index 0, i.e. the whole batch counts as failed.
- ackIndex = -1;
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
- consumeRequest.getMsgs().size());
- break;
- default:
- break;
- }
-
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- case BROADCASTING:
- // Failed messages are only logged here; nothing is sent back or rescheduled.
- for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
- MessageExt msg = consumeRequest.getMsgs().get(i);
- log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
- }
- break;
- case CLUSTERING:
- // Try to send every failed message back; collect the ones for which that failed too.
- List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
- for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
- MessageExt msg = consumeRequest.getMsgs().get(i);
- boolean result = this.sendMessageBack(msg, context);
- if (!result) {
- msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
- msgBackFailed.add(msg);
- }
- }
-
- if (!msgBackFailed.isEmpty()) {
- // Take them out of the request so removeMessage() below leaves them in the
- // process queue, then schedule a local retry for them.
- consumeRequest.getMsgs().removeAll(msgBackFailed);
-
- this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
- }
- break;
- default:
- break;
- }
-
- // Remove what is left in the request from the process queue and persist the
- // returned offset, unless it is negative or the queue has been dropped.
- long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
- if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
- this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
- }
- }
-
- /**
-  * @return the statistics manager of the owning push consumer implementation
-  */
- public ConsumerStatsManager getConsumerStatsManager() {
-     return defaultMQPushConsumerImpl.getConsumerStatsManager();
- }
-
- /**
-  * Sends a message back through the push consumer implementation, using the delay
-  * level from the consume context and the broker name of the context's queue.
-  *
-  * @param msg     message to send back
-  * @param context consume context supplying the delay level and the message queue
-  * @return {@code true} if the send-back call completed, {@code false} if it threw
-  *         an {@link Exception} (which is logged, not propagated)
-  */
- public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
-     final int delayLevel = context.getDelayLevelWhenNextConsume();
-
-     try {
-         this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
-     } catch (Exception e) {
-         log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
-         return false;
-     }
-
-     return true;
- }
-
- private void submitConsumeRequestLater(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue//
- ) {
-
- this.scheduledExecutorService.schedule(new Runnable() {
-
- @Override
- public void run() {
- ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
- }
- }, 5000, TimeUnit.MILLISECONDS);
- }
-
- private void submitConsumeRequestLater(final ConsumeRequest consumeRequest//
- ) {
-
- this.scheduledExecutorService.schedule(new Runnable() {
-
- @Override
- public void run() {
- ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
- }
- }, 5000, TimeUnit.MILLISECONDS);
- }
-
- /**
-  * One unit of work for the consume thread pool: a batch of messages from a single
-  * message queue that is passed to the user's listener.
-  */
- class ConsumeRequest implements Runnable {
-     private final List<MessageExt> msgs;
-     private final ProcessQueue processQueue;
-     private final MessageQueue messageQueue;
-
-
-     /**
-      * @param msgs         batch to consume; the list is kept by reference
-      * @param processQueue process queue the batch belongs to
-      * @param messageQueue message queue the batch was pulled from
-      */
-     public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
-         this.msgs = msgs;
-         this.processQueue = processQueue;
-         this.messageQueue = messageQueue;
-     }
-
-     public List<MessageExt> getMsgs() {
-         return msgs;
-     }
-
-     public ProcessQueue getProcessQueue() {
-         return processQueue;
-     }
-
-     public MessageQueue getMessageQueue() {
-         return messageQueue;
-     }
-
-     /**
-      * Runs the listener on the batch and post-processes the result.
-      * <p>
-      * Steps: skip if the process queue is dropped; run the "before" hooks when any
-      * are registered; reset retry topics and stamp the consume start time; invoke
-      * the listener (exceptions are caught and logged); classify the outcome; run
-      * the "after" hooks; record the consume round-trip time; finally process the
-      * result unless the queue was dropped in the meantime. A {@code null} listener
-      * status is treated as {@code RECONSUME_LATER}.
-      */
-     @Override
-     public void run() {
-         if (this.processQueue.isDropped()) {
-             log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
-             return;
-         }
-
-         MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
-         ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
-         ConsumeConcurrentlyStatus status = null;
-
-         // Stays null when no hook is registered; every later use must be guarded.
-         ConsumeMessageContext consumeMessageContext = null;
-         if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
-             consumeMessageContext = new ConsumeMessageContext();
-             consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
-             consumeMessageContext.setProps(new HashMap<String, String>());
-             consumeMessageContext.setMq(messageQueue);
-             consumeMessageContext.setMsgList(msgs);
-             consumeMessageContext.setSuccess(false);
-             ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
-         }
-
-         long beginTimestamp = System.currentTimeMillis();
-         boolean hasException = false;
-         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
-         try {
-             ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
-             if (msgs != null && !msgs.isEmpty()) {
-                 for (MessageExt msg : msgs) {
-                     MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
-                 }
-             }
-             // The listener only gets a read-only view of the batch.
-             status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
-         } catch (Throwable e) {
-             // Pass the throwable to the logger so the stack trace is kept, in the
-             // same way consumeMessageDirectly() reports listener failures.
-             log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
-                 RemotingHelper.exceptionSimpleDesc(e),
-                 ConsumeMessageConcurrentlyService.this.consumerGroup,
-                 msgs,
-                 messageQueue), e);
-             hasException = true;
-         }
-
-         long consumeRT = System.currentTimeMillis() - beginTimestamp;
-         if (null == status) {
-             if (hasException) {
-                 returnType = ConsumeReturnType.EXCEPTION;
-             } else {
-                 returnType = ConsumeReturnType.RETURNNULL;
-             }
-         } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
-             // The consume timeout is converted from minutes to milliseconds here.
-             returnType = ConsumeReturnType.TIME_OUT;
-         } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
-             returnType = ConsumeReturnType.FAILED;
-         } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
-             returnType = ConsumeReturnType.SUCCESS;
-         }
-
-         // Fix: the hook context exists only when a hook was registered; writing to
-         // it unconditionally threw a NullPointerException for hook-less consumers.
-         if (consumeMessageContext != null) {
-             consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
-         }
-
-         if (null == status) {
-             log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
-                 ConsumeMessageConcurrentlyService.this.consumerGroup,
-                 msgs,
-                 messageQueue);
-             status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
-         }
-
-         // The null check also covers a hook that was registered after the "before"
-         // phase above, in which case no context was created for this run.
-         if (consumeMessageContext != null
-             && ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
-             consumeMessageContext.setStatus(status.toString());
-             consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
-             ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
-         }
-
-         ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
-             .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
-
-         if (!processQueue.isDropped()) {
-             ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
-         } else {
-             log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
-         }
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
deleted file mode 100644
index 82903b0..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ /dev/null
@@ -1,536 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.listener.*;
-import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.body.CMResult;
-import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.*;
-
-
-/**
- * @author shijia.wxr
- */
-public class ConsumeMessageOrderlyService implements ConsumeMessageService {
- private static final Logger log = ClientLogger.getLog();
- private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
- Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
- private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
- private final DefaultMQPushConsumer defaultMQPushConsumer;
- private final MessageListenerOrderly messageListener;
- private final BlockingQueue<Runnable> consumeRequestQueue;
- private final ThreadPoolExecutor consumeExecutor;
- private final String consumerGroup;
- private final MessageQueueLock messageQueueLock = new MessageQueueLock();
- private final ScheduledExecutorService scheduledExecutorService;
- private volatile boolean stopped = false;
-
-
- public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
- this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
- this.messageListener = messageListener;
-
- this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
- this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
- this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
-
- this.consumeExecutor = new ThreadPoolExecutor(//
- this.defaultMQPushConsumer.getConsumeThreadMin(), //
- this.defaultMQPushConsumer.getConsumeThreadMax(), //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.consumeRequestQueue, //
- new ThreadFactoryImpl("ConsumeMessageThread_"));
-
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
- }
-
-
- /**
-  * In CLUSTERING mode, schedules {@code lockMQPeriodically()} to run after one
-  * second and then every {@code ProcessQueue.REBALANCE_LOCK_INTERVAL}
-  * milliseconds. Does nothing for any other message model.
-  */
- public void start() {
-     final boolean clustering =
-         MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel());
-     if (!clustering) {
-         return;
-     }
-
-     final Runnable periodicLock = new Runnable() {
-         @Override
-         public void run() {
-             ConsumeMessageOrderlyService.this.lockMQPeriodically();
-         }
-     };
-     this.scheduledExecutorService.scheduleAtFixedRate(
-         periodicLock, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
- }
-
-
- /**
-  * Marks the service as stopped, initiates shutdown of the scheduler and the
-  * consume thread pool, and in CLUSTERING mode unlocks all message queues.
-  * Does not wait for the executors to terminate.
-  */
- public void shutdown() {
-     this.stopped = true;
-     this.scheduledExecutorService.shutdown();
-     this.consumeExecutor.shutdown();
-
-     final boolean clustering = MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel());
-     if (clustering) {
-         this.unlockAllMQ();
-     }
- }
-
-
- /**
-  * Asks the rebalance implementation to unlock all queues, passing {@code false}
-  * as its flag argument. Synchronized on this service.
-  */
- public synchronized void unlockAllMQ() {
-     defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
- }
-
- /**
-  * Applies a new core pool size to the consume executor.
-  * <p>
-  * The request is silently ignored unless {@code corePoolSize} is positive, at most
-  * {@link Short#MAX_VALUE}, and strictly below the consumer's consumeThreadMax.
-  *
-  * @param corePoolSize requested core pool size
-  */
- @Override
- public void updateCorePoolSize(int corePoolSize) {
-     final boolean rejected = corePoolSize <= 0
-         || corePoolSize > Short.MAX_VALUE
-         || corePoolSize >= this.defaultMQPushConsumer.getConsumeThreadMax();
-     if (rejected) {
-         return;
-     }
-
-     this.consumeExecutor.setCorePoolSize(corePoolSize);
- }
-
- /**
- * No-op: this implementation does not change the core pool size here.
- */
- @Override
- public void incCorePoolSize() {
- }
-
- /**
- * No-op: this implementation does not change the core pool size here.
- */
- @Override
- public void decCorePoolSize() {
- }
-
- /**
-  * @return the core pool size currently configured on the consume executor
-  */
- @Override
- public int getCorePoolSize() {
-     return consumeExecutor.getCorePoolSize();
- }
-
- /**
-  * Synchronously hands a single message to the orderly listener on the calling
-  * thread and reports the outcome.
-  * <p>
-  * The result is marked as ordered; its auto-commit flag is copied from the
-  * consume context after the listener ran. A {@code null} listener status and a
-  * thrown {@link Throwable} are both reported through the result rather than
-  * propagated.
-  *
-  * @param msg        message to consume
-  * @param brokerName broker name recorded on the synthetic message queue
-  * @return outcome of the listener call including the time spent
-  */
- @Override
- public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
-     final ConsumeMessageDirectlyResult directResult = new ConsumeMessageDirectlyResult();
-     directResult.setOrder(true);
-
-     final List<MessageExt> singleBatch = new ArrayList<MessageExt>();
-     singleBatch.add(msg);
-
-     final MessageQueue queue = new MessageQueue();
-     queue.setBrokerName(brokerName);
-     queue.setTopic(msg.getTopic());
-     queue.setQueueId(msg.getQueueId());
-
-     final ConsumeOrderlyContext context = new ConsumeOrderlyContext(queue);
-
-     final long startedAt = System.currentTimeMillis();
-
-     log.info("consumeMessageDirectly receive new messge: {}", msg);
-
-     try {
-         final ConsumeOrderlyStatus status = this.messageListener.consumeMessage(singleBatch, context);
-         if (status == null) {
-             directResult.setConsumeResult(CMResult.CR_RETURN_NULL);
-         } else if (status == ConsumeOrderlyStatus.COMMIT) {
-             directResult.setConsumeResult(CMResult.CR_COMMIT);
-         } else if (status == ConsumeOrderlyStatus.ROLLBACK) {
-             directResult.setConsumeResult(CMResult.CR_ROLLBACK);
-         } else if (status == ConsumeOrderlyStatus.SUCCESS) {
-             directResult.setConsumeResult(CMResult.CR_SUCCESS);
-         } else if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {
-             directResult.setConsumeResult(CMResult.CR_LATER);
-         }
-     } catch (Throwable e) {
-         directResult.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
-         directResult.setRemark(RemotingHelper.exceptionSimpleDesc(e));
-
-         final String description = String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
-             RemotingHelper.exceptionSimpleDesc(e),
-             ConsumeMessageOrderlyService.this.consumerGroup,
-             singleBatch,
-             queue);
-         log.warn(description, e);
-     }
-
-     directResult.setAutoCommit(context.isAutoCommit());
-     directResult.setSpentTimeMills(System.currentTimeMillis() - startedAt);
-
-     log.info("consumeMessageDirectly Result: {}", directResult);
-
-     return directResult;
- }
-
- /**
-  * Submits a consume request for the given queue to the consume thread pool, but
-  * only when {@code dispathToConsume} is {@code true}.
-  * <p>
-  * The request is built from the process queue and message queue alone;
-  * {@code msgs} is not used by this implementation.
-  *
-  * @param msgs             ignored here
-  * @param processQueue     process queue to consume from
-  * @param messageQueue     message queue the process queue belongs to
-  * @param dispathToConsume whether a consume request should be submitted
-  */
- @Override
- public void submitConsumeRequest(
-     final List<MessageExt> msgs,
-     final ProcessQueue processQueue,
-     final MessageQueue messageQueue,
-     final boolean dispathToConsume) {
-     if (!dispathToConsume) {
-         return;
-     }
-
-     final ConsumeRequest request = new ConsumeRequest(processQueue, messageQueue);
-     this.consumeExecutor.submit(request);
- }
-
- /**
-  * Asks the rebalance implementation to lock all queues, unless the service has
-  * been stopped. Synchronized on this service.
-  */
- public synchronized void lockMQPeriodically() {
-     if (this.stopped) {
-         return;
-     }
-
-     this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
- }
-
- /**
-  * After {@code delayMills} milliseconds, tries to lock the given queue and then
-  * schedules a new consume request for it: 10 ms later if the lock was obtained,
-  * 3000 ms later otherwise.
-  *
-  * @param mq           message queue to lock
-  * @param processQueue process queue to consume from afterwards
-  * @param delayMills   delay before the lock attempt, in milliseconds
-  */
- public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, final long delayMills) {
-     final Runnable lockThenResubmit = new Runnable() {
-         @Override
-         public void run() {
-             final boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq);
-             final long resubmitDelayMillis = lockOK ? 10 : 3000;
-             ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, resubmitDelayMillis);
-         }
-     };
-
-     this.scheduledExecutorService.schedule(lockThenResubmit, delayMills, TimeUnit.MILLISECONDS);
- }
-
- /**
-  * Tries to lock a single message queue through the rebalance implementation.
-  * Synchronized on this service.
-  *
-  * @param mq message queue to lock
-  * @return the result of the lock attempt, or {@code false} without attempting it
-  *         when the service has been stopped
-  */
- public synchronized boolean lockOneMQ(final MessageQueue mq) {
-     return !this.stopped && this.defaultMQPushConsumerImpl.getRebalanceImpl().lock(mq);
- }
-
- private void submitConsumeRequestLater(//
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
- final long suspendTimeMillis//
- ) {
- long timeMillis = suspendTimeMillis;
- if (timeMillis == -1) {
- timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
- }
-
- if (timeMillis < 10) {
- timeMillis = 10;
- } else if (timeMillis > 30000) {
- timeMillis = 30000;
- }
-
- this.scheduledExecutorService.schedule(new Runnable() {
-
- @Override
- public void run() {
- ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
- }
- }, timeMillis, TimeUnit.MILLISECONDS);
- }
-
- /**
- * Post-processes the listener's status for an orderly consume round: updates
- * statistics, commits or rolls back the process queue, schedules a delayed retry
- * where needed and persists the committed offset.
- * <p>
- * The handling differs depending on whether the context is in auto-commit mode.
- *
- * @param msgs           messages that were handed to the listener
- * @param status         status returned by the listener; must not be {@code null}
- *                       because it is switched on directly
- * @param context        context that was passed to the listener
- * @param consumeRequest the request being processed
- * @return {@code true} if the caller may keep consuming, {@code false} if a
- *         delayed retry was scheduled instead
- */
- public boolean processConsumeResult(//
- final List<MessageExt> msgs, //
- final ConsumeOrderlyStatus status, //
- final ConsumeOrderlyContext context, //
- final ConsumeRequest consumeRequest//
- ) {
- boolean continueConsume = true;
- long commitOffset = -1L;
- if (context.isAutoCommit()) {
- switch (status) {
- case COMMIT:
- case ROLLBACK:
- // In auto-commit mode these two are only warned about and then fall
- // through (no break) into the SUCCESS handling below.
- log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
- consumeRequest.getMessageQueue());
- case SUCCESS:
- commitOffset = consumeRequest.getProcessQueue().commit();
- this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
- break;
- case SUSPEND_CURRENT_QUEUE_A_MOMENT:
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
- // checkReconsumeTimes() returns true when at least one message still has to be
- // retried locally; otherwise the batch is committed.
- if (checkReconsumeTimes(msgs)) {
- consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
- this.submitConsumeRequestLater(//
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
- context.getSuspendCurrentQueueTimeMillis());
- continueConsume = false;
- } else {
- commitOffset = consumeRequest.getProcessQueue().commit();
- }
- break;
- default:
- break;
- }
- } else {
- switch (status) {
- case SUCCESS:
- // Manual-commit mode: SUCCESS only updates statistics; nothing is committed here.
- this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
- break;
- case COMMIT:
- commitOffset = consumeRequest.getProcessQueue().commit();
- break;
- case ROLLBACK:
- consumeRequest.getProcessQueue().rollback();
- this.submitConsumeRequestLater(//
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
- context.getSuspendCurrentQueueTimeMillis());
- continueConsume = false;
- break;
- case SUSPEND_CURRENT_QUEUE_A_MOMENT:
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
- // Unlike the auto-commit branch, nothing is committed when no retry is needed.
- if (checkReconsumeTimes(msgs)) {
- consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
- this.submitConsumeRequestLater(//
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
- context.getSuspendCurrentQueueTimeMillis());
- continueConsume = false;
- }
- break;
- default:
- break;
- }
- }
-
- // Persist the offset only if a commit produced one and the queue is still owned.
- if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
- this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
- }
-
- return continueConsume;
- }
-
- /**
-  * @return the statistics manager of the owning push consumer implementation
-  */
- public ConsumerStatsManager getConsumerStatsManager() {
-     return defaultMQPushConsumerImpl.getConsumerStatsManager();
- }
-
- private int getMaxReconsumeTimes() {
- // default reconsume times: Integer.MAX_VALUE
- if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
- return Integer.MAX_VALUE;
- } else {
- return this.defaultMQPushConsumer.getMaxReconsumeTimes();
- }
- }
-
- private boolean checkReconsumeTimes(List<MessageExt> msgs) {
- boolean suspend = false;
- if (msgs != null && !msgs.isEmpty()) {
- for (MessageExt msg : msgs) {
- if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
- MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
- if (!sendMessageBack(msg)) {
- suspend = true;
- msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
- }
- } else {
- suspend = true;
- msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
- }
- }
- }
- return suspend;
- }
-
- /**
-  * Re-publishes a copy of {@code msg} to this consumer group's retry topic through the
-  * client's default producer.
-  * <p>
-  * The copy carries the original body, flag and properties, plus the origin message id,
-  * the original topic (as {@code PROPERTY_RETRY_TOPIC}), the current reconsume count and
-  * the effective maximum reconsume count.
-  *
-  * @param msg the message that should not be retried locally any more
-  * @return {@code true} if the send call returned normally; {@code false} if any exception
-  *         was thrown (the exception is logged, not propagated)
-  */
- public boolean sendMessageBack(final MessageExt msg) {
- try {
- // Called by checkReconsumeTimes once the retry limit is reached. The copy is addressed to
- // the group's retry topic. NOTE(review): the previous comment said "dead letter queue";
- // presumably the broker routes it there based on the max-reconsume property - confirm.
- Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
- String originMsgId = MessageAccessor.getOriginMessageId(msg);
- MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
- newMsg.setFlag(msg.getFlag());
- // NOTE(review): setProperties runs after setOriginMessageId; if it replaces the property
- // map wholesale, the origin id set above may be lost - verify against MessageAccessor.
- MessageAccessor.setProperties(newMsg, msg.getProperties());
- MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
- MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
- MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
- // Delay level grows with the number of retries already performed.
- newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
-
- this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
- return true;
- } catch (Exception e) {
- log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
- }
-
- return false;
- }
-
- /**
-  * Task that consumes the messages buffered in one {@link ProcessQueue} in order.
-  * <p>
-  * The task holds the per-queue lock object while running. It keeps taking batches from the
-  * process queue and passing them to the registered listener until the queue is empty, the
-  * queue is dropped, the queue lock is missing or expired (clustering mode), the continuous
-  * consume time budget is used up, or the result processing asks to stop.
-  */
- class ConsumeRequest implements Runnable {
-     private final ProcessQueue processQueue;
-     private final MessageQueue messageQueue;
-
-
-     public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
-         this.processQueue = processQueue;
-         this.messageQueue = messageQueue;
-     }
-
-     public ProcessQueue getProcessQueue() {
-         return processQueue;
-     }
-
-     public MessageQueue getMessageQueue() {
-         return messageQueue;
-     }
-
-     @Override
-     public void run() {
-         if (this.processQueue.isDropped()) {
-             log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
-             return;
-         }
-
-         final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
-         synchronized (objLock) {
-             if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
-                 || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
-                 final long beginTime = System.currentTimeMillis();
-                 for (boolean continueConsume = true; continueConsume; ) {
-                     if (this.processQueue.isDropped()) {
-                         log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
-                         break;
-                     }
-
-                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
-                         && !this.processQueue.isLocked()) {
-                         log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
-                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
-                         break;
-                     }
-
-                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
-                         && this.processQueue.isLockExpired()) {
-                         log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
-                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
-                         break;
-                     }
-
-                     // Yield after running continuously for too long; the request is resubmitted.
-                     long interval = System.currentTimeMillis() - beginTime;
-                     if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
-                         ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
-                         break;
-                     }
-
-                     final int consumeBatchSize =
-                         ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
-
-                     List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
-                     if (!msgs.isEmpty()) {
-                         final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
-
-                         ConsumeOrderlyStatus status = null;
-
-                         // Stays null when no consume hook is registered.
-                         ConsumeMessageContext consumeMessageContext = null;
-                         if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
-                             consumeMessageContext = new ConsumeMessageContext();
-                             consumeMessageContext
-                                 .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
-                             consumeMessageContext.setMq(messageQueue);
-                             consumeMessageContext.setMsgList(msgs);
-                             consumeMessageContext.setSuccess(false);
-                             // init the consume context type
-                             consumeMessageContext.setProps(new HashMap<String, String>());
-                             ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
-                         }
-
-                         long beginTimestamp = System.currentTimeMillis();
-                         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
-                         boolean hasException = false;
-                         // Acquire before the try block so that the finally clause never
-                         // unlocks a lock this thread failed to obtain.
-                         this.processQueue.getLockConsume().lock();
-                         try {
-                             if (this.processQueue.isDropped()) {
-                                 log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
-                                     this.messageQueue);
-                                 break;
-                             }
-
-                             status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
-                         } catch (Throwable e) {
-                             log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
-                                 RemotingHelper.exceptionSimpleDesc(e), //
-                                 ConsumeMessageOrderlyService.this.consumerGroup, //
-                                 msgs, //
-                                 messageQueue);
-                             hasException = true;
-                         } finally {
-                             this.processQueue.getLockConsume().unlock();
-                         }
-
-                         if (null == status //
-                             || ConsumeOrderlyStatus.ROLLBACK == status//
-                             || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
-                             log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
-                                 ConsumeMessageOrderlyService.this.consumerGroup, //
-                                 msgs, //
-                                 messageQueue);
-                         }
-
-                         // Classify the outcome for the hook context.
-                         long consumeRT = System.currentTimeMillis() - beginTimestamp;
-                         if (null == status) {
-                             if (hasException) {
-                                 returnType = ConsumeReturnType.EXCEPTION;
-                             } else {
-                                 returnType = ConsumeReturnType.RETURNNULL;
-                             }
-                         } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
-                             returnType = ConsumeReturnType.TIME_OUT;
-                         } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
-                             returnType = ConsumeReturnType.FAILED;
-                         } else if (ConsumeOrderlyStatus.SUCCESS == status) {
-                             returnType = ConsumeReturnType.SUCCESS;
-                         }
-
-                         // A missing status (exception or null return) is treated as a suspend request.
-                         if (null == status) {
-                             status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-                         }
-
-                         // The hook context exists only when a hook was registered before consuming;
-                         // touching it unconditionally caused a NullPointerException without hooks.
-                         if (null != consumeMessageContext) {
-                             consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
-                             consumeMessageContext.setStatus(status.toString());
-                             consumeMessageContext
-                                 .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
-                             ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
-                         }
-
-                         ConsumeMessageOrderlyService.this.getConsumerStatsManager()
-                             .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
-
-                         continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
-                     } else {
-                         continueConsume = false;
-                     }
-                 }
-             } else {
-                 if (this.processQueue.isDropped()) {
-                     log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
-                     return;
-                 }
-
-                 // Clustering mode without a valid queue lock: try to lock and consume later.
-                 ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
-             }
-         }
-     }
-
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java
deleted file mode 100644
index 1f7f0d9..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public interface ConsumeMessageService {
- void start();
-
-
- void shutdown();
-
-
- void updateCorePoolSize(int corePoolSize);
-
-
- void incCorePoolSize();
-
-
- void decCorePoolSize();
-
-
- int getCorePoolSize();
-
-
- ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
-
- void submitConsumeRequest(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
- final boolean dispathToConsume);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
deleted file mode 100644
index 1785ec9..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ /dev/null
@@ -1,706 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.Validators;
-import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
-import com.alibaba.rocketmq.client.consumer.PullCallback;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore;
-import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
-import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
-import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
-import com.alibaba.rocketmq.client.hook.ConsumeMessageHook;
-import com.alibaba.rocketmq.client.hook.FilterMessageHook;
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.MQClientManager;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.ServiceState;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.filter.FilterAPI;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import org.slf4j.Logger;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class DefaultMQPullConsumerImpl implements MQConsumerInner {
- // Client logger obtained from ClientLogger.
- private final Logger log = ClientLogger.getLog();
- // Public facade whose configuration (group, message model, timeouts, ...) this class reads.
- private final DefaultMQPullConsumer defaultMQPullConsumer;
- // Wall-clock time at which this instance was constructed; reported in consumerRunningInfo().
- private final long consumerStartTimestamp = System.currentTimeMillis();
- // Passed to MQClientManager when the client instance is obtained in start().
- private final RPCHook rpcHook;
- // Hooks invoked by executeHookBefore / executeHookAfter; filled by registerConsumeMessageHook.
- private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
- // Handed to the PullAPIWrapper in start().
- private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
- // Lifecycle state; most public operations require RUNNING (see makeSureStateOK).
- private ServiceState serviceState = ServiceState.CREATE_JUST;
- // The three fields below are assigned in start() and are null before that.
- private MQClientInstance mQClientFactory;
- private PullAPIWrapper pullAPIWrapper;
- private OffsetStore offsetStore;
- // Rebalance implementation for pull consumers, bound to this instance.
- private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
-
-
- /**
-  * Creates the implementation behind a {@link DefaultMQPullConsumer}.
-  *
-  * @param defaultMQPullConsumer facade whose settings are read by this instance
-  * @param rpcHook hook stored for later use when the client instance is created
-  */
- public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
-     this.rpcHook = rpcHook;
-     this.defaultMQPullConsumer = defaultMQPullConsumer;
- }
-
- /**
-  * Appends a consume hook to the hook list and logs its name.
-  *
-  * @param hook hook to run around message consumption
-  */
- public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
-     consumeMessageHookList.add(hook);
-     final String hookName = hook.hookName();
-     log.info("register consumeMessageHook Hook, {}", hookName);
- }
-
- /**
-  * Creates a topic with a system flag of {@code 0}.
-  *
-  * @throws MQClientException propagated from the four-argument overload
-  */
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-     final int noSysFlag = 0;
-     this.createTopic(key, newTopic, queueNum, noSysFlag);
- }
-
- /**
-  * Creates a topic through the client's admin implementation.
-  *
-  * @throws MQClientException if the consumer is not running, or from the admin call
-  */
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-     makeSureStateOK();
-     mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
- }
-
- private void makeSureStateOK() throws MQClientException {
- if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The consumer service state not OK, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- }
- }
-
- /**
-  * Reads the consume offset of a queue from the offset store.
-  *
-  * @param mq queue whose offset is requested
-  * @param fromStore {@code true} to read with READ_FROM_STORE, {@code false} to read with
-  *                  MEMORY_FIRST_THEN_STORE
-  * @return the offset reported by the offset store
-  * @throws MQClientException if the consumer is not running
-  */
- public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
-     this.makeSureStateOK();
-
-     final ReadOffsetType readType;
-     if (fromStore) {
-         readType = ReadOffsetType.READ_FROM_STORE;
-     } else {
-         readType = ReadOffsetType.MEMORY_FIRST_THEN_STORE;
-     }
-     return this.offsetStore.readOffset(mq, readType);
- }
-
- /**
-  * Collects the queues of {@code topic} that are present in the rebalance process-queue table.
-  *
-  * @param topic topic to filter by; must not be {@code null}
-  * @return a new set with the matching queues (possibly empty)
-  * @throws MQClientException if the consumer is not running
-  * @throws IllegalArgumentException if {@code topic} is {@code null}
-  */
- public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
-     this.makeSureStateOK();
-     if (topic == null) {
-         throw new IllegalArgumentException("topic is null");
-     }
-
-     final ConcurrentHashMap<MessageQueue, ProcessQueue> table = this.rebalanceImpl.getProcessQueueTable();
-     final Set<MessageQueue> matched = new HashSet<MessageQueue>();
-     final Iterator<MessageQueue> it = table.keySet().iterator();
-     while (it.hasNext()) {
-         final MessageQueue candidate = it.next();
-         if (candidate.getTopic().equals(topic)) {
-             matched.add(candidate);
-         }
-     }
-
-     return matched;
- }
-
- /**
-  * Asks the admin implementation for the publish queues of a topic.
-  *
-  * @throws MQClientException if the consumer is not running, or from the admin call
-  */
- public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
-     makeSureStateOK();
-     final List<MessageQueue> queues = mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
-     return queues;
- }
-
- /**
-  * Asks the admin implementation for the subscribe queues of a topic.
-  *
-  * @throws MQClientException if the consumer is not running, or from the admin call
-  */
- public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
-     makeSureStateOK();
-     final Set<MessageQueue> queues = mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
-     return queues;
- }
-
- /**
-  * Delegates to the admin implementation's {@code earliestMsgStoreTime} for the given queue.
-  *
-  * @throws MQClientException if the consumer is not running, or from the admin call
-  */
- public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
-     makeSureStateOK();
-     final long storeTime = mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
-     return storeTime;
- }
-
- /**
-  * Delegates to the admin implementation's {@code maxOffset} for the given queue.
-  *
-  * @throws MQClientException if the consumer is not running, or from the admin call
-  */
- public long maxOffset(MessageQueue mq) throws MQClientException {
-     makeSureStateOK();
-     final long max = mQClientFactory.getMQAdminImpl().maxOffset(mq);
-     return max;
- }
-
- /**
-  * Delegates to the admin implementation's {@code minOffset} for the given queue.
-  *
-  * @throws MQClientException if the consumer is not running, or from the admin call
-  */
- public long minOffset(MessageQueue mq) throws MQClientException {
-     makeSureStateOK();
-     final long min = mQClientFactory.getMQAdminImpl().minOffset(mq);
-     return min;
- }
-
- /**
-  * Non-blocking synchronous pull that uses the consumer's configured pull timeout.
-  */
- public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
-     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-     final long defaultTimeout = this.defaultMQPullConsumer.getConsumerPullTimeoutMillis();
-     return this.pull(mq, subExpression, offset, maxNums, defaultTimeout);
- }
-
- /**
-  * Non-blocking synchronous pull with an explicit timeout.
-  */
- public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
-     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-     final boolean blockIfNotFound = false;
-     return pullSyncImpl(mq, subExpression, offset, maxNums, blockIfNotFound, timeout);
- }
-
- private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- this.makeSureStateOK();
-
- if (null == mq) {
- throw new MQClientException("mq is null", null);
-
- }
-
- if (offset < 0) {
- throw new MQClientException("offset < 0", null);
- }
-
- if (maxNums <= 0) {
- throw new MQClientException("maxNums <= 0", null);
- }
-
- this.subscriptionAutomatically(mq.getTopic());
-
- int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
-
- SubscriptionData subscriptionData;
- try {
- subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
- mq.getTopic(), subExpression);
- } catch (Exception e) {
- throw new MQClientException("parse subscription error", e);
- }
-
- long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
-
- PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
- mq, // 1
- subscriptionData.getSubString(), // 2
- 0L, // 3
- offset, // 4
- maxNums, // 5
- sysFlag, // 6
- 0, // 7
- this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
- timeoutMillis, // 9
- CommunicationMode.SYNC, // 10
- null// 11
- );
- this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
- if (!this.consumeMessageHookList.isEmpty()) {
- ConsumeMessageContext consumeMessageContext = null;
- consumeMessageContext = new ConsumeMessageContext();
- consumeMessageContext.setConsumerGroup(this.groupName());
- consumeMessageContext.setMq(mq);
- consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
- consumeMessageContext.setSuccess(false);
- this.executeHookBefore(consumeMessageContext);
- consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
- consumeMessageContext.setSuccess(true);
- this.executeHookAfter(consumeMessageContext);
- }
- return pullResult;
- }
-
- /**
-  * Registers a match-all subscription for {@code topic} when none is recorded yet.
-  * <p>
-  * This is a best-effort operation: a failure to build the subscription data does not
-  * propagate, but it is logged instead of being silently discarded.
-  *
-  * @param topic topic that is about to be pulled from
-  */
- public void subscriptionAutomatically(final String topic) {
-     if (this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
-         return;
-     }
-
-     try {
-         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
-             topic, SubscriptionData.SUB_ALL);
-         this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
-     } catch (Exception e) {
-         // Keep the original best-effort contract, but leave a trace for diagnosis.
-         log.warn("subscriptionAutomatically failed to build subscription data, topic: " + topic, e);
-     }
- }
-
- /**
-  * Removes the subscription entry recorded for {@code topic}, if any.
-  */
- public void unsubscribe(String topic) {
-     rebalanceImpl.getSubscriptionInner().remove(topic);
- }
-
- /**
-  * @return the consumer group configured on the facade
-  */
- @Override
- public String groupName() {
-     final String group = this.defaultMQPullConsumer.getConsumerGroup();
-     return group;
- }
-
- /**
-  * Invokes {@code consumeMessageBefore} on every registered consume hook.
-  * <p>
-  * A failing hook never interrupts the remaining hooks or the caller; the failure is
-  * logged instead of being silently discarded.
-  *
-  * @param context context object handed to each hook
-  */
- public void executeHookBefore(final ConsumeMessageContext context) {
-     if (!this.consumeMessageHookList.isEmpty()) {
-         for (ConsumeMessageHook hook : this.consumeMessageHookList) {
-             try {
-                 hook.consumeMessageBefore(context);
-             } catch (Throwable e) {
-                 log.warn("consumeMessageBefore hook threw an exception, ignored", e);
-             }
-         }
-     }
- }
-
- /**
-  * Invokes {@code consumeMessageAfter} on every registered consume hook.
-  * <p>
-  * A failing hook never interrupts the remaining hooks or the caller; the failure is
-  * logged instead of being silently discarded.
-  *
-  * @param context context object handed to each hook
-  */
- public void executeHookAfter(final ConsumeMessageContext context) {
-     if (!this.consumeMessageHookList.isEmpty()) {
-         for (ConsumeMessageHook hook : this.consumeMessageHookList) {
-             try {
-                 hook.consumeMessageAfter(context);
-             } catch (Throwable e) {
-                 log.warn("consumeMessageAfter hook threw an exception, ignored", e);
-             }
-         }
-     }
- }
-
- /**
-  * @return the message model configured on the facade
-  */
- @Override
- public MessageModel messageModel() {
-     final MessageModel model = this.defaultMQPullConsumer.getMessageModel();
-     return model;
- }
-
- /**
-  * @return always {@code CONSUME_ACTIVELY} for this pull consumer
-  */
- @Override
- public ConsumeType consumeType() {
-     final ConsumeType type = ConsumeType.CONSUME_ACTIVELY;
-     return type;
- }
-
- /**
-  * @return always {@code CONSUME_FROM_LAST_OFFSET} for this pull consumer
-  */
- @Override
- public ConsumeFromWhere consumeFromWhere() {
-     final ConsumeFromWhere startPoint = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
-     return startPoint;
- }
-
- /**
-  * Builds match-all subscription data for every topic registered on the facade.
-  * <p>
-  * A topic whose subscription data cannot be built is logged and skipped; previously the
-  * failure was logged and then followed by a NullPointerException on the unset result.
-  *
-  * @return a new set with one entry (sub version {@code 0}) per successfully built topic;
-  *         empty when no topics are registered
-  */
- @Override
- public Set<SubscriptionData> subscriptions() {
-     Set<SubscriptionData> result = new HashSet<SubscriptionData>();
-
-     Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();
-     if (topics != null) {
-         // Iterate under the set's own monitor, as the original code does.
-         synchronized (topics) {
-             for (String t : topics) {
-                 SubscriptionData ms;
-                 try {
-                     ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);
-                 } catch (Exception e) {
-                     log.error("parse subscription error", e);
-                     continue;
-                 }
-                 ms.setSubVersion(0L);
-                 result.add(ms);
-             }
-         }
-     }
-
-     return result;
- }
-
- /**
-  * Triggers a rebalance (with argument {@code false}) when a rebalance implementation is set.
-  */
- @Override
- public void doRebalance() {
-     if (null == this.rebalanceImpl) {
-         return;
-     }
-     this.rebalanceImpl.doRebalance(false);
- }
-
- /**
-  * Persists the offsets of all queues currently in the process-queue table.
-  * Any exception, including "consumer not running", is logged and not propagated.
-  */
- @Override
- public void persistConsumerOffset() {
-     try {
-         this.makeSureStateOK();
-
-         final Set<MessageQueue> snapshot = new HashSet<MessageQueue>();
-         final Set<MessageQueue> allocated = this.rebalanceImpl.getProcessQueueTable().keySet();
-         if (null != allocated) {
-             snapshot.addAll(allocated);
-         }
-         this.offsetStore.persistAll(snapshot);
-     } catch (Exception e) {
-         log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
-     }
- }
-
- /**
-  * Records the queue set of {@code topic}, but only for topics that have a subscription entry.
-  */
- @Override
- public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
-     final Map<String, SubscriptionData> subscribed = this.rebalanceImpl.getSubscriptionInner();
-     if (subscribed != null && subscribed.containsKey(topic)) {
-         this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
-     }
- }
-
- /**
-  * @return {@code true} when {@code topic} is subscribed but has no entry in the topic
-  *         subscribe-info table yet; {@code false} otherwise
-  */
- @Override
- public boolean isSubscribeTopicNeedUpdate(String topic) {
-     final Map<String, SubscriptionData> subscribed = this.rebalanceImpl.getSubscriptionInner();
-     if (subscribed == null || !subscribed.containsKey(topic)) {
-         return false;
-     }
-
-     return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
- }
-
- /**
-  * @return the unit-mode flag configured on the facade
-  */
- @Override
- public boolean isUnitMode() {
-     final boolean unitMode = this.defaultMQPullConsumer.isUnitMode();
-     return unitMode;
- }
-
- /**
-  * Builds a running-info snapshot from the facade's properties, the start timestamp of
-  * this instance and the current subscriptions.
-  */
- @Override
- public ConsumerRunningInfo consumerRunningInfo() {
-     final ConsumerRunningInfo runningInfo = new ConsumerRunningInfo();
-
-     final Properties props = MixAll.object2Properties(this.defaultMQPullConsumer);
-     final String startedAt = String.valueOf(this.consumerStartTimestamp);
-     props.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, startedAt);
-     runningInfo.setProperties(props);
-
-     final Set<SubscriptionData> current = this.subscriptions();
-     runningInfo.getSubscriptionSet().addAll(current);
-     return runningInfo;
- }
-
- /**
-  * Non-blocking asynchronous pull that uses the consumer's configured pull timeout.
-  */
- public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
-     throws MQClientException, RemotingException, InterruptedException {
-     final long defaultTimeout = this.defaultMQPullConsumer.getConsumerPullTimeoutMillis();
-     this.pull(mq, subExpression, offset, maxNums, pullCallback, defaultTimeout);
- }
-
- /**
-  * Non-blocking asynchronous pull with an explicit timeout.
-  */
- public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
-     throws MQClientException, RemotingException, InterruptedException {
-     final boolean blockIfNotFound = false;
-     pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, blockIfNotFound, timeout);
- }
-
- private void pullAsyncImpl(//
- final MessageQueue mq, //
- final String subExpression, //
- final long offset, //
- final int maxNums, //
- final PullCallback pullCallback, //
- final boolean block, //
- final long timeout) throws MQClientException, RemotingException, InterruptedException {
- this.makeSureStateOK();
-
- if (null == mq) {
- throw new MQClientException("mq is null", null);
- }
-
- if (offset < 0) {
- throw new MQClientException("offset < 0", null);
- }
-
- if (maxNums <= 0) {
- throw new MQClientException("maxNums <= 0", null);
- }
-
- if (null == pullCallback) {
- throw new MQClientException("pullCallback is null", null);
- }
-
- this.subscriptionAutomatically(mq.getTopic());
-
- try {
- int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
-
- final SubscriptionData subscriptionData;
- try {
- subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
- mq.getTopic(), subExpression);
- } catch (Exception e) {
- throw new MQClientException("parse subscription error", e);
- }
-
- long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
-
- this.pullAPIWrapper.pullKernelImpl(//
- mq, // 1
- subscriptionData.getSubString(), // 2
- 0L, // 3
- offset, // 4
- maxNums, // 5
- sysFlag, // 6
- 0, // 7
- this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
- timeoutMillis, // 9
- CommunicationMode.ASYNC, // 10
- new PullCallback() {
-
- @Override
- public void onSuccess(PullResult pullResult) {
- pullCallback
- .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
- }
-
- @Override
- public void onException(Throwable e) {
- pullCallback.onException(e);
- }
- });
- } catch (MQBrokerException e) {
- throw new MQClientException("pullAsync unknow exception", e);
- }
- }
-
- /**
-  * Synchronous pull with the block flag set.
-  * The pull timeout passed along is ignored by the implementation in favour of the
-  * suspend timeout whenever blocking is requested.
-  */
- public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
-     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-     final long pullTimeout = this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis();
-     return pullSyncImpl(mq, subExpression, offset, maxNums, true, pullTimeout);
- }
-
- /**
-  * @return the facade this implementation was created for
-  */
- public DefaultMQPullConsumer getDefaultMQPullConsumer() {
-     return this.defaultMQPullConsumer;
- }
-
- /**
-  * Asynchronous pull with the block flag set; the outcome is delivered to {@code pullCallback}.
-  */
- public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
-     throws MQClientException, RemotingException, InterruptedException {
-     final long pullTimeout = this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis();
-     pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true, pullTimeout);
- }
-
- /**
-  * Delegates a message query to the admin implementation.
-  *
-  * @throws MQClientException if the consumer is not running, or from the admin call
-  */
- public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-     throws MQClientException, InterruptedException {
-     makeSureStateOK();
-     final QueryResult found = mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
-     return found;
- }
-
- /**
-  * Delegates a unique-key lookup to the admin implementation.
-  *
-  * @throws MQClientException if the consumer is not running, or from the admin call
-  */
- public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
-     throws MQClientException, InterruptedException {
-     makeSureStateOK();
-     final MessageExt found = mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
-     return found;
- }
-
- /**
-  * Delegates to the admin implementation's {@code searchOffset} for the given queue and timestamp.
-  *
-  * @throws MQClientException if the consumer is not running, or from the admin call
-  */
- public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
-     makeSureStateOK();
-     final long offset = mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
-     return offset;
- }
-
- /**
-  * Sends a message back on behalf of this consumer's own group.
-  */
- public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
-     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-     final String ownGroup = this.defaultMQPullConsumer.getConsumerGroup();
-     this.sendMessageBack(msg, delayLevel, brokerName, ownGroup);
- }
-
- /**
-  * Forwards an offset update for {@code mq} to the offset store's
-  * {@code updateConsumeOffsetToBroker}. No running-state check is performed here.
-  */
- public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
-     MQBrokerException, InterruptedException, MQClientException {
-     final OffsetStore store = this.offsetStore;
-     store.updateConsumeOffsetToBroker(mq, offset, isOneway);
- }
-
- /**
-  * Sends {@code msg} back so that it can be consumed again later.
-  * <p>
-  * First asks a broker to take the message back via {@code consumerSendMessageBack}. If that
-  * fails with any exception, the failure is logged and a copy of the message is published to
-  * this consumer's retry topic through the default producer instead; exceptions thrown by
-  * that fallback send propagate to the caller.
-  *
-  * @param msg message to send back
-  * @param delayLevel delay level passed to the broker call
-  * @param brokerName broker to resolve the address from; when {@code null} the address is
-  *                   derived from the message's store host
-  * @param consumerGroup group to send back for; a blank value falls back to this consumer's group
-  */
- public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- try {
- String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
- : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
-
- if (UtilAll.isBlank(consumerGroup)) {
- consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
- }
-
- // NOTE(review): 3000 is presumably a timeout in milliseconds - confirm against the API.
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
- this.defaultMQPullConsumer.getMaxReconsumeTimes());
- } catch (Exception e) {
- log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
-
- // Fallback: publish a copy to the retry topic. Note that the retry topic and the
- // max-reconsume value use this consumer's own group, not the consumerGroup argument.
- Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody());
- String originMsgId = MessageAccessor.getOriginMessageId(msg);
- MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
- newMsg.setFlag(msg.getFlag());
- // NOTE(review): setProperties runs after setOriginMessageId; if it replaces the property
- // map wholesale, the origin id set above may be lost - verify against MessageAccessor.
- MessageAccessor.setProperties(newMsg, msg.getProperties());
- MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
- // The copy records one more reconsume attempt than the original message.
- MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
- MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));
- // Delay level grows with the number of retries already performed.
- newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
- this.mQClientFactory.getDefaultMQProducer().send(newMsg);
- }
- }
-
- /**
-  * Shuts the consumer down when it is running; in every other state this is a no-op.
-  * <p>
-  * A running consumer persists its offsets, unregisters itself from the client instance,
-  * shuts the client instance down and moves to SHUTDOWN_ALREADY.
-  */
- public void shutdown() {
-     switch (this.serviceState) {
-         case RUNNING:
-             this.persistConsumerOffset();
-             this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
-             this.mQClientFactory.shutdown();
-             log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
-             this.serviceState = ServiceState.SHUTDOWN_ALREADY;
-             break;
-         case CREATE_JUST:
-         case SHUTDOWN_ALREADY:
-         default:
-             // Nothing to release in any other state.
-             break;
-     }
- }
-
- /**
-  * Starts the consumer. Only valid in state CREATE_JUST; every other known state is rejected.
-  * <p>
-  * The sequence is: validate configuration, copy subscriptions, obtain the shared client
-  * instance, configure the rebalance implementation, create the pull API wrapper, choose and
-  * load the offset store, register this consumer with the client instance, start the client
-  * instance, and finally switch to RUNNING.
-  *
-  * @throws MQClientException if the configuration is invalid, the consumer group is already
-  *                           registered on the client instance, or the consumer is not in
-  *                           state CREATE_JUST
-  */
- public void start() throws MQClientException {
- switch (this.serviceState) {
- case CREATE_JUST:
- // Pessimistically mark as failed: any exception below leaves the state START_FAILED.
- this.serviceState = ServiceState.START_FAILED;
-
- this.checkConfig();
-
- this.copySubscription();
-
- if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
- this.defaultMQPullConsumer.changeInstanceNameToPID();
- }
-
- this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
-
- this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
- this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
- this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
- this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
-
- this.pullAPIWrapper = new PullAPIWrapper(//
- mQClientFactory, //
- this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
- this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
-
- // A user-supplied offset store wins; otherwise pick one by message model:
- // local file for BROADCASTING, remote broker for CLUSTERING.
- if (this.defaultMQPullConsumer.getOffsetStore() != null) {
- this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
- } else {
- switch (this.defaultMQPullConsumer.getMessageModel()) {
- case BROADCASTING:
- this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
- break;
- case CLUSTERING:
- this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
- break;
- default:
- // NOTE(review): this branch leaves offsetStore unset, so load() below would
- // fail with a NullPointerException for any other message model.
- break;
- }
- }
-
- this.offsetStore.load();
-
- boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
- if (!registerOK) {
- // Reset to CREATE_JUST so that start() can be attempted again.
- this.serviceState = ServiceState.CREATE_JUST;
-
- throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
- + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
- null);
- }
-
- mQClientFactory.start();
- log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
- this.serviceState = ServiceState.RUNNING;
- break;
- case RUNNING:
- case START_FAILED:
- case SHUTDOWN_ALREADY:
- throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- default:
- break;
- }
- }
-
- private void checkConfig() throws MQClientException {
- // check consumerGroup
- Validators.checkGroup(this.defaultMQPullConsumer.getConsumerGroup());
-
- // consumerGroup
- if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
- throw new MQClientException(
- "consumerGroup is null" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
- }
-
- // consumerGroup
- if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
- throw new MQClientException(
- "consumerGroup can not equal "//
- + MixAll.DEFAULT_CONSUMER_GROUP //
- + ", please specify another one."//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
- }
-
- // messageModel
- if (null == this.defaultMQPullConsumer.getMessageModel()) {
- throw new MQClientException(
- "messageModel is null" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
- }
-
- // allocateMessageQueueStrategy
- if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
- throw new MQClientException(
- "allocateMessageQueueStrategy is null" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
- }
-
- // allocateMessageQueueStrategy
- if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {
- throw new MQClientException(
- "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
- }
- }
-
- private void copySubscription() throws MQClientException {
- try {
- Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
- if (registerTopics != null) {
- for (final String topic : registerTopics) {
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
- topic, SubscriptionData.SUB_ALL);
- this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
- }
- }
- } catch (Exception e) {
- throw new MQClientException("subscription exception", e);
- }
- }
-
- /**
-  * Passes a new consume offset for the given queue to this consumer's offset store.
-  * makeSureStateOK() is invoked first.
-  *
-  * @param mq the message queue whose offset is updated
-  * @param offset the offset value handed to the offset store
-  * @throws MQClientException declared for the preceding makeSureStateOK() call
-  */
- public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
-     makeSureStateOK();
-     offsetStore.updateOffset(mq, offset, false);
- }
-
- public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- this.makeSureStateOK();
- return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
- }
-
- /**
-  * Appends a hook to this consumer's filter-message hook list and logs the hook's name.
-  * <p>
-  * A null hook is rejected before the list is touched, so a failed call leaves no null entry
-  * behind in the list.
-  *
-  * @param hook the hook to register; must not be null
-  * @throws NullPointerException if hook is null
-  */
- public void registerFilterMessageHook(final FilterMessageHook hook) {
-     // Fail before mutating: otherwise the null is added first and the NPE only surfaces in the log call.
-     if (hook == null) {
-         throw new NullPointerException("hook is null");
-     }
-     this.filterMessageHookList.add(hook);
-     log.info("register FilterMessageHook Hook, {}", hook.hookName());
- }
-
- /**
-  * @return the offset store currently held by this consumer implementation; may be null if
-  *         none has been assigned yet
-  */
- public OffsetStore getOffsetStore() {
-     return this.offsetStore;
- }
-
- /**
-  * Replaces the offset store held by this consumer implementation.
-  *
-  * @param offsetStore the offset store to use from now on
-  */
- public void setOffsetStore(final OffsetStore offsetStore) {
-     this.offsetStore = offsetStore;
- }
-
- /**
-  * @return the pull API wrapper currently held by this consumer implementation
-  */
- public PullAPIWrapper getPullAPIWrapper() {
-     return this.pullAPIWrapper;
- }
-
- /**
-  * Replaces the pull API wrapper held by this consumer implementation.
-  *
-  * @param pullAPIWrapper the wrapper to use from now on
-  */
- public void setPullAPIWrapper(final PullAPIWrapper pullAPIWrapper) {
-     this.pullAPIWrapper = pullAPIWrapper;
- }
-
- /**
-  * @return the current service state of this consumer implementation
-  */
- public ServiceState getServiceState() {
-     return this.serviceState;
- }
-
- /**
-  * Overwrites the service state directly; no transition check is performed here.
-  *
-  * @param serviceState the state to store
-  */
- public void setServiceState(final ServiceState serviceState) {
-     this.serviceState = serviceState;
- }
-
- /**
-  * @return the value of the consumerStartTimestamp field
-  */
- public long getConsumerStartTimestamp() {
-     return this.consumerStartTimestamp;
- }
-
-
- /**
-  * @return the rebalance implementation used by this consumer implementation
-  */
- public RebalanceImpl getRebalanceImpl() {
-     return this.rebalanceImpl;
- }
-}