You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/30 03:21:25 UTC

[rocketmq-ons] 25/29: Add comments for pull consumer

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch pullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit f1ca1d72e4f7f5ccde7cff1b78fbc4c45ecd9a7e
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jul 17 17:37:31 2019 +0800

    Add comments for pull consumer
---
 .../org/apache/rocketmq/ons/api/LifeCycle.java     |   4 +-
 .../org/apache/rocketmq/ons/api/MessageQueue.java  |  34 +++++++
 .../org/apache/rocketmq/ons/api/PullConsumer.java  | 102 ++++++++++++++++++++-
 3 files changed, 133 insertions(+), 7 deletions(-)

diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
index 51d96c4..811411e 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.ons.api;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +14,7 @@ package org.apache.rocketmq.ons.api;/*
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
+package org.apache.rocketmq.ons.api;
 public interface LifeCycle {
     boolean isStarted();
 
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
index abe77b4..55f9c75 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
@@ -41,4 +41,38 @@ public class MessageQueue {
     public void setQueue(String queue) {
         this.queue = queue;
     }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        result = prime * result + ((queue == null) ? 0 : queue.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        MessageQueue other = (MessageQueue) obj;
+        if (queue == null) {
+            if (other.queue != null) {
+                return false;
+            }
+        } else if (!queue.equals(other.queue)) {
+            return false;
+        }
+        if (topic == null) {
+            if (other.topic != null)
+                return false;
+        } else if (!topic.equals(other.topic)) {
+            return false;
+        }
+        return true;
+    }
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
index abdd966..e4910f2 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
@@ -18,30 +18,122 @@ package org.apache.rocketmq.ons.api;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
-public interface PullConsumer {
-
+public interface PullConsumer extends LifeCycle, Credentials {
+    /**
+     * Subscribe to the given list of topics to get dynamically assigned message queues.
+     *
+     * @param topics
+     */
     void subscribe(final Collection<String> topics);
 
+    /**
+     * Subscribe to the given list of topics to get dynamically assigned message queues with filters.
+     *
+     * @param topics
+     */
     void subscribe(final Collection<String> topics, final String subExpression);
 
+    /**
+     * Unsubscribe the given list of topics.
+     *
+     * @param topics
+     */
     void unsubscribe(final Collection<String> topics);
 
+    /**
+     * Fetch data for the topics or partitions specified using subscribe API. It is an error to not have subscribed to
+     * any topics or partitions before polling for data.
+     *
+     * @param timeout
+     * @return
+     */
     List<Message> poll(long timeout);
 
+    /**
+     * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long)}. If this API is invoked
+     * for the same message queue more than once, the latest offset will be used on the next poll(). Note that you may
+     * lose data if this API is arbitrarily used in the middle of consumption.
+     *
+     * @param messageQueue
+     * @param offset
+     */
     void seek(MessageQueue messageQueue, long offset);
 
+    /**
+     * Overrides the fetch offsets with the beginning offset in server that the consumer will use on the next {@link
+     * #poll(long)}.
+     *
+     * @param messageQueue
+     */
     void seekToBeginning(MessageQueue messageQueue);
 
-    void seekToEnd(MessageQueue messageQueuea);
+    /**
+     * Overrides the fetch offsets with the end offset in server that the consumer will use on the next {@link
+     * #poll(long)}.
+     *
+     * @param messageQueue
+     */
+    void seekToEnd(MessageQueue messageQueue);
 
+    /**
+     * Suspend fetching from the requested message queues. Future calls to {@link #poll(long)} will not return any
+     * records from these message queues until they have been resumed using {@link #resume(Collection)}.
+     *
+     * Note that this method does not affect message queue subscription. In particular, it does not cause a group
+     * rebalance.
+     *
+     * @param messageQueues
+     */
     void pause(Collection<MessageQueue> messageQueues);
 
-    void resume(Collection<MessageQueue> partitions);
+    /**
+     * Resume specified message queues which have been paused with {@link #pause(Collection)}. New calls to {@link
+     * #poll(long)} will return records from these partitions if there are any to be fetched. If the message queues were
+     * not previously paused, this method is a no-op.
+     *
+     * @param messageQueues
+     */
+    void resume(Collection<MessageQueue> messageQueues);
+
+    /**
+     * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
+     * does not already have any metadata about the given topic.
+     *
+     * @param topic
+     * @return
+     */
+    Set<MessageQueue> messageQueues(String topic);
 
+    /**
+     * Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
+     * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
+     * queue.
+     *
+     * @param messageQueue
+     * @param timestamp
+     * @return
+     */
     Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp);
 
+    /**
+     * Get the last committed offset for the given message queue (whether the commit happened by this process or
+     * another). This offset will be used as the position for the consumer in the event of a failure.
+     *
+     * @param messageQueue
+     * @return
+     */
     Long committed(MessageQueue messageQueue);
 
-    void commitSync();
+    /**
+     * Commit the last offset that has been stored securely. Should the process fail and restart, this is the offset
+     * that the consumer will recover to. The consumer can either automatically commit offsets periodically;
+     */
+    void commit();
+
+    /**
+     * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
+     */
+    void wakeup();
 }