You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/12/04 04:19:33 UTC
[rocketmq] branch develop updated: Deprecated the ambiguous methods of lite pull consumer (#5640)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e5fadc7db Deprecated the ambiguous methods of lite pull consumer (#5640)
e5fadc7db is described below
commit e5fadc7db7c4a5c098e5f912a19fd71cba3dde3d
Author: rongtong <ji...@163.com>
AuthorDate: Sun Dec 4 12:19:28 2022 +0800
Deprecated the ambiguous methods of lite pull consumer (#5640)
---
.../client/consumer/DefaultLitePullConsumer.java | 16 +++++++----
.../rocketmq/client/consumer/LitePullConsumer.java | 33 +++++++++++++++++++---
.../consumer/DefaultLitePullConsumerTest.java | 2 +-
.../example/simple/LitePullConsumerAssign.java | 2 +-
.../LitePullConsumerAssignWithSubExpression.java | 2 +-
5 files changed, 43 insertions(+), 12 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index d5edd30d6..41461ec26 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -317,21 +317,27 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
}
+ @Deprecated
@Override
public void commitSync() {
this.defaultLitePullConsumerImpl.commitAll();
}
- /**
- * Offset specified by batch commit
- * @param offsetMap
- * @param persist
- */
+ @Deprecated
@Override
public void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist) {
this.defaultLitePullConsumerImpl.commit(offsetMap, persist);
}
+ @Override
+ public void commit() {
+ this.defaultLitePullConsumerImpl.commitAll();
+ }
+
+ @Override public void commit(Map<MessageQueue, Long> offsetMap, boolean persist) {
+ this.defaultLitePullConsumerImpl.commit(offsetMap, persist);
+ }
+
/**
* Get the MessageQueue assigned in subscribe mode
*
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index e9e67d055..1c7f74222 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -182,18 +182,43 @@ public interface LitePullConsumer {
*/
Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;
+ @Deprecated
/**
- * Manually commit consume offset.
+ * This method is deprecated because its name is ambiguous: it relies on a background thread to commit the consumer offset rather than committing the offset synchronously.
+ * The method is expected to be removed after version 5.1.0. It is recommended to use the {@link #commit()} method.
+ *
+ * Manually commit consume offset saved by the system.
*/
void commitSync();
+ @Deprecated
/**
- * Offset specified by batch commit
- * @param offsetMap
- * @param persist
+ * This method is deprecated because its name is ambiguous: it relies on a background thread to commit the consumer offset rather than committing the offset synchronously.
+ * The method is expected to be removed after version 5.1.0. It is recommended to use the {@link #commit(java.util.Map, boolean)} method.
+ *
+ * @param offsetMap Offsets to commit, keyed by message queue
*/
void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist);
+ /**
+ * Manually commit consume offset saved by the system. This is a non-blocking method.
+ */
+ void commit();
+
+ /**
+ * Manually commit the specified offsets in a batch.
+ *
+ * @param offsetMap Offsets to commit, keyed by message queue
+ * @param persist Whether to persist to the broker
+ */
+ void commit(Map<MessageQueue, Long> offsetMap, boolean persist);
+
+ /**
+ * Manually commit consume offset saved by the system.
+ *
+ * @param messageQueues Message queues that need to submit consumer offset
+ * @param persist Whether to persist to the broker
+ */
void commit(final Set<MessageQueue> messageQueues, boolean persist);
/**
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 9b3d4b936..5fc4df89c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -194,7 +194,7 @@ public class DefaultLitePullConsumerTest {
//commit offset 1
Map<MessageQueue, Long> commitOffset = new HashMap<>();
commitOffset.put(messageQueue, 1L);
- litePullConsumer.commitSync(commitOffset, true);
+ litePullConsumer.commit(commitOffset, true);
assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(1);
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java
index e638de1c9..0d8fc1c69 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java
@@ -43,7 +43,7 @@ public class LitePullConsumerAssign {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
- litePullConsumer.commitSync();
+ litePullConsumer.commit();
}
} finally {
litePullConsumer.shutdown();
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java
index 0ab106fa1..fb673df3f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssignWithSubExpression.java
@@ -50,7 +50,7 @@ public class LitePullConsumerAssignWithSubExpression {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
- litePullConsumer.commitSync();
+ litePullConsumer.commit();
}
} finally {
litePullConsumer.shutdown();