You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/07/17 06:35:22 UTC

[rocketmq] 02/03: Add pull task logic

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit af04557abf12ce36245d7e362925125cd25e0d3c
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Jun 21 09:47:42 2019 +0800

    Add pull task logic
---
 .../client/consumer/DefaultLiteMQPullConsumer.java |  64 ++++++++++
 .../client/consumer/DefaultMQPullConsumer.java     |   2 +
 .../client/consumer/LiteMQPullConsumer.java        |  55 ++++++++
 .../rocketmq/client/consumer/MQPullConsumer.java   |   3 +
 .../impl/consumer/LiteMQPullConsumerImpl.java      | 140 ++++++++++++++++++++-
 5 files changed, 263 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
new file mode 100644
index 0000000..99fd0d9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.LiteMQPullConsumerImpl;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+
+public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer {
+    private LiteMQPullConsumerImpl liteMQPullConsumer;
+
+    public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
+        this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook);
+    }
+
+    @Override public void subscribe(String topic, String subExpression) throws MQClientException{
+        this.liteMQPullConsumer.subscribe(topic, subExpression);
+    }
+
+    @Override public void unsubscribe(String topic) {
+    }
+
+    @Override public List<MessageExt> poll() {
+        return poll(this.getConsumerPullTimeoutMillis());
+    }
+
+    @Override public List<MessageExt> poll(long timeout) {
+        return liteMQPullConsumer.poll(timeout);
+    }
+
+    @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+
+    }
+
+    @Override public void pause(Collection<MessageQueue> messageQueueCollection) {
+
+    }
+
+    @Override public void resume(Collection<MessageQueue> partitions) {
+
+    }
+
+    @Override public void commitSync() {
+
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index f3b6caa..dbf37d2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -16,7 +16,9 @@
  */
 package org.apache.rocketmq.client.consumer;
 
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
new file mode 100644
index 0000000..223cca0
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public interface LiteMQPullConsumer {
+    /**
+     * Subscribe some topic
+     *
+     * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
+     * null or * expression,meaning subscribe all
+     */
+    void subscribe(final String topic, final String subExpression) throws MQClientException;
+
+    /**
+     * Unsubscribe consumption some topic
+     *
+     * @param topic message topic
+     */
+    void unsubscribe(final String topic);
+
+    /**
+     * @return
+     */
+    List<MessageExt> poll();
+
+    List<MessageExt> poll(long timeout);
+
+    void seek(MessageQueue messageQueue, long offset) throws MQClientException;
+
+    void pause(Collection<MessageQueue> messageQueueCollection);
+
+    void resume(Collection<MessageQueue> partitions);
+
+    void commitSync();
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 28b807c..9c7cb36 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.client.consumer;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -169,4 +171,5 @@ public interface MQPullConsumer extends MQConsumer {
      */
     void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
index 7332818..abf5f47 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -17,16 +17,21 @@
 package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -34,6 +39,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -118,6 +124,81 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
         this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
     }
 
+    public List<MessageExt> poll(long timeout) {
+        try {
+            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
+            if (consumeRequest != null) {
+                List<MessageExt> messages = consumeRequest.getMessageExts();
+                for (MessageExt messageExt : messages) {
+                    MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
+                }
+                consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+                return messages;
+            }
+        } catch (InterruptedException ignore) {
+        }
+        return null;
+    }
+
+    public void pause(Collection<MessageQueue> messageQueues) {
+        assignedMessageQueue.pause(messageQueues);
+    }
+
+    public void resume(Collection<MessageQueue> messageQueues) {
+        assignedMessageQueue.resume(messageQueues);
+    }
+
+    public void unsubscribe(final String topic) {
+        unsubscribe(topic);
+        removePullTaskCallback(topic);
+        assignedMessageQueue.removeAssignedMessageQueue(topic);
+    }
+
+    public void removePullTaskCallback(final String topic) {
+        removePullTask(topic);
+    }
+
+    public void removePullTask(final String topic) {
+        synchronized (this.taskTable) {
+            Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+                if (next.getKey().getTopic().equals(topic)) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    public void commit() {
+        List<ConsumeRequest> consumeRequests;
+        synchronized (this.allConsumed) {
+            consumeRequests = this.allConsumed;
+            this.allConsumed = new ArrayList<ConsumeRequest>();
+        }
+        for (ConsumeRequest consumeRequest : consumeRequests) {
+            consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts);
+        }
+        Set<Map.Entry<MessageQueue, Long>> entrySet = assignedMessageQueue.getNeedCommitOffsets().entrySet();
+        for (Map.Entry<MessageQueue, Long> entry : entrySet) {
+            try {
+                updateConsumeOffset(entry.getKey(), entry.getValue());
+            } catch (MQClientException e) {
+                log.error("A error occurred in update consume offset process.", e);
+            }
+        }
+        this.getOffsetStore().persistAll(assignedMessageQueue.getNeedCommitOffsets().keySet());
+    }
+
+    private void commit(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) {
+        long offset = processQueue.removeMessage(Collections.singletonList(messageExt));
+        try {
+            updateConsumeOffset(messageQueue, offset);
+        } catch (MQClientException e) {
+            log.error("An error occurred in update consume offset process.", e);
+        }
+    }
+
     public void subscribe(String topic, String subExpression) throws MQClientException {
         try {
             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(),
@@ -159,7 +240,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
         try {
             offset = assignedMessageQueue.getNextOffset(remoteQueue);
             if (offset == -1) {
-                offset = this.defaultMQPullConsumer.fetchConsumeOffset(remoteQueue, false);
+                offset = fetchConsumeOffset(remoteQueue, false);
                 assignedMessageQueue.updateNextOffset(remoteQueue, offset);
             }
         } catch (MQClientException e) {
@@ -168,6 +249,63 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
         return offset;
     }
 
+    private void cleanExpireMsg() {
+        for (final Map.Entry<MessageQueue, ProcessQueue> next : rebalanceImpl.getProcessQueueTable().entrySet()) {
+            ProcessQueue pq = next.getValue();
+            MessageQueue mq = next.getKey();
+            ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
+            if (lockTreeMap == null) {
+                log.error("Gets tree map lock in process queue error of message queue:", mq);
+                return;
+            }
+
+            TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap();
+
+            int loop = msgTreeMap.size();
+            for (int i = 0; i < loop; i++) {
+                MessageExt msg = null;
+                try {
+                    lockTreeMap.readLock().lockInterruptibly();
+                    try {
+                        if (!msgTreeMap.isEmpty()) {
+                            msg = msgTreeMap.firstEntry().getValue();
+                            if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
+                                > 10 * 60 * 1000) {
+                                //Expired, ack and remove it.
+                            } else {
+                                break;
+                            }
+                        } else {
+                            break;
+                        }
+                    } finally {
+                        lockTreeMap.readLock().unlock();
+                    }
+                } catch (InterruptedException e) {
+                    log.error("Gets expired message exception", e);
+                }
+
+                try {
+                    this.defaultMQPullConsumer.sendMessageBack(msg, 3);
+                    log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
+                        msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+                    System.out.println("Send expired msg back.");
+                    commit(mq, pq, msg);
+                } catch (Exception e) {
+                    log.error("Send back expired msg exception", e);
+                }
+            }
+        }
+    }
+
+    private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
+        try {
+            return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true);
+        } catch (IllegalAccessException e) {
+            return null;
+        }
+    }
+
     public class PullTaskImpl implements Runnable {
         private final MessageQueue messageQueue;
         private volatile boolean cancelled = false;