You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/29 15:16:34 UTC
[pulsar] branch master updated: [Client] Consumer for a single
partition of a PartitionedTopic should be set with correct partitionIndex
(#4591)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5de6e6b [Client] Consumer for a single partition of a PartitionedTopic should be set with correct partitionIndex (#4591)
5de6e6b is described below
commit 5de6e6ba9b566754f099fd1bd96809ed75ac1543
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Jun 29 23:16:28 2019 +0800
[Client] Consumer for a single partition of a PartitionedTopic should be set with correct partitionIndex (#4591)
This PR fixes #4586 by deriving `partitionIndex` from the topic name when a consumer subscribes directly to a single partition of a partitioned topic.
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 1 +
.../api/PartitionedProducerConsumerTest.java | 72 +++++++++++++++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 18 +++---
.../client/impl/MultiTopicsConsumerImpl.java | 6 +-
.../pulsar/client/impl/PulsarClientImpl.java | 3 +-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 2 +-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 8 +--
.../pulsar/client/impl/ConsumerImplTest.java | 2 +-
8 files changed, 91 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index d1619f5..e6e4c19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -108,6 +108,7 @@ public class RawReaderImpl implements RawReader {
conf,
client.externalExecutorProvider().getExecutor(),
TopicName.getPartitionIndex(conf.getSingleTopic()),
+ false,
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 0a88963..4ab650c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -35,9 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.PartitionedProducerImpl;
-import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.*;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -670,6 +668,74 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
Collections.singletonList(nonPartitionedTopic));
}
+ @Test // Regression test for #4586: message ids must report partition index 2 both via the partitioned-topic consumer and via a consumer on the single partition.
+ public void testMessageIdForSubscribeToSinglePartition() throws Exception {
+ PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ TopicName topicName = null;
+ TopicName partition2TopicName = null;
+ Producer<byte[]> producer = null;
+ Consumer<byte[]> consumer1 = null;
+ Consumer<byte[]> consumer2 = null;
+ final int numPartitions = 4;
+ final int totalMessages = 30;
+ // Resources are declared outside the try block so that the finally block can release them.
+ try {
+ log.info("-- Starting {} test --", methodName);
+ // Timestamp suffix gives each run its own topic; partition2TopicName addresses partition 2 of that topic directly.
+ topicName = TopicName.get("persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis());
+ partition2TopicName = topicName.getPartition(2);
+
+ admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
+ // AlwaysTwoMessageRouter is defined outside this diff; presumably it routes every message to partition 2 (the assertions below rely on that) - TODO confirm.
+ producer = pulsarClient.newProducer().topic(topicName.toString())
+ .messageRouter(new AlwaysTwoMessageRouter())
+ .create();
+ // consumer1 subscribes to the whole partitioned topic.
+ consumer1 = pulsarClient.newConsumer().topic(topicName.toString())
+ .subscriptionName("subscriber-partitioned")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+ // consumer2 subscribes to the partition-2 topic name only; this is the case the commit addresses.
+ consumer2 = pulsarClient.newConsumer().topic(partition2TopicName.toString())
+ .subscriptionName("subscriber-single")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+ // Publish totalMessages messages, each keyed by its loop index.
+ for (int i = 0; i < totalMessages; i++) {
+ String message = "my-message-" + i;
+ producer.newMessage().key(String.valueOf(i)).value(message.getBytes()).send();
+ }
+ producer.flush();
+
+ Message<byte[]> msg;
+ // Ids from consumer1 are cast to TopicMessageIdImpl and unwrapped; the inner MessageIdImpl must report partition 2.
+ for (int i = 0; i < totalMessages; i ++) {
+ msg = consumer1.receive(5, TimeUnit.SECONDS); // NOTE(review): a timed receive may return null on timeout, which would NPE on the next line - TODO confirm.
+ Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
+ consumer1.acknowledge(msg);
+ }
+ // Ids from consumer2 are cast straight to MessageIdImpl and must also report partition 2.
+ for (int i = 0; i < totalMessages; i ++) {
+ msg = consumer2.receive(5, TimeUnit.SECONDS);
+ Assert.assertEquals(((MessageIdImpl)msg.getMessageId()).getPartitionIndex(), 2);
+ consumer2.acknowledge(msg);
+ }
+
+ } finally { // NOTE(review): producer/consumer1/consumer2 are still null if setup above threw, so this cleanup would NPE and mask the original failure.
+ producer.close();
+ consumer1.unsubscribe();
+ consumer1.close();
+ consumer2.unsubscribe();
+ consumer2.close();
+ pulsarClient.close();
+ admin.topics().deletePartitionedTopic(topicName.toString());
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+ }
+
/**
* It verifies that consumer producer auto update for partitions extend.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8704fd3..11bb520 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -103,6 +103,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private long subscribeTimeout;
private final int partitionIndex;
+ private final boolean hasParentConsumer;
private final int receiverQueueRefillThreshold;
@@ -153,27 +154,27 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+ ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode,
+ return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode,
startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+ ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
long backoffIntervalNanos, long maxBackoffIntervalNanos) {
if (conf.getReceiverQueueSize() == 0) {
- return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
+ return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
} else {
- return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
+ return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
}
}
protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+ ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
long backoffIntervalNanos, long maxBackoffIntervalNanos) {
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
@@ -183,6 +184,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
AVAILABLE_PERMITS_UPDATER.set(this, 0);
this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
this.partitionIndex = partitionIndex;
+ this.hasParentConsumer = hasParentConsumer;
this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
this.priorityLevel = conf.getPriorityLevel();
this.readCompacted = conf.isReadCompacted();
@@ -548,7 +550,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// command to receive messages.
// For readers too (isDurable==false), the partition idx will be set though we have to
// send available permits immediately after establishing the reader session
- if (!(firstTimeConnect && partitionIndex > -1 && isDurable) && conf.getReceiverQueueSize() != 0) {
+ if (!(firstTimeConnect && hasParentConsumer && isDurable) && conf.getReceiverQueueSize() != 0) {
sendFlowPermitsToBroker(cnx, conf.getReceiverQueueSize());
}
}).exceptionally((e) -> {
@@ -1050,7 +1052,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// do not add each item in batch message into tracker
id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
}
- if (partitionIndex != -1) {
+ if (hasParentConsumer) {
// we should no longer track this message, TopicsConsumer will take care from now onwards
unAckedMessageTracker.remove(id);
} else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 5d45ca0..e50e8bc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -760,7 +760,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider().getExecutor(),
- partitionIndex, subFuture,
+ partitionIndex, true, subFuture,
SubscriptionMode.Durable, null, schema, interceptors,
client.getConfiguration().getDefaultBackoffIntervalNanos(),
client.getConfiguration().getMaxBackoffIntervalNanos());
@@ -774,7 +774,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
- client.externalExecutorProvider().getExecutor(), 0, subFuture, SubscriptionMode.Durable, null,
+ client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
schema, interceptors, client.getConfiguration().getDefaultBackoffIntervalNanos(),
client.getConfiguration().getMaxBackoffIntervalNanos());
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
@@ -992,7 +992,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
- partitionIndex, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
+ partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
client.getConfiguration().getDefaultBackoffIntervalNanos(),
client.getConfiguration().getMaxBackoffIntervalNanos());
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index d345da5..bceaf2c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -353,7 +353,8 @@ public class PulsarClientImpl implements PulsarClient {
consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
- consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
+ int partitionIndex = TopicName.getPartitionIndex(topic);
+ consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 71953b9..df71360 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -83,7 +83,7 @@ public class ReaderImpl<T> implements Reader<T> {
final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
- partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
+ partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
client.getConfiguration().getDefaultBackoffIntervalNanos(), client.getConfiguration().getMaxBackoffIntervalNanos());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 6beb0eb..3f3f5bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -47,18 +47,18 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
private volatile boolean waitingOnReceiveForZeroQueueSize = false;
public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+ ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors) {
- this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
+ this(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+ ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
- super(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
+ super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 4e6bad6..f463259 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -64,7 +64,7 @@ public class ConsumerImplTest {
consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
- executorService, -1, subscribeFuture, SubscriptionMode.Durable, null, null, null,
+ executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null,
clientConf.getDefaultBackoffIntervalNanos(), clientConf.getMaxBackoffIntervalNanos());
}