You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/07/17 06:35:23 UTC

[rocketmq] 03/03: Add lite pull consumer example

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 11b686e47797be15f187f906e4064264dfa2c993
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jul 17 14:33:29 2019 +0800

    Add lite pull consumer example
---
 .../client/consumer/DefaultLiteMQPullConsumer.java | 82 ++++++++++++++++++++--
 .../client/consumer/LiteMQPullConsumer.java        |  3 -
 .../impl/consumer/LiteMQPullConsumerImpl.java      | 77 +++++++++++++++++---
 .../example/simple/LitePullConsumerTest.java       | 49 +++++++++++++
 4 files changed, 192 insertions(+), 19 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
index 99fd0d9..96d4f5a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
@@ -27,18 +27,50 @@ import org.apache.rocketmq.remoting.RPCHook;
 public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer {
     private LiteMQPullConsumerImpl liteMQPullConsumer;
 
+    /**
+     * Maximum amount of time in minutes a message may block the consuming thread.
+     */
+    private long consumeTimeout = 15;
+
+    /**
+     * Is auto commit offset
+     */
+    private boolean autoCommit = true;
+
+    private int pullThreadNumbers = 20;
+
+    /**
+     * Maximum commit offset interval time in seconds.
+     */
+    private long autoCommitInterval = 20;
+
     public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
+        this.setConsumerGroup(consumerGroup);
         this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook);
     }
 
-    @Override public void subscribe(String topic, String subExpression) throws MQClientException{
+    public DefaultLiteMQPullConsumer(String consumerGroup) {
+        this.setConsumerGroup(consumerGroup);
+        this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, null);
+    }
+
+    @Override
+    public void start() throws MQClientException{
+        this.liteMQPullConsumer.start();
+    }
+
+    @Override
+    public void subscribe(String topic, String subExpression) throws MQClientException {
         this.liteMQPullConsumer.subscribe(topic, subExpression);
     }
 
-    @Override public void unsubscribe(String topic) {
+    @Override
+    public void unsubscribe(String topic) {
+        this.liteMQPullConsumer.unsubscribe(topic);
     }
 
-    @Override public List<MessageExt> poll() {
+    @Override
+    public List<MessageExt> poll() {
         return poll(this.getConsumerPullTimeoutMillis());
     }
 
@@ -46,19 +78,55 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements
         return liteMQPullConsumer.poll(timeout);
     }
 
-    @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+    @Override
+    public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+        this.liteMQPullConsumer.seek(messageQueue, offset);
+    }
+
+    @Override
+    public void pause(Collection<MessageQueue> messageQueues) {
+        this.liteMQPullConsumer.pause(messageQueues);
+    }
+
+    @Override
+    public void resume(Collection<MessageQueue> messageQueues) {
+        this.liteMQPullConsumer.resume(messageQueues);
+    }
+
+    @Override
+    public void commitSync() {
+        this.liteMQPullConsumer.commit();
+    }
 
+    public long getConsumeTimeout() {
+        return consumeTimeout;
     }
 
-    @Override public void pause(Collection<MessageQueue> messageQueueCollection) {
+    public void setConsumeTimeout(long consumeTimeout) {
+        this.consumeTimeout = consumeTimeout;
+    }
 
+    public boolean isAutoCommit() {
+        return autoCommit;
     }
 
-    @Override public void resume(Collection<MessageQueue> partitions) {
+    public void setAutoCommit(boolean autoCommit) {
+        this.autoCommit = autoCommit;
+    }
 
+    public int getPullThreadNumbers() {
+        return pullThreadNumbers;
     }
 
-    @Override public void commitSync() {
+    public void setPullThreadNumbers(int pullThreadNumbers) {
+        this.pullThreadNumbers = pullThreadNumbers;
+    }
+
+    public long getAutoCommitInterval() {
+        return autoCommitInterval;
+    }
 
+    public void setAutoCommitInterval(long autoCommitInterval) {
+        this.autoCommitInterval = autoCommitInterval;
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
index 223cca0..da8d1cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
@@ -38,9 +38,6 @@ public interface LiteMQPullConsumer {
      */
     void unsubscribe(final String topic);
 
-    /**
-     * @return
-     */
     List<MessageExt> poll();
 
     List<MessageExt> poll(long timeout);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
index abf5f47..d612286 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -27,12 +27,14 @@ import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -50,6 +52,8 @@ import org.apache.rocketmq.remoting.RPCHook;
 public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
     private final InternalLogger log = ClientLogger.getLog();
 
+    private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer;
+
     private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
         new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
 
@@ -58,12 +62,21 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
     private List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256);
 
     private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
-    ;
+
+    private final ScheduledExecutorService cleanExpireMsgExecutors;
 
     private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
 
-    public LiteMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
+    private ScheduledExecutorService autoCommitExecutors;
+
+    public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
         super(defaultMQPullConsumer, rpcHook);
+        this.defaultLiteMQPullConsumer = defaultMQPullConsumer;
+        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "Lite_CleanExpireMsgScheduledThread_"));
+        this.autoCommitExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "Lite_AutoCommitScheduledThread_"));
+
     }
 
     public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
