You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/07/17 06:35:22 UTC
[rocketmq] 02/03: Add pull task logic
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit af04557abf12ce36245d7e362925125cd25e0d3c
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Jun 21 09:47:42 2019 +0800
Add pull task logic
---
.../client/consumer/DefaultLiteMQPullConsumer.java | 64 ++++++++++
.../client/consumer/DefaultMQPullConsumer.java | 2 +
.../client/consumer/LiteMQPullConsumer.java | 55 ++++++++
.../rocketmq/client/consumer/MQPullConsumer.java | 3 +
.../impl/consumer/LiteMQPullConsumerImpl.java | 140 ++++++++++++++++++++-
5 files changed, 263 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
new file mode 100644
index 0000000..99fd0d9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.LiteMQPullConsumerImpl;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+
+public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer {
+ private LiteMQPullConsumerImpl liteMQPullConsumer;
+
+ public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
+ this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook);
+ }
+
+ @Override public void subscribe(String topic, String subExpression) throws MQClientException{
+ this.liteMQPullConsumer.subscribe(topic, subExpression);
+ }
+
+ @Override public void unsubscribe(String topic) {
+ }
+
+ @Override public List<MessageExt> poll() {
+ return poll(this.getConsumerPullTimeoutMillis());
+ }
+
+ @Override public List<MessageExt> poll(long timeout) {
+ return liteMQPullConsumer.poll(timeout);
+ }
+
+ @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+
+ }
+
+ @Override public void pause(Collection<MessageQueue> messageQueueCollection) {
+
+ }
+
+ @Override public void resume(Collection<MessageQueue> partitions) {
+
+ }
+
+ @Override public void commitSync() {
+
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index f3b6caa..dbf37d2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.client.consumer;
+import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
new file mode 100644
index 0000000..223cca0
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public interface LiteMQPullConsumer {
+ /**
+ * Subscribe some topic
+ *
+ * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
+ * null or * expression,meaning subscribe all
+ */
+ void subscribe(final String topic, final String subExpression) throws MQClientException;
+
+ /**
+ * Unsubscribe consumption some topic
+ *
+ * @param topic message topic
+ */
+ void unsubscribe(final String topic);
+
+ /**
+ * @return
+ */
+ List<MessageExt> poll();
+
+ List<MessageExt> poll(long timeout);
+
+ void seek(MessageQueue messageQueue, long offset) throws MQClientException;
+
+ void pause(Collection<MessageQueue> messageQueueCollection);
+
+ void resume(Collection<MessageQueue> partitions);
+
+ void commitSync();
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 28b807c..9c7cb36 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.client.consumer;
+import java.util.Collection;
+import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -169,4 +171,5 @@ public interface MQPullConsumer extends MQConsumer {
*/
void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
index 7332818..abf5f47 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -17,16 +17,21 @@
package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -34,6 +39,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -118,6 +124,81 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
}
+ public List<MessageExt> poll(long timeout) {
+ try {
+ ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
+ if (consumeRequest != null) {
+ List<MessageExt> messages = consumeRequest.getMessageExts();
+ for (MessageExt messageExt : messages) {
+ MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
+ }
+ consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+ return messages;
+ }
+ } catch (InterruptedException ignore) {
+ }
+ return null;
+ }
+
+ public void pause(Collection<MessageQueue> messageQueues) {
+ assignedMessageQueue.pause(messageQueues);
+ }
+
+ public void resume(Collection<MessageQueue> messageQueues) {
+ assignedMessageQueue.resume(messageQueues);
+ }
+
+ public void unsubscribe(final String topic) {
+ unsubscribe(topic);
+ removePullTaskCallback(topic);
+ assignedMessageQueue.removeAssignedMessageQueue(topic);
+ }
+
+ public void removePullTaskCallback(final String topic) {
+ removePullTask(topic);
+ }
+
+ public void removePullTask(final String topic) {
+ synchronized (this.taskTable) {
+ Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+ if (next.getKey().getTopic().equals(topic)) {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ public void commit() {
+ List<ConsumeRequest> consumeRequests;
+ synchronized (this.allConsumed) {
+ consumeRequests = this.allConsumed;
+ this.allConsumed = new ArrayList<ConsumeRequest>();
+ }
+ for (ConsumeRequest consumeRequest : consumeRequests) {
+ consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts);
+ }
+ Set<Map.Entry<MessageQueue, Long>> entrySet = assignedMessageQueue.getNeedCommitOffsets().entrySet();
+ for (Map.Entry<MessageQueue, Long> entry : entrySet) {
+ try {
+ updateConsumeOffset(entry.getKey(), entry.getValue());
+ } catch (MQClientException e) {
+ log.error("A error occurred in update consume offset process.", e);
+ }
+ }
+ this.getOffsetStore().persistAll(assignedMessageQueue.getNeedCommitOffsets().keySet());
+ }
+
+ private void commit(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) {
+ long offset = processQueue.removeMessage(Collections.singletonList(messageExt));
+ try {
+ updateConsumeOffset(messageQueue, offset);
+ } catch (MQClientException e) {
+ log.error("An error occurred in update consume offset process.", e);
+ }
+ }
+
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(),
@@ -159,7 +240,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
try {
offset = assignedMessageQueue.getNextOffset(remoteQueue);
if (offset == -1) {
- offset = this.defaultMQPullConsumer.fetchConsumeOffset(remoteQueue, false);
+ offset = fetchConsumeOffset(remoteQueue, false);
assignedMessageQueue.updateNextOffset(remoteQueue, offset);
}
} catch (MQClientException e) {
@@ -168,6 +249,63 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
return offset;
}
+ private void cleanExpireMsg() {
+ for (final Map.Entry<MessageQueue, ProcessQueue> next : rebalanceImpl.getProcessQueueTable().entrySet()) {
+ ProcessQueue pq = next.getValue();
+ MessageQueue mq = next.getKey();
+ ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
+ if (lockTreeMap == null) {
+ log.error("Gets tree map lock in process queue error of message queue:", mq);
+ return;
+ }
+
+ TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap();
+
+ int loop = msgTreeMap.size();
+ for (int i = 0; i < loop; i++) {
+ MessageExt msg = null;
+ try {
+ lockTreeMap.readLock().lockInterruptibly();
+ try {
+ if (!msgTreeMap.isEmpty()) {
+ msg = msgTreeMap.firstEntry().getValue();
+ if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
+ > 10 * 60 * 1000) {
+ //Expired, ack and remove it.
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ } finally {
+ lockTreeMap.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("Gets expired message exception", e);
+ }
+
+ try {
+ this.defaultMQPullConsumer.sendMessageBack(msg, 3);
+ log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
+ msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+ System.out.println("Send expired msg back.");
+ commit(mq, pq, msg);
+ } catch (Exception e) {
+ log.error("Send back expired msg exception", e);
+ }
+ }
+ }
+ }
+
+ private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
+ try {
+ return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true);
+ } catch (IllegalAccessException e) {
+ return null;
+ }
+ }
+
public class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;