You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/30 03:21:27 UTC

[rocketmq-ons] 27/29: Polish pull consumer poll method

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch pullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit ea416451b92ad81e0d537bd001afd5512dbd1d1d
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Aug 30 11:15:39 2019 +0800

    Polish pull consumer poll method
---
 .../java/org/apache/rocketmq/ons/api/PullConsumer.java  | 17 +++++++++++------
 .../org/apache/rocketmq/ons/api/bean/ProducerBean.java  |  2 --
 ons-core/pom.xml                                        |  6 ++----
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
index 6043183..d7f60a8 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.ons.api;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.time.Duration;
 
 public interface PullConsumer {
 
@@ -67,10 +68,10 @@ public interface PullConsumer {
      * @param timeout
      * @return
      */
-    List<Message> poll(long timeout);
+    List<Message> poll(Duration timeout);
 
     /**
-     * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long)} }. If this API is invoked
+     * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration)} }. If this API is invoked
      * for the same message queue more than once, the latest offset will be used on the next poll(). Note that you may
      * lose data if this API is arbitrarily used in the middle of consumption.
      *
@@ -81,7 +82,7 @@ public interface PullConsumer {
 
     /**
      * Overrides the fetch offsets with the beginning offset in server that the consumer will use on the next {@link
-     * #poll(long)} }.
+     * #poll(Duration)} }.
      *
      * @param topicPartition
      */
@@ -89,14 +90,14 @@ public interface PullConsumer {
 
     /**
      * Overrides the fetch offsets with the end offset in server that the consumer will use on the next {@link
-     * #poll(long)} }.
+     * #poll(Duration)} }.
      *
      * @param topicPartition
      */
     void seekToEnd(TopicPartition topicPartition);
 
     /**
-     * Suspend fetching from the requested message queues. Future calls to {@link #poll(long)} will not return any
+     * Suspend fetching from the requested message queues. Future calls to {@link #poll(Duration)} will not return any
      * records from these message queues until they have been resumed using {@link #resume(Collection)}.
      *
      * Note that this method does not affect message queue subscription. In particular, it does not cause a group
@@ -108,7 +109,7 @@ public interface PullConsumer {
 
     /**
      * Resume specified message queues which have been paused with {@link #pause(Collection)}. New calls to {@link
-     * #poll(long)} will return records from these partitions if there are any to be fetched. If the message queues were
+     * #poll(Duration)} will return records from these partitions if there are any to be fetched. If the message queues were
      * not previously paused, this method is a no-op.
      *
      * @param topicPartitions
@@ -135,4 +136,8 @@ public interface PullConsumer {
      */
     Long committed(TopicPartition topicPartition);
 
+    /**
+     * Sync commit current consumed offset to server.
+     */
+    void commitSync();
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
index 548cc82..1036253 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
@@ -98,12 +98,10 @@ public class ProducerBean implements Producer {
     }
 
     @Override public SendResult send(Message message, String shardingKey) {
-        //TODO
         return null;
     }
 
     @Override public SendResult send(Collection<Message> messages) {
-        //TODO
         return null;
     }
 }
diff --git a/ons-core/pom.xml b/ons-core/pom.xml
index 27e4142..862c623 100644
--- a/ons-core/pom.xml
+++ b/ons-core/pom.xml
@@ -41,14 +41,12 @@
         <maven.jdoc.skip>true</maven.jdoc.skip>
         <downloadSources>true</downloadSources>
         <!-- compiler settings properties -->
-        <java_source_version>1.6</java_source_version>
-        <java_target_version>1.6</java_target_version>
+        <java_source_version>1.8</java_source_version>
+        <java_target_version>1.8</java_target_version>
         <file_encoding>UTF-8</file_encoding>
         <!-- Always use stable version of RocketMQ -->
         <rocketmq.version>4.5.1</rocketmq.version>
         <auth.version>${project.version}</auth.version>
-        <spring.version>4.1.2.RELEASE</spring.version>
-        <diamond.version>3.7.4</diamond.version>
     </properties>
     <build>
         <plugins>