@@ -115,18 +128,43 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
 
     @Override
     public synchronized void start() throws MQClientException {
+        this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
         super.start();
         final String group = this.defaultMQPullConsumer.getConsumerGroup();
         this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
-            10, //this.pullThreadNums,
+            this.defaultLiteMQPullConsumer.getPullThreadNumbers(),
             new ThreadFactoryImpl("PullMsgThread-" + group)
         );
-        this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                cleanExpireMsg();
+            }
+        }, this.defaultLiteMQPullConsumer.getConsumeTimeout(), this.defaultLiteMQPullConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
+        this.autoCommitExecutors.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                if (defaultLiteMQPullConsumer.isAutoCommit()) {
+                    commit();
+                }
+            }
+        }, this.defaultLiteMQPullConsumer.getAutoCommitInterval(), this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS);
+        updateTopicSubscribeInfoWhenSubscriptionChanged();
+    }
+
+    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+        Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
+        if (subTable != null) {
+            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
+                final String topic = entry.getKey();
+                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+            }
+        }
     }
 
     public List<MessageExt> poll(long timeout) {
         try {
-            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
+            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.SECONDS);
             if (consumeRequest != null) {
                 List<MessageExt> messages = consumeRequest.getMessageExts();
                 for (MessageExt messageExt : messages) {
@@ -148,6 +186,16 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
         assignedMessageQueue.resume(messageQueues);
     }
 
+    public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+        this.updatePullOffset(messageQueue, offset);
+        try {
+            updateConsumeOffset(messageQueue, offset);
+        } catch (MQClientException ex) {
+            log.error("Seek offset to remote message queue error!", ex);
+            throw ex;
+        }
+    }
+
     public void unsubscribe(final String topic) {
         unsubscribe(topic);
         removePullTaskCallback(topic);
@@ -270,7 +318,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
                         if (!msgTreeMap.isEmpty()) {
                             msg = msgTreeMap.firstEntry().getValue();
                             if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
-                                > 10 * 60 * 1000) {
+                                > this.defaultLiteMQPullConsumer.getConsumeTimeout() * 60 * 1000) {
                                 //Expired, ack and remove it.
                             } else {
                                 break;
@@ -316,16 +364,19 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
 
         @Override
         public void run() {
+            System.out.println("begin pull message");
             String topic = this.messageQueue.getTopic();
             if (!this.isCancelled()) {
                 if (assignedMessageQueue.isPaused(messageQueue)) {
+                    scheduledThreadPoolExecutor.schedule(this, 1000, TimeUnit.MILLISECONDS);
                     log.debug("Message Queue: {} has been paused!", messageQueue);
                     return;
                 }
                 SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                 long offset = nextPullOffset(messageQueue);
+                long pullDelayTimeMills = 0;
                 try {
-                    PullResult pullResult = defaultMQPullConsumer.pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
+                    PullResult pullResult = pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
                     ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue);
                     switch (pullResult.getPullStatus()) {
                         case FOUND:
@@ -338,9 +389,17 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
                             break;
                     }
                     updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
-                } catch (Exception e) {
+                } catch (Throwable e) {
+                    pullDelayTimeMills = 1000;
+                    e.printStackTrace();
                     log.error("An error occurred in pull message process.", e);
                 }
+
+                if (!this.isCancelled()) {
+                    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
+                } else {
+                    log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
+                }
             }
         }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
new file mode 100644
index 0000000..4297e4f
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class LitePullConsumerTest {
+    public static void main(String[] args) throws Exception {
+        DefaultLiteMQPullConsumer litePullConsumer = new DefaultLiteMQPullConsumer("test", null);
+        litePullConsumer.subscribe("test", null);
+        litePullConsumer.start();
+        MessageQueue messageQueue = new MessageQueue("test", "duhengdeMacBook-Pro.local", 1);
+        int i = 0;
+        while (true) {
+            List<MessageExt> messageExts = litePullConsumer.poll();
+            System.out.println("-----------");
+            System.out.println(messageExts);
+            System.out.println("-----------");
+            i++;
+            if (i == 3) {
+                System.out.println("pause");
+                litePullConsumer.pause(Arrays.asList(messageQueue));
+            }
+            if (i == 10) {
+                System.out.println("resume");
+                litePullConsumer.resume(Arrays.asList(messageQueue));
+            }
+            litePullConsumer.commitSync();
+        }
+    }
+}