You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/30 03:21:25 UTC
[rocketmq-ons] 25/29: Add comments for pull consumer
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch pullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit f1ca1d72e4f7f5ccde7cff1b78fbc4c45ecd9a7e
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jul 17 17:37:31 2019 +0800
Add comments for pull consumer
---
.../org/apache/rocketmq/ons/api/LifeCycle.java | 4 +-
.../org/apache/rocketmq/ons/api/MessageQueue.java | 34 +++++++
.../org/apache/rocketmq/ons/api/PullConsumer.java | 102 ++++++++++++++++++++-
3 files changed, 133 insertions(+), 7 deletions(-)
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
index 51d96c4..811411e 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.ons.api;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +14,7 @@ package org.apache.rocketmq.ons.api;/*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+package org.apache.rocketmq.ons.api;
public interface LifeCycle {
boolean isStarted();
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
index abe77b4..55f9c75 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
@@ -41,4 +41,38 @@ public class MessageQueue {
public void setQueue(String queue) {
this.queue = queue;
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ result = prime * result + ((queue == null) ? 0 : queue.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ MessageQueue other = (MessageQueue) obj;
+ if (queue == null) {
+ if (other.queue != null) {
+ return false;
+ }
+ } else if (!queue.equals(other.queue)) {
+ return false;
+ }
+ if (topic == null) {
+ if (other.topic != null)
+ return false;
+ } else if (!topic.equals(other.topic)) {
+ return false;
+ }
+ return true;
+ }
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
index abdd966..e4910f2 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
@@ -18,30 +18,122 @@ package org.apache.rocketmq.ons.api;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
-public interface PullConsumer {
-
+public interface PullConsumer extends LifeCycle, Credentials {
+ /**
+ * Subscribe to the given list of topics to get dynamically assigned message queues.
+ *
+ * @param topics
+ */
void subscribe(final Collection<String> topics);
+ /**
+ * Subscribe to the given list of topics to get dynamically assigned message queues with filters.
+ *
+ * @param topics
+ */
void subscribe(final Collection<String> topics, final String subExpression);
+ /**
+ * Unsubscribe the given list of topics.
+ *
+ * @param topics
+ */
void unsubscribe(final Collection<String> topics);
+ /**
+ * Fetch data for the topics or message queues specified using the subscribe API. It is an error to poll for data
+ * without having subscribed to any topics or message queues first.
+ *
+ * @param timeout
+ * @return
+ */
List<Message> poll(long timeout);
+ /**
+ * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long)}. If this API is invoked
+ * for the same message queue more than once, the latest offset will be used on the next poll(). Note that you may
+ * lose data if this API is arbitrarily used in the middle of consumption.
+ *
+ * @param messageQueue
+ * @param offset
+ */
void seek(MessageQueue messageQueue, long offset);
+ /**
+ * Overrides the fetch offsets with the beginning offset in server that the consumer will use on the next {@link
+ * #poll(long)}.
+ *
+ * @param messageQueue
+ */
void seekToBeginning(MessageQueue messageQueue);
- void seekToEnd(MessageQueue messageQueuea);
+ /**
+ * Overrides the fetch offsets with the end offset in server that the consumer will use on the next {@link
+ * #poll(long)}.
+ *
+ * @param messageQueue
+ */
+ void seekToEnd(MessageQueue messageQueue);
+ /**
+ * Suspend fetching from the requested message queues. Future calls to {@link #poll(long)} will not return any
+ * records from these message queues until they have been resumed using {@link #resume(Collection)}.
+ *
+ * Note that this method does not affect message queue subscription. In particular, it does not cause a group
+ * rebalance.
+ *
+ * @param messageQueues
+ */
void pause(Collection<MessageQueue> messageQueues);
- void resume(Collection<MessageQueue> partitions);
+ /**
+ * Resume specified message queues which have been paused with {@link #pause(Collection)}. New calls to {@link
+ * #poll(long)} will return records from these message queues if there are any to be fetched. If the message queues
+ * were not previously paused, this method is a no-op.
+ *
+ * @param messageQueues
+ */
+ void resume(Collection<MessageQueue> messageQueues);
+
+ /**
+ * Get metadata about the message queues for a given topic. This method will issue a remote call to the server if it
+ * does not already have any metadata about the given topic.
+ *
+ * @param topic
+ * @return
+ */
+ Set<MessageQueue> messageQueues(String topic);
+ /**
+ * Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
+ * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
+ * queue.
+ *
+ * @param messageQueue
+ * @param timestamp
+ * @return
+ */
Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp);
+ /**
+ * Get the last committed offset for the given message queue (whether the commit happened by this process or
+ * another). This offset will be used as the position for the consumer in the event of a failure.
+ *
+ * @param messageQueue
+ * @return
+ */
Long committed(MessageQueue messageQueue);
- void commitSync();
+ /**
+ * Commit the last offset that has been stored securely. Should the process fail and restart, this is the offset that the
+ * consumer will recover to. The consumer can either commit offsets automatically and periodically, or explicitly via this method.
+ */
+ void commit();
+
+ /**
+ * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
+ */
+ void wakeup();
}