You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/07/17 06:35:23 UTC
[rocketmq] 03/03: Add lite pull consumer example
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 11b686e47797be15f187f906e4064264dfa2c993
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jul 17 14:33:29 2019 +0800
Add lite pull consumer example
---
.../client/consumer/DefaultLiteMQPullConsumer.java | 82 ++++++++++++++++++++--
.../client/consumer/LiteMQPullConsumer.java | 3 -
.../impl/consumer/LiteMQPullConsumerImpl.java | 77 +++++++++++++++++---
.../example/simple/LitePullConsumerTest.java | 49 +++++++++++++
4 files changed, 192 insertions(+), 19 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
index 99fd0d9..96d4f5a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
@@ -27,18 +27,50 @@ import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer {
private LiteMQPullConsumerImpl liteMQPullConsumer;
+ /**
+ * Maximum time, in minutes, a message may block the consuming thread; also used as the delay and period of the expired-message cleanup task.
+ */
+ private long consumeTimeout = 15;
+
+ /**
+ * Whether the scheduled auto-commit task calls commit(); the flag is checked on every run of that task.
+ */
+ private boolean autoCommit = true;
+
+ private int pullThreadNumbers = 20; // thread count handed to the pull-message ScheduledThreadPoolExecutor
+
+ /**
+ * Delay before the first auto-commit run and period between subsequent runs, in seconds.
+ */
+ private long autoCommitInterval = 20;
+
public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
+ this.setConsumerGroup(consumerGroup);
this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook);
}
- @Override public void subscribe(String topic, String subExpression) throws MQClientException{
+ public DefaultLiteMQPullConsumer(String consumerGroup) {
+ this.setConsumerGroup(consumerGroup);
+ this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, null);
+ }
+
+ @Override
+ public void start() throws MQClientException{
+ this.liteMQPullConsumer.start();
+ }
+
+ @Override
+ public void subscribe(String topic, String subExpression) throws MQClientException {
this.liteMQPullConsumer.subscribe(topic, subExpression);
}
- @Override public void unsubscribe(String topic) {
+ @Override
+ public void unsubscribe(String topic) { // NOTE(review): the delegate's unsubscribe(topic) in this commit calls itself as its first statement (unbounded recursion) - confirm it should call super.unsubscribe(topic)
+ this.liteMQPullConsumer.unsubscribe(topic);
}
- @Override public List<MessageExt> poll() {
+ @Override
+ public List<MessageExt> poll() { // NOTE(review): passes a millisecond timeout, but LiteMQPullConsumerImpl.poll in this commit waits with TimeUnit.SECONDS - confirm the intended unit
return poll(this.getConsumerPullTimeoutMillis());
}
@@ -46,19 +78,55 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements
return liteMQPullConsumer.poll(timeout);
}
- @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+ @Override
+ public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+ this.liteMQPullConsumer.seek(messageQueue, offset);
+ }
+
+ @Override
+ public void pause(Collection<MessageQueue> messageQueues) {
+ this.liteMQPullConsumer.pause(messageQueues);
+ }
+
+ @Override
+ public void resume(Collection<MessageQueue> messageQueues) {
+ this.liteMQPullConsumer.resume(messageQueues);
+ }
+
+ @Override
+ public void commitSync() {
+ this.liteMQPullConsumer.commit();
+ }
+ public long getConsumeTimeout() {
+ return consumeTimeout;
}
- @Override public void pause(Collection<MessageQueue> messageQueueCollection) {
+ public void setConsumeTimeout(long consumeTimeout) {
+ this.consumeTimeout = consumeTimeout;
+ }
+ public boolean isAutoCommit() {
+ return autoCommit;
}
- @Override public void resume(Collection<MessageQueue> partitions) {
+ public void setAutoCommit(boolean autoCommit) {
+ this.autoCommit = autoCommit;
+ }
+ public int getPullThreadNumbers() {
+ return pullThreadNumbers;
}
- @Override public void commitSync() {
+ public void setPullThreadNumbers(int pullThreadNumbers) {
+ this.pullThreadNumbers = pullThreadNumbers;
+ }
+
+ public long getAutoCommitInterval() {
+ return autoCommitInterval;
+ }
+ public void setAutoCommitInterval(long autoCommitInterval) {
+ this.autoCommitInterval = autoCommitInterval;
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
index 223cca0..da8d1cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
@@ -38,9 +38,6 @@ public interface LiteMQPullConsumer {
*/
void unsubscribe(final String topic);
- /**
- * @return
- */
List<MessageExt> poll();
List<MessageExt> poll(long timeout);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
index abf5f47..d612286 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -27,12 +27,14 @@ import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -50,6 +52,8 @@ import org.apache.rocketmq.remoting.RPCHook;
public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
private final InternalLogger log = ClientLogger.getLog();
+ private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer;
+
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
@@ -58,12 +62,21 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
private List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256);
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
- ;
+
+ private final ScheduledExecutorService cleanExpireMsgExecutors;
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
- public LiteMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
+ private ScheduledExecutorService autoCommitExecutors;
+
+ public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) { // now requires the lite consumer type
super(defaultMQPullConsumer, rpcHook);
+ this.defaultLiteMQPullConsumer = defaultMQPullConsumer; // kept so start() and the expiry check can read the lite-only settings
+ this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+ "Lite_CleanExpireMsgScheduledThread_")); // single-threaded scheduler for the expired-message cleanup task
+ this.autoCommitExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+ "Lite_AutoCommitScheduledThread_")); // single-threaded scheduler for the auto-commit task; NOTE(review): no shutdown of either scheduler is visible in this diff
+
}
public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
@@ -115,18 +128,43 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
@Override
public synchronized void start() throws MQClientException {
+ this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); // listener is now registered before super.start() instead of after the pull executor is built
super.start();
final String group = this.defaultMQPullConsumer.getConsumerGroup();
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
- 10, //this.pullThreadNums,
+ this.defaultLiteMQPullConsumer.getPullThreadNumbers(), // pool size comes from configuration instead of the hard-coded 10
new ThreadFactoryImpl("PullMsgThread-" + group)
);
- this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+ this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { // periodic expired-message cleanup
+ @Override
+ public void run() {
+ cleanExpireMsg();
+ }
+ }, this.defaultLiteMQPullConsumer.getConsumeTimeout(), this.defaultLiteMQPullConsumer.getConsumeTimeout(), TimeUnit.MINUTES); // initial delay and period are both consumeTimeout minutes, read once here
+ this.autoCommitExecutors.scheduleAtFixedRate(new Runnable() { // periodic commit task
+ @Override
+ public void run() {
+ if (defaultLiteMQPullConsumer.isAutoCommit()) { // flag is re-read on every run, so the task is always scheduled even when auto-commit is off
+ commit();
+ }
+ }
+ }, this.defaultLiteMQPullConsumer.getAutoCommitInterval(), this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS); // initial delay and period are both autoCommitInterval seconds, read once here
+ updateTopicSubscribeInfoWhenSubscriptionChanged(); // refresh route info for every topic in the subscription table
+ }
+
+ private void updateTopicSubscribeInfoWhenSubscriptionChanged() { // asks the client factory to update route info for each topic in the rebalance subscription table
+ Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
+ if (subTable != null) { // nothing to refresh when there is no subscription table
+ for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { // only the key (topic name) of each entry is used
+ final String topic = entry.getKey();
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // called once per subscribed topic
+ }
+ }
}
public List<MessageExt> poll(long timeout) {
try {
- ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
+ ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.SECONDS);
if (consumeRequest != null) {
List<MessageExt> messages = consumeRequest.getMessageExts();
for (MessageExt messageExt : messages) {
@@ -148,6 +186,16 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
assignedMessageQueue.resume(messageQueues);
}
+ public void seek(MessageQueue messageQueue, long offset) throws MQClientException { // moves both the pull offset and the consume offset of the given queue to offset
+ this.updatePullOffset(messageQueue, offset); // applied first and not rolled back here if the consume-offset update below throws
+ try {
+ updateConsumeOffset(messageQueue, offset);
+ } catch (MQClientException ex) {
+ log.error("Seek offset to remote message queue error!", ex);
+ throw ex; // logged for diagnosis, then propagated unchanged to the caller
+ }
+ }
+
public void unsubscribe(final String topic) {
unsubscribe(topic);
removePullTaskCallback(topic);
@@ -270,7 +318,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
if (!msgTreeMap.isEmpty()) {
msg = msgTreeMap.firstEntry().getValue();
if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
- > 10 * 60 * 1000) {
+ > this.defaultLiteMQPullConsumer.getConsumeTimeout() * 60 * 1000) {
//Expired, ack and remove it.
} else {
break;
@@ -316,16 +364,19 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
@Override
public void run() {
+ System.out.println("begin pull message");
String topic = this.messageQueue.getTopic();
if (!this.isCancelled()) {
if (assignedMessageQueue.isPaused(messageQueue)) {
+ scheduledThreadPoolExecutor.schedule(this, 1000, TimeUnit.MILLISECONDS);
log.debug("Message Queue: {} has been paused!", messageQueue);
return;
}
SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
long offset = nextPullOffset(messageQueue);
+ long pullDelayTimeMills = 0;
try {
- PullResult pullResult = defaultMQPullConsumer.pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
+ PullResult pullResult = pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue);
switch (pullResult.getPullStatus()) {
case FOUND:
@@ -338,9 +389,17 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
break;
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
- } catch (Exception e) {
+ } catch (Throwable e) {
+ pullDelayTimeMills = 1000;
+ e.printStackTrace();
log.error("An error occurred in pull message process.", e);
}
+
+ if (!this.isCancelled()) {
+ scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
+ } else {
+ log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
+ }
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
new file mode 100644
index 0000000..4297e4f
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class LitePullConsumerTest { // Example: subscribes to topic "test" and polls in an endless loop, pausing and later resuming one queue
+ public static void main(String[] args) throws Exception {
+ DefaultLiteMQPullConsumer litePullConsumer = new DefaultLiteMQPullConsumer("test", null); // consumer group "test", no RPC hook
+ litePullConsumer.subscribe("test", null); // subscribe before start(); null sub-expression
+ litePullConsumer.start();
+ MessageQueue messageQueue = new MessageQueue("test", "duhengdeMacBook-Pro.local", 1); // NOTE(review): broker name is a developer machine hostname - replace with a broker name from your own cluster
+ int i = 0; // number of poll() calls made so far
+ while (true) { // never exits; stop the process to end the example
+ List<MessageExt> messageExts = litePullConsumer.poll(); // uses the consumer's default pull timeout
+ System.out.println("-----------");
+ System.out.println(messageExts);
+ System.out.println("-----------");
+ i++;
+ if (i == 3) { // after the third poll, pause the queue
+ System.out.println("pause");
+ litePullConsumer.pause(Arrays.asList(messageQueue));
+ }
+ if (i == 10) { // after the tenth poll, resume it
+ System.out.println("resume");
+ litePullConsumer.resume(Arrays.asList(messageQueue));
+ }
+ litePullConsumer.commitSync(); // explicit commit after every poll, in addition to auto-commit which defaults to enabled
+ }
+ }
+}