You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/30 03:21:27 UTC
[rocketmq-ons] 27/29: Polish pull consumer poll method
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch pullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit ea416451b92ad81e0d537bd001afd5512dbd1d1d
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Aug 30 11:15:39 2019 +0800
Polish pull consumer poll method
---
.../java/org/apache/rocketmq/ons/api/PullConsumer.java | 17 +++++++++++------
.../org/apache/rocketmq/ons/api/bean/ProducerBean.java | 2 --
ons-core/pom.xml | 6 ++----
3 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
index 6043183..d7f60a8 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.ons.api;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.time.Duration;
public interface PullConsumer {
@@ -67,10 +68,10 @@ public interface PullConsumer {
* @param timeout
* @return
*/
- List<Message> poll(long timeout);
+ List<Message> poll(Duration timeout);
/**
- * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long)} }. If this API is invoked
+ * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration)} }. If this API is invoked
* for the same message queue more than once, the latest offset will be used on the next poll(). Note that you may
* lose data if this API is arbitrarily used in the middle of consumption.
*
@@ -81,7 +82,7 @@ public interface PullConsumer {
/**
* Overrides the fetch offsets with the beginning offset in server that the consumer will use on the next {@link
- * #poll(long)} }.
+ * #poll(Duration)} }.
*
* @param topicPartition
*/
@@ -89,14 +90,14 @@ public interface PullConsumer {
/**
* Overrides the fetch offsets with the end offset in server that the consumer will use on the next {@link
- * #poll(long)} }.
+ * #poll(Duration)} }.
*
* @param topicPartition
*/
void seekToEnd(TopicPartition topicPartition);
/**
- * Suspend fetching from the requested message queues. Future calls to {@link #poll(long)} will not return any
+ * Suspend fetching from the requested message queues. Future calls to {@link #poll(Duration)} will not return any
* records from these message queues until they have been resumed using {@link #resume(Collection)}.
*
* Note that this method does not affect message queue subscription. In particular, it does not cause a group
@@ -108,7 +109,7 @@ public interface PullConsumer {
/**
* Resume specified message queues which have been paused with {@link #pause(Collection)}. New calls to {@link
- * #poll(long)} will return records from these partitions if there are any to be fetched. If the message queues were
+ * #poll(Duration)} will return records from these partitions if there are any to be fetched. If the message queues were
* not previously paused, this method is a no-op.
*
* @param topicPartitions
@@ -135,4 +136,8 @@ public interface PullConsumer {
*/
Long committed(TopicPartition topicPartition);
+ /**
+ * Sync commit current consumed offset to server.
+ */
+ void commitSync();
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
index 548cc82..1036253 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
@@ -98,12 +98,10 @@ public class ProducerBean implements Producer {
}
@Override public SendResult send(Message message, String shardingKey) {
- //TODO
return null;
}
@Override public SendResult send(Collection<Message> messages) {
- //TODO
return null;
}
}
diff --git a/ons-core/pom.xml b/ons-core/pom.xml
index 27e4142..862c623 100644
--- a/ons-core/pom.xml
+++ b/ons-core/pom.xml
@@ -41,14 +41,12 @@
<maven.jdoc.skip>true</maven.jdoc.skip>
<downloadSources>true</downloadSources>
<!-- compiler settings properties -->
- <java_source_version>1.6</java_source_version>
- <java_target_version>1.6</java_target_version>
+ <java_source_version>1.8</java_source_version>
+ <java_target_version>1.8</java_target_version>
<file_encoding>UTF-8</file_encoding>
<!-- Always use stable version of RocketMQ -->
<rocketmq.version>4.5.1</rocketmq.version>
<auth.version>${project.version}</auth.version>
- <spring.version>4.1.2.RELEASE</spring.version>
- <diamond.version>3.7.4</diamond.version>
</properties>
<build>
<plugins>