You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/03/29 04:32:00 UTC
[rocketmq-mqtt] 32/43: fix unit test getOffset 1
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
commit 00a430e256c40ef1e7d3899751780e5c0803ead4
Author: tianliuliu <64...@qq.com>
AuthorDate: Mon Mar 14 17:25:07 2022 +0800
fix unit test getOffset 1
---
.../rocketmq/mqtt/ds/test/LmqOffsetStoreManagerTest.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/LmqOffsetStoreManagerTest.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/LmqOffsetStoreManagerTest.java
index 326cec8..43f2a91 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/LmqOffsetStoreManagerTest.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/LmqOffsetStoreManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.mqtt.ds.test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -118,18 +120,20 @@ public class LmqOffsetStoreManagerTest {
when(defaultMQPullConsumerImpl.getRebalanceImpl()).thenReturn(rebalanceImpl);
MQClientInstance mqClientInstance = mock(MQClientInstance.class);
when(rebalanceImpl.getmQClientFactory()).thenReturn(mqClientInstance);
-
+ //
MQClientAPIImpl mqClientAPI = mock(MQClientAPIImpl.class);
when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPI);
when(mqClientAPI.queryConsumerOffset(any(), any(), anyLong())).thenReturn(10L);
CompletableFuture<Map<Queue, QueueOffset>> offset = lmqOffsetStoreManager.getOffset(clientId, subscription);
-
+ CountDownLatch countDownLatch = new CountDownLatch(1);
offset.whenComplete((offsetMap, throwable) -> {
long offset1 = offsetMap.get(queue).getOffset();
Assert.assertTrue(offset1 == 10L);
+ countDownLatch.countDown();
});
-
+ countDownLatch.await(3, TimeUnit.SECONDS);
+ verify(mqClientAPI).queryConsumerOffset(any(), any(), anyLong());
}
}
\ No newline at end of file