You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/14 02:39:43 UTC
[rocketmq] branch litePullConsumer updated: Polish lite pull
consumer (#1381)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/litePullConsumer by this push:
new c6096a1 Polish lite pull consumer (#1381)
c6096a1 is described below
commit c6096a1443f4ee53472b5a93ec68b024fd61f311
Author: King <79...@qq.com>
AuthorDate: Wed Aug 14 10:39:36 2019 +0800
Polish lite pull consumer (#1381)
* fix unsubscribe code
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* polish commit consumed offset
* pass checkstyle
* pass checkstyle
* polish LiteMQPullConsumer
* add flow control and polish commit logic
* fix bug
* polish code
* fix commit consumed offset back
* refactor litePullConsumer
* development save
* development save
* Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.
* Polish lite pull consumer
* polish lite pull consumer
* polish lite pull consumer
* fix seek
* fix seek function
* polish lite pull consumer
* add apache header
* add test
* polish test
* Make broadcast model work for litePullConsumer
* Revert example/broadcast/PushConsumer.java
* Add delay time when no new message
* Enable long polling mode
* Fix subscribe bug when rebalance
* Delete useless consumeMessageHook
* Implement TopicMessageQueueChangeListener interface
* Lite pull consumer support namespace
* Make sql92 filter work
---
.../org/apache/rocketmq/client/ClientConfig.java | 18 +-
.../client/consumer/DefaultLitePullConsumer.java | 25 ++-
.../rocketmq/client/consumer/LitePullConsumer.java | 9 +-
.../consumer/TopicMessageQueueChangeListener.java | 30 +++
.../client/impl/consumer/AssignedMessageQueue.java | 2 -
.../impl/consumer/DefaultLitePullConsumerImpl.java | 232 +++++++++++++--------
6 files changed, 212 insertions(+), 104 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 87c01a5..c3e4efa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.client;
+import java.util.Collection;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
@@ -95,7 +97,6 @@ public class ClientConfig {
}
}
-
public String withNamespace(String resource) {
return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}
@@ -124,9 +125,21 @@ public class ClientConfig {
if (StringUtils.isEmpty(this.getNamespace())) {
return queue;
}
-
return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());
}
+
+ public Collection<MessageQueue> queuesWithNamespace(Collection<MessageQueue> queues) {
+ if (StringUtils.isEmpty(this.getNamespace())) {
+ return queues;
+ }
+ Iterator<MessageQueue> iter = queues.iterator();
+ while (iter.hasNext()) {
+ MessageQueue queue = iter.next();
+ queue.setTopic(withNamespace(queue.getTopic()));
+ }
+ return queues;
+ }
+
public void resetClientConfig(final ClientConfig cc) {
this.namesrvAddr = cc.namesrvAddr;
this.clientIP = cc.clientIP;
@@ -170,6 +183,7 @@ public class ClientConfig {
/**
* Domain name mode access way does not support the delimiter(;), and only one domain name can be set.
+ *
* @param namesrvAddr name server address
*/
public void setNamesrvAddr(String namesrvAddr) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 1858fa1..543e9cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -176,17 +176,22 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
- this.defaultLitePullConsumerImpl.subscribe(topic, subExpression);
+ this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
+ }
+
+ @Override
+ public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
+ this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
}
@Override
public void unsubscribe(String topic) {
- this.defaultLitePullConsumerImpl.unsubscribe(topic);
+ this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}
@Override
public void assign(Collection<MessageQueue> messageQueues) {
- defaultLitePullConsumerImpl.assign(messageQueues);
+ defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
}
@Override
@@ -201,17 +206,17 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
- this.defaultLitePullConsumerImpl.seek(messageQueue, offset);
+ this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
}
@Override
public void pause(Collection<MessageQueue> messageQueues) {
- this.defaultLitePullConsumerImpl.pause(messageQueues);
+ this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
}
@Override
public void resume(Collection<MessageQueue> messageQueues) {
- this.defaultLitePullConsumerImpl.resume(messageQueues);
+ this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
}
@Override
@@ -221,7 +226,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
- return this.defaultLitePullConsumerImpl.searchOffset(messageQueue, timestamp);
+ return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
+ }
+
+ public void registerTopicMessageQueueChangeListener(String topic,
+ TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
+ this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
}
@Override
@@ -390,5 +400,4 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setPullDelayTimeMills(long pullDelayTimeMills) {
this.pullDelayTimeMills = pullDelayTimeMills;
}
-
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index ece08af..87b9dd3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -36,7 +36,7 @@ public interface LitePullConsumer {
void shutdown();
/**
- * Subscribe some topic
+ * Subscribe some topic with subExpression
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
@@ -44,6 +44,13 @@ public interface LitePullConsumer {
void subscribe(final String topic, final String subExpression) throws MQClientException;
/**
+ * Subscribe some topic with selector.
+ *
+ * @param selector message selector({@link MessageSelector}), can be null.
+ */
+ void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
+
+ /**
* Unsubscribe consumption some topic
*
* @param topic message topic
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java
new file mode 100644
index 0000000..fa6fd13
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Set;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public interface TopicMessageQueueChangeListener {
+ /**
+ * This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
+ * expanded or shrunk.
+ *
+ * @param messageQueues
+ */
+ void onChanged(String topic, Set<MessageQueue> messageQueues);
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index b21fd01..8dcaa30 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -138,7 +138,6 @@ public class AssignedMessageQueue {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!assigned.contains(next.getKey())) {
- System.out.printf("MessageQueue-%s is removed %n", next.getKey());
next.getValue().getProcessQueue().setDropped(true);
it.remove();
}
@@ -167,7 +166,6 @@ public class AssignedMessageQueue {
if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
MessageQueueStat messageQueueStat;
if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
- System.out.printf("MessageQueue-%s is added %n", messageQueue);
messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
} else {
ProcessQueue processQueue = new ProcessQueue();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 07ef1cf..e17aae6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
@@ -28,12 +29,16 @@ import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -127,6 +132,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ private Map<String, TopicMessageQueueChangeListener> topicMessageQueueChangeListenerMap = new HashMap<String, TopicMessageQueueChangeListener>();
+
+ private Map<String, Set<MessageQueue>> messageQueuesForTopic = new HashMap<String, Set<MessageQueue>>();
+
private long consumeRequestFlowControlTimes = 0L;
private long queueFlowControlTimes = 0L;
@@ -138,6 +149,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
+ this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+ this.defaultLitePullConsumer.getPullThreadNumbers(),
+ new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
+ );
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "MonitorMessageQueueChangeThread");
+ }
+ });
}
private void checkServiceState() {
@@ -266,12 +287,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
mQClientFactory.start();
- final String group = this.defaultLitePullConsumer.getConsumerGroup();
-
- this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
- this.defaultLitePullConsumer.getPullThreadNumbers(),
- new ThreadFactoryImpl("PullMsgThread-" + group)
- );
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
@@ -279,8 +294,25 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
updateAssignPullTask(assignedMessageQueue.messageQueues());
}
+ scheduledExecutorService.scheduleAtFixedRate(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ fetchTopicMessageQueuesAndCompare();
+ } catch (Exception e) {
+ log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
+ }
+ }
+ }, 1000 * 20, 1000 * 30, TimeUnit.MILLISECONDS);
+
log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
+ for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
+ Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
+ messageQueuesForTopic.put(topic, messageQueues);
+ }
+ this.mQClientFactory.checkClientInBroker();
break;
case RUNNING:
case START_FAILED:
@@ -339,7 +371,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
-
}
public PullAPIWrapper getPullAPIWrapper() {
@@ -353,7 +384,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
break;
case CLUSTERING:
/*
- * Retry topic support in the future.
+ * Retry topic will be support in the future.
*/
break;
default:
@@ -410,7 +441,28 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
} catch (Exception e) {
- throw new MQClientException("subscription exception", e);
+ throw new MQClientException("subscribe exception", e);
+ }
+ }
+
+ public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
+ try {
+ setSubscriptionType(SubscriptionType.SUBSCRIBE);
+ if (messageSelector == null) {
+ subscribe(topic, SubscriptionData.SUB_ALL);
+ return;
+ }
+ SubscriptionData subscriptionData = FilterAPI.build(topic,
+ messageSelector.getExpression(), messageSelector.getExpressionType());
+ this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+ this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+ assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+ if (serviceState == ServiceState.RUNNING) {
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscribe exception", e);
}
}
@@ -421,6 +473,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
public synchronized void assign(Collection<MessageQueue> messageQueues) {
+ if (messageQueues == null || messageQueues.isEmpty()) {
+ throw new IllegalArgumentException("Message queues can not be null or empty.");
+ }
setSubscriptionType(SubscriptionType.ASSIGN);
assignedMessageQueue.updateAssignedMessageQueue(messageQueues);
if (serviceState == ServiceState.RUNNING) {
@@ -461,6 +516,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
List<MessageExt> messages = consumeRequest.getMessageExts();
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
+ //If namespace not null , reset Topic without namespace.
+ this.resetTopic(messages);
return messages;
}
} catch (InterruptedException ignore) {
@@ -587,8 +644,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private void submitConsumeRequest(ConsumeRequest consumeRequest) {
try {
consumeRequestCache.put(consumeRequest);
- } catch (InterruptedException ex) {
- log.error("Submit consumeRequest error", ex);
+ } catch (InterruptedException e) {
+ log.error("Submit consumeRequest error", e);
}
}
@@ -649,14 +706,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
if (processQueue == null && processQueue.isDropped()) {
- log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
+ log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}
if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0)
- log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
+ log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
return;
}
@@ -667,7 +724,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
- "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+ "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
}
return;
@@ -677,7 +734,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
- "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+ "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
}
return;
@@ -687,21 +744,27 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
- "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
+ "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
}
return;
}
- String subExpression = null;
- if (subscriptionType == SubscriptionType.SUBSCRIBE) {
- String topic = this.messageQueue.getTopic();
- subExpression = rebalanceImpl.getSubscriptionInner().get(topic).getSubString();
- }
long offset = nextPullOffset(messageQueue);
long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills();
try {
- PullResult pullResult = pull(messageQueue, subExpression, offset, nextPullBatchNums());
+
+ SubscriptionData subscriptionData;
+ if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+ String topic = this.messageQueue.getTopic();
+ subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
+ } else{
+ String topic = this.messageQueue.getTopic();
+ subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
+ topic, SubscriptionData.SUB_ALL);
+ }
+
+ PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchNums());
switch (pullResult.getPullStatus()) {
case FOUND:
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {
@@ -710,7 +773,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
break;
case OFFSET_ILLEGAL:
- log.warn("the pull request offset illegal, {}", pullResult.toString());
+ log.warn("The pull request offset illegal, {}", pullResult.toString());
break;
case NO_NEW_MSG:
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG;
@@ -721,7 +784,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
} catch (Throwable e) {
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
- e.printStackTrace();
log.error("An error occurred in pull message process.", e);
}
@@ -746,58 +808,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
- private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
+ private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return pull(mq, subExpression, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
+ return pull(mq, subscriptionData, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
}
- private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
+ private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
}
- private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return pull(mq, messageSelector, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
- }
-
- private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
- return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
- }
-
- private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
- throws MQClientException {
-
- if (null == mq) {
- throw new MQClientException("mq is null", null);
- }
-
- try {
- return FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
- mq.getTopic(), subExpression);
- } catch (Exception e) {
- throw new MQClientException("parse subscription error", e);
- }
- }
-
- private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
- throws MQClientException {
-
- if (null == mq) {
- throw new MQClientException("mq is null", null);
- }
-
- try {
- return FilterAPI.build(mq.getTopic(),
- messageSelector.getExpression(), messageSelector.getExpressionType());
- } catch (Exception e) {
- throw new MQClientException("parse subscription error", e);
- }
- }
-
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums,
boolean block,
long timeout)
@@ -815,8 +835,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
throw new MQClientException("maxNums <= 0", null);
}
- this.subscriptionAutomatically(mq.getTopic());
-
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
@@ -837,13 +855,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
- //If namespace not null , reset Topic without namespace.
- this.resetTopic(pullResult.getMsgFoundList());
return pullResult;
}
-
- public void resetTopic(List<MessageExt> msgList) {
+ private void resetTopic(List<MessageExt> msgList) {
if (null == msgList || msgList.size() == 0) {
return;
}
@@ -857,17 +872,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
- public void subscriptionAutomatically(final String topic) {
- if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
- try {
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
- topic, SubscriptionData.SUB_ALL);
- this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
- } catch (Exception ignore) {
- }
- }
- }
-
public void updateConsumeOffset(MessageQueue mq, long offset) {
checkServiceState();
this.offsetStore.updateOffset(mq, offset, false);
@@ -981,13 +985,59 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public Set<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
checkServiceState();
- // check if has info in memory, otherwise invoke api.
- Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
- if (null == result) {
- result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
+ Set<MessageQueue> result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
+ return parseMessageQueues(result);
+ }
+
+ private synchronized void fetchTopicMessageQueuesAndCompare() throws MQClientException {
+ for (Map.Entry<String, TopicMessageQueueChangeListener> entry : topicMessageQueueChangeListenerMap.entrySet()) {
+ String topic = entry.getKey();
+ TopicMessageQueueChangeListener topicMessageQueueChangeListener = entry.getValue();
+ Set<MessageQueue> oldMessageQueues = messageQueuesForTopic.get(topic);
+ Set<MessageQueue> newMessageQueues = fetchMessageQueues(topic);
+ boolean isChanged = !isSetEqual(newMessageQueues, oldMessageQueues);
+ if (isChanged) {
+ messageQueuesForTopic.put(topic, newMessageQueues);
+ if (topicMessageQueueChangeListener != null) {
+ topicMessageQueueChangeListener.onChanged(topic, newMessageQueues);
+ }
+ }
}
+ }
- return parseMessageQueues(result);
+ private boolean isSetEqual(Set<MessageQueue> set1, Set<MessageQueue> set2) {
+ if (set1 == null && set2 == null) {
+ return true;
+ }
+
+ if (set1 == null || set2 == null || set1.size() != set2.size()
+ || set1.size() == 0 || set2.size() == 0) {
+ return false;
+ }
+
+ Iterator iter = set2.iterator();
+ boolean isEqual = true;
+ while (iter.hasNext()) {
+ if (!set1.contains(iter.next())) {
+ isEqual = false;
+ }
+ }
+ return isEqual;
+ }
+
+ public synchronized void registerTopicMessageQueueChangeListener(String topic,
+ TopicMessageQueueChangeListener listener) throws MQClientException {
+ if (topic == null || listener == null) {
+ throw new MQClientException("Topic or listener is null", null);
+ }
+ if (topicMessageQueueChangeListenerMap.containsKey(topic)) {
+ log.warn("Topic {} had been registered, new listener will overwrite the old one", topic);
+ }
+ topicMessageQueueChangeListenerMap.put(topic, listener);
+ if (this.serviceState == ServiceState.RUNNING) {
+ Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
+ messageQueuesForTopic.put(topic, messageQueues);
+ }
}
private Set<MessageQueue> parseMessageQueues(Set<MessageQueue> queueSet) {