You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/09 11:53:30 UTC
[pulsar] 09/09: [ISSUE-12291][Client] 'StartMessageId' and
'RollbackDuration' not working in MultiTopicsReader for non-partitioned
topics (#12308)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ce8f4505cfc799a74d15f786542cf9639ef33393
Author: Jason918 <ja...@qq.com>
AuthorDate: Thu Nov 4 23:18:10 2021 +0800
[ISSUE-12291][Client] 'StartMessageId' and 'RollbackDuration' not working in MultiTopicsReader for non-partitioned topics (#12308)
Fixes #12291
### Motivation
Bug fix. 'StartMessageId' and 'RollbackDuration' are not working in MultiTopicsReader for non-partitioned topics.
### Modifications
This fix is quite simple. Just add `startMessageId` and `startMessageRollbackDurationInSec` when creating the underlying consumer with `ConsumerImpl.newConsumerImpl`.
(cherry picked from commit cb48152254b8c16596e7251ef9a7229d918d2e90)
---
.../pulsar/client/impl/MultiTopicsReaderTest.java | 102 ++++++++++++++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
.../client/impl/MultiTopicsConsumerImpl.java | 8 +-
3 files changed, 106 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index 31a426e..a8a6ced 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -48,7 +48,6 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -378,6 +377,107 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
}
+ @Test(timeOut = 20000)
+ public void testMultiNonPartitionedTopicWithStartMessageId() throws Exception {
+ final String topic1 = "persistent://my-property/my-ns/topic1" + UUID.randomUUID();
+ final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
+ List<String> topics = Arrays.asList(topic1, topic2);
+ PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+
+ // create producer and send msg
+ List<Producer<String>> producerList = new ArrayList<>();
+ for (String topicName : topics) {
+ producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
+ }
+ int msgNum = 10;
+ Set<String> messages = new HashSet<>();
+ for (int i = 0; i < producerList.size(); i++) {
+ Producer<String> producer = producerList.get(i);
+ for (int j = 0; j < msgNum; j++) {
+ String msg = i + "msg" + j;
+ producer.send(msg);
+ messages.add(msg);
+ }
+ }
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .startMessageId(MessageId.earliest)
+ .topics(topics).readerName("my-reader").create();
+ // receive messages
+ while (reader.hasMessageAvailable()) {
+ messages.remove(reader.readNext(5, TimeUnit.SECONDS).getValue());
+ }
+ assertEquals(messages.size(), 0);
+ assertEquals(client.consumersCount(), 1);
+ // clean up
+ for (Producer<String> producer : producerList) {
+ producer.close();
+ }
+ reader.close();
+ Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
+ }
+
+ @Test(timeOut = 20000)
+ public void testMultiNonPartitionedTopicWithRollbackDuration() throws Exception {
+ final String topic1 = "persistent://my-property/my-ns/topic1" + UUID.randomUUID();
+ final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
+ List<String> topics = Arrays.asList(topic1, topic2);
+ PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+
+ // create producer and send msg
+ List<Producer<String>> producerList = new ArrayList<>();
+ for (String topicName : topics) {
+ producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
+ }
+ int totalMsg = 10;
+ Set<String> messages = new HashSet<>();
+ long oldMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(5); // 5 hours old
+ long newMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); // 1 hour old
+ for (int i = 0; i < producerList.size(); i++) {
+ Producer<String> producer = producerList.get(i);
+ // (1) Publish 10 messages with publish-time 5 HOUR back
+ for (int j = 0; j < totalMsg; j++) {
+ TypedMessageBuilderImpl<String> msg = (TypedMessageBuilderImpl<String>) producer.newMessage()
+ .value(i + "-old-msg-" + j);
+ msg.getMetadataBuilder()
+ .setPublishTime(oldMsgPublishTime)
+ .setProducerName(producer.getProducerName())
+ .setReplicatedFrom("us-west1");
+ msg.send();
+ messages.add(msg.getMessage().getValue());
+ }
+ // (2) Publish 10 messages with publish-time 1 HOUR back
+ for (int j = 0; j < totalMsg; j++) {
+ TypedMessageBuilderImpl<String> msg = (TypedMessageBuilderImpl<String>) producer.newMessage()
+ .value(i + "-new-msg-" + j);
+ msg.getMetadataBuilder()
+ .setPublishTime(newMsgPublishTime)
+ .setProducerName(producer.getProducerName())
+ .setReplicatedFrom("us-west1");
+ msg.send();
+ messages.add(msg.getMessage().getValue());
+ }
+ }
+
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .startMessageFromRollbackDuration(2, TimeUnit.HOURS)
+ .topics(topics).readerName("my-reader").create();
+ // receive messages
+ while (reader.hasMessageAvailable()) {
+ messages.remove(reader.readNext(5, TimeUnit.SECONDS).getValue());
+ }
+ assertEquals(messages.size(), 2 * totalMsg);
+ for (String message : messages) {
+ assertTrue(message.contains("old-msg"));
+ }
+ assertEquals(client.consumersCount(), 1);
+ // clean up
+ for (Producer<String> producer : producerList) {
+ producer.close();
+ }
+ reader.close();
+ Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
+ }
+
@Test(timeOut = 10000)
public void testKeyHashRangeReader() throws Exception {
final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b29d6c7..88b6df1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2000,7 +2000,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (lastDequeuedMessageId == MessageId.earliest) {
// if we are starting from latest, we should seek to the actual last message first.
// allow the last one to be read when read head inclusively.
- if (startMessageId.equals(MessageId.latest)) {
+ if (MessageId.latest.equals(startMessageId)) {
CompletableFuture<GetLastMessageIdResponse> future = internalGetLastMessageIdAsync();
// if the consumer is configured to read inclusive then we need to seek to the last message
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fa7c4a2..18bd7bc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -998,8 +998,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
} else {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider(), -1,
- true, subFuture, null, schema, interceptors,
- createIfDoesNotExist);
+ true, subFuture, startMessageId, schema, interceptors,
+ createIfDoesNotExist, startMessageRollbackDurationInSec);
synchronized (pauseMutex) {
if (paused) {
@@ -1294,8 +1294,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider(),
- partitionIndex, true, subFuture, null, schema, interceptors,
- true /* createTopicIfDoesNotExist */);
+ partitionIndex, true, subFuture, startMessageId, schema, interceptors,
+ true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();