You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/12/18 22:21:24 UTC
[pulsar] branch master updated: PIP-68: WaitForExclusive producer
access mode (#8992)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 99476d3 PIP-68: WaitForExclusive producer access mode (#8992)
99476d3 is described below
commit 99476d3196ffe5968d52d571b0be59f8eeeb8fea
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Dec 18 14:20:50 2020 -0800
PIP-68: WaitForExclusive producer access mode (#8992)
* PIP-68: WaitForExclusive producer access mode
* Fixed checkstyle issues
* Fixed log level to info
* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
---
.../pulsar/broker/service/AbstractTopic.java | 102 +++++++++++++++++---
.../pulsar/broker/service/PulsarCommandSender.java | 3 +-
.../broker/service/PulsarCommandSenderImpl.java | 5 +-
.../apache/pulsar/broker/service/ServerCnx.java | 16 ++-
.../org/apache/pulsar/broker/service/Topic.java | 4 +-
.../service/nonpersistent/NonPersistentTopic.java | 1 +
.../broker/service/persistent/PersistentTopic.java | 8 +-
.../broker/service/ExclusiveProducerTest.java | 107 +++++++++++++++++++--
.../pulsar/broker/service/PersistentTopicTest.java | 32 +++---
.../pulsar/client/api/ProducerAccessMode.java | 9 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 34 +++++--
.../pulsar/client/util/TimedCompletableFuture.java | 29 +++---
.../apache/pulsar/common/api/proto/PulsarApi.java | 57 +++++++++++
.../pulsar/common/policies/data/TopicStats.java | 4 +
.../apache/pulsar/common/protocol/Commands.java | 20 ++--
pulsar-common/src/main/proto/PulsarApi.proto | 5 +
16 files changed, 347 insertions(+), 89 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index bbbde6c..32e074e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -25,8 +25,10 @@ import com.google.common.base.MoreObjects;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -34,6 +36,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -111,6 +114,8 @@ public abstract class AbstractTopic implements Topic {
protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
+ private final Queue<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducers =
+ new ConcurrentLinkedQueue<>();
private static final AtomicLongFieldUpdater<AbstractTopic> USAGE_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
@@ -337,14 +342,15 @@ public abstract class AbstractTopic implements Topic {
}
@Override
- public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
+ CompletableFuture<Void> producerQueuedFuture) {
checkArgument(producer.getTopic() == this);
CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
- incrementTopicEpochIfNeeded(producer)
- .thenAccept(epoch -> {
- lock.readLock().lock();
+ incrementTopicEpochIfNeeded(producer, producerQueuedFuture)
+ .thenAccept(producerEpoch -> {
+ lock.writeLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());
checkTopicFenced();
@@ -360,11 +366,11 @@ public abstract class AbstractTopic implements Topic {
USAGE_COUNT_UPDATER.get(this));
}
- future.complete(epoch);
+ future.complete(producerEpoch);
} catch (Throwable e) {
future.completeExceptionally(e);
} finally {
- lock.readLock().unlock();
+ lock.writeLock().unlock();
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
@@ -374,12 +380,13 @@ public abstract class AbstractTopic implements Topic {
return future;
}
- protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
+ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer,
+ CompletableFuture<Void> producerQueuedFuture) {
lock.writeLock().lock();
try {
switch (producer.getAccessMode()) {
case Shared:
- if (hasExclusiveProducer) {
+ if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerBusyException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else {
@@ -388,7 +395,7 @@ public abstract class AbstractTopic implements Topic {
}
case Exclusive:
- if (hasExclusiveProducer) {
+ if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerFencedException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else if (!producers.isEmpty()) {
@@ -410,17 +417,52 @@ public abstract class AbstractTopic implements Topic {
} else {
future = incrementTopicEpoch(topicEpoch);
}
+ future.exceptionally(ex -> {
+ hasExclusiveProducer = false;
+ return null;
+ });
+
return future.thenApply(epoch -> {
topicEpoch = Optional.of(epoch);
return topicEpoch;
- }).exceptionally(ex -> {
+ });
+ }
+
+ case WaitForExclusive: {
+ if (hasExclusiveProducer || !producers.isEmpty()) {
+ CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+ log.info("[{}] Queuing producer {} since there's already a producer", topic, producer);
+ waitingExclusiveProducers.add(Pair.of(producer, future));
+ producerQueuedFuture.complete(null);
+ return future;
+ } else if (producer.getTopicEpoch().isPresent()
+ && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
+ // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
+ // to be fenced, because a new producer had been present in between.
+ return FutureUtil.failedFuture(new ProducerFencedException(
+ String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
+ topicEpoch.get(), producer.getTopicEpoch().get())));
+ } else {
+ // There are currently no existing producers
+ hasExclusiveProducer = true;
+
+ CompletableFuture<Long> future;
+ if (producer.getTopicEpoch().isPresent()) {
+ future = setTopicEpoch(producer.getTopicEpoch().get());
+ } else {
+ future = incrementTopicEpoch(topicEpoch);
+ }
+ future.exceptionally(ex -> {
hasExclusiveProducer = false;
return null;
});
- }
- // case WaitForExclusive:
- // TODO: Implementation
+ return future.thenApply(epoch -> {
+ topicEpoch = Optional.of(epoch);
+ return topicEpoch;
+ });
+ }
+ }
default:
return FutureUtil.failedFuture(
@@ -584,7 +626,35 @@ public abstract class AbstractTopic implements Topic {
// decrement usage only if this was a valid producer close
long newCount = USAGE_COUNT_UPDATER.decrementAndGet(this);
if (newCount == 0) {
- hasExclusiveProducer = false;
+ lock.writeLock().lock();
+ try {
+ hasExclusiveProducer = false;
+ Pair<Producer, CompletableFuture<Optional<Long>>> nextWaitingProducer =
+ waitingExclusiveProducers.poll();
+ if (nextWaitingProducer != null) {
+ Producer nextProducer = nextWaitingProducer.getKey();
+ CompletableFuture<Optional<Long>> producerFuture = nextWaitingProducer.getValue();
+ hasExclusiveProducer = true;
+
+ CompletableFuture<Long> future;
+ if (nextProducer.getTopicEpoch().isPresent()) {
+ future = setTopicEpoch(nextProducer.getTopicEpoch().get());
+ } else {
+ future = incrementTopicEpoch(topicEpoch);
+ }
+
+ future.thenAccept(epoch -> {
+ topicEpoch = Optional.of(epoch);
+ producerFuture.complete(topicEpoch);
+ }).exceptionally(ex -> {
+ hasExclusiveProducer = false;
+ producerFuture.completeExceptionally(ex);
+ return null;
+ });
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
@@ -733,6 +803,10 @@ public abstract class AbstractTopic implements Topic {
}
}
+ protected int getWaitingProducersCount() {
+ return waitingExclusiveProducers.size();
+ }
+
protected boolean isExceedMaximumMessageSize(int size) {
Integer maxMessageSize = null;
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index fd841ad..cece56b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -39,7 +39,8 @@ public interface PulsarCommandSender {
void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion);
void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
- SchemaVersion schemaVersion, Optional<Long> topicEpoch);
+ SchemaVersion schemaVersion, Optional<Long> topicEpoch,
+ boolean isProducerReady);
void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId,
long entryId);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 2d199be..5d812c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -98,9 +98,10 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
@Override
public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
- SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
+ SchemaVersion schemaVersion, Optional<Long> topicEpoch,
+ boolean isProducerReady) {
PulsarApi.BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId,
- schemaVersion, topicEpoch);
+ schemaVersion, topicEpoch, isProducerReady);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
command.getProducerSuccess().recycle();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cd73845..9b69244 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1152,17 +1152,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
});
schemaVersionFuture.thenAccept(schemaVersion -> {
+ CompletableFuture<Void> producerQueuedFuture = new CompletableFuture<>();
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName,
getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
userProvidedProducerName, producerAccessMode, topicEpoch);
- topic.addProducer(producer).thenAccept(newTopicEpoch -> {
+ topic.addProducer(producer, producerQueuedFuture).thenAccept(newTopicEpoch -> {
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
- newTopicEpoch);
+ newTopicEpoch, true /* producer is ready now */);
return;
} else {
// The producer's future was completed before by
@@ -1192,6 +1193,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
return null;
});
+
+ producerQueuedFuture.thenRun(() -> {
+ // If the producer is queued waiting, we will get an immediate notification
+ // that we need to pass to client
+ if (isActive()) {
+ log.info("[{}] Producer is waiting in queue: {}", remoteAddress, producer);
+ commandSender.sendProducerSuccessResponse(requestId, producerName,
+ producer.getLastSequenceId(), producer.getSchemaVersion(),
+ Optional.empty(), false/* producer is not ready now */);
+ }
+ });
});
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 9e50fcf..af88ce8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -98,9 +98,11 @@ public interface Topic {
* Tries to add a producer to the topic. Several validations will be performed.
*
* @param producer
+ * @param producerQueuedFuture
+ * a future that will be triggered if the producer is being queued up prior of getting established
* @return the "topic epoch" if there is one or empty
*/
- CompletableFuture<Optional<Long>> addProducer(Producer producer);
+ CompletableFuture<Optional<Long>> addProducer(Producer producer, CompletableFuture<Void> producerQueuedFuture);
void removeProducer(Producer producer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 004e656..ac49a96 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -761,6 +761,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
+ stats.waitingPublishers = getWaitingProducersCount();
subscriptions.forEach((name, subscription) -> {
NonPersistentSubscriptionStats subStats = subscription.getStats();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 938dde5..676f98c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -496,13 +496,14 @@ public class PersistentTopic extends AbstractTopic
}
@Override
- public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
- return super.addProducer(producer).thenApply(epoch -> {
+ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
+ CompletableFuture<Void> producerQueuedFuture) {
+ return super.addProducer(producer, producerQueuedFuture).thenApply(topicEpoch -> {
messageDeduplication.producerAdded(producer.getProducerName());
// Start replication producers if not already
startReplProducers();
- return epoch;
+ return topicEpoch;
});
}
@@ -1642,6 +1643,7 @@ public class PersistentTopic extends AbstractTopic
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.msgChunkPublished = this.msgChunkPublished;
+ stats.waitingPublishers = getWaitingProducersCount();
subscriptions.forEach((name, subscription) -> {
SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 0ab3b70..75340c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -18,15 +18,18 @@
*/
package org.apache.pulsar.broker.service;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.fail;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
import org.apache.pulsar.client.api.Schema;
@@ -59,9 +62,15 @@ public class ExclusiveProducerTest extends BrokerTestBase {
};
}
- @DataProvider(name = "partitioned")
- public static Object[][] partitioned() {
- return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ @DataProvider(name = "accessMode")
+ public static Object[][] accessMode() {
+ return new Object[][] {
+ // ProducerAccessMode, partitioned
+ { ProducerAccessMode.Exclusive, Boolean.TRUE},
+ { ProducerAccessMode.Exclusive, Boolean.FALSE },
+ { ProducerAccessMode.WaitForExclusive, Boolean.TRUE },
+ { ProducerAccessMode.WaitForExclusive, Boolean.FALSE },
+ };
}
@Test(dataProvider = "topics")
@@ -124,13 +133,13 @@ public class ExclusiveProducerTest extends BrokerTestBase {
}
}
- @Test(dataProvider = "topics")
- public void producerReconnection(String type, boolean partitioned) throws Exception {
- String topic = newTopic(type, partitioned);
+ @Test(dataProvider = "accessMode")
+ public void producerReconnection(ProducerAccessMode accessMode, boolean partitioned) throws Exception {
+ String topic = newTopic("persistent", partitioned);
Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
- .accessMode(ProducerAccessMode.Exclusive)
+ .accessMode(accessMode)
.create();
p1.send("msg-1");
@@ -140,13 +149,13 @@ public class ExclusiveProducerTest extends BrokerTestBase {
p1.send("msg-2");
}
- @Test(dataProvider = "partitioned")
- public void producerFenced(boolean partitioned) throws Exception {
+ @Test(dataProvider = "accessMode")
+ public void producerFenced(ProducerAccessMode accessMode, boolean partitioned) throws Exception {
String topic = newTopic("persistent", partitioned);
Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
- .accessMode(ProducerAccessMode.Exclusive)
+ .accessMode(accessMode)
.create();
p1.send("msg-1");
@@ -200,6 +209,84 @@ public class ExclusiveProducerTest extends BrokerTestBase {
p1.send("msg-2");
}
+ @Test(dataProvider = "topics")
+ public void waitForExclusiveTest(String type, boolean partitioned) throws Exception {
+ String topic = newTopic(type, partitioned);
+
+ Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .producerName("p1")
+ .accessMode(ProducerAccessMode.WaitForExclusive)
+ .create();
+
+ CompletableFuture<Producer<String>> fp2 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .producerName("p2")
+ .accessMode(ProducerAccessMode.WaitForExclusive)
+ .createAsync();
+
+ // This sleep is made to ensure P2 is enqueued before P3. Because of the lookups
+ // there's no strict guarantee they would be attempted in the same order otherwise
+ Thread.sleep(1000);
+
+ CompletableFuture<Producer<String>> fp3 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .producerName("p3")
+ .accessMode(ProducerAccessMode.WaitForExclusive)
+ .createAsync();
+
+ Thread.sleep(1000);
+ // The second producer should get queued
+ assertFalse(fp2.isDone());
+ assertFalse(fp3.isDone());
+
+ p1.close();
+
+ // Now P2 should get created
+ Producer<String> p2 = fp2.get(1, TimeUnit.SECONDS);
+
+ assertFalse(fp3.isDone());
+
+ p2.close();
+
+ // Now P3 should get created
+ Producer<String> p3 = fp3.get(1, TimeUnit.SECONDS);
+ p3.close();
+ }
+
+ @Test(dataProvider = "topics")
+ public void waitForExclusiveWithClientTimeout(String type, boolean partitioned) throws Exception {
+ String topic = newTopic(type, partitioned);
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .operationTimeout(1, TimeUnit.SECONDS)
+ .build();
+
+ Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .accessMode(ProducerAccessMode.WaitForExclusive)
+ .create();
+
+ CompletableFuture<Producer<String>> fp2 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .accessMode(ProducerAccessMode.WaitForExclusive)
+ .createAsync();
+
+ // Wait enough time to have caused an operation timeout on p2
+ Thread.sleep(2000);
+
+ // There should be timeout error, since the broker should reply immediately
+ // with the instruction to wait
+ assertFalse(fp2.isDone());
+
+ p1.close();
+
+ // Now P2 should get created
+ fp2.get(1, TimeUnit.SECONDS);
+ }
+
private String newTopic(String type, boolean isPartitioned) throws Exception {
String topic = type + "://" + newTopicName();
if (isPartitioned) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 6117c8f..5ca7123 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -374,12 +374,12 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer);
+ topic.addProducer(producer, new CompletableFuture<>());
assertEquals(topic.getProducers().size(), 1);
// 2. duplicate add
try {
- topic.addProducer(producer).join();
+ topic.addProducer(producer, new CompletableFuture<>()).join();
fail("Should have failed with naming exception because producer 'null' is already connected to the topic");
} catch (Exception e) {
assertEquals(e.getCause().getClass(), BrokerServiceException.NamingException.class);
@@ -392,7 +392,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
role, false, null, SchemaVersion.Latest,0, false,
ProducerAccessMode.Shared, Optional.empty());
try {
- topic.addProducer(failProducer);
+ topic.addProducer(failProducer, new CompletableFuture<>());
fail("should have failed");
} catch (IllegalArgumentException e) {
// OK
@@ -415,8 +415,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, true, ProducerAccessMode.Shared, Optional.empty());
try {
- topic.addProducer(producer1).join();
- topic.addProducer(producer2).join();
+ topic.addProducer(producer1, new CompletableFuture<>()).join();
+ topic.addProducer(producer2, new CompletableFuture<>()).join();
fail("should have failed");
} catch (Exception e) {
// OK
@@ -429,7 +429,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
role, false, null, SchemaVersion.Latest, 1, false, ProducerAccessMode.Shared, Optional.empty());
try {
- topic.addProducer(producer3).join();
+ topic.addProducer(producer3, new CompletableFuture<>()).join();
fail("should have failed");
} catch (Exception e) {
// OK
@@ -444,8 +444,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
Producer producer4 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 2, false, ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer3);
- topic.addProducer(producer4);
+ topic.addProducer(producer3, new CompletableFuture<>());
+ topic.addProducer(producer4, new CompletableFuture<>());
Assert.assertEquals(topic.getProducers().size(), 1);
@@ -457,13 +457,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
Producer producer5 = new Producer(topic, serverCnx, 2 /* producer id */, "pulsar.repl.cluster1",
role, false, null, SchemaVersion.Latest, 1, false, ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer5);
+ topic.addProducer(producer5, new CompletableFuture<>());
Assert.assertEquals(topic.getProducers().size(), 1);
Producer producer6 = new Producer(topic, serverCnx, 2 /* producer id */, "pulsar.repl.cluster1",
role, false, null, SchemaVersion.Latest, 2, false, ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer6);
+ topic.addProducer(producer6, new CompletableFuture<>());
Assert.assertEquals(topic.getProducers().size(), 1);
topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 2));
@@ -471,7 +471,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
Producer producer7 = new Producer(topic, serverCnx, 2 /* producer id */, "pulsar.repl.cluster1",
role, false, null, SchemaVersion.Latest, 3, true, ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer7);
+ topic.addProducer(producer7, new CompletableFuture<>());
Assert.assertEquals(topic.getProducers().size(), 1);
topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 3));
}
@@ -482,20 +482,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
// 1. add producer1
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role,
false, null, SchemaVersion.Latest,0, false, ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer);
+ topic.addProducer(producer, new CompletableFuture<>());
assertEquals(topic.getProducers().size(), 1);
// 2. add producer2
Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role,
false, null, SchemaVersion.Latest,0, false, ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer2);
+ topic.addProducer(producer2, new CompletableFuture<>());
assertEquals(topic.getProducers().size(), 2);
// 3. add producer3 but reached maxProducersPerTopic
try {
Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role,
false, null, SchemaVersion.Latest,0, false, ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer3).join();
+ topic.addProducer(producer3, new CompletableFuture<>()).join();
fail("should have failed");
} catch (Exception e) {
assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
@@ -953,7 +953,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer).join();
+ topic.addProducer(producer, new CompletableFuture<>()).join();
assertTrue(topic.delete().isCompletedExceptionally());
assertFalse((boolean) isFencedField.get(topic));
@@ -1116,7 +1116,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
Thread.sleep(10); /* delay to ensure that the delete gets executed first */
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty());
- topic.addProducer(producer).join();
+ topic.addProducer(producer, new CompletableFuture<>()).join();
fail("Should have failed");
} catch (Exception e) {
assertEquals(e.getCause().getClass(), BrokerServiceException.TopicFencedException.class);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
index 1a69da8..85199e7 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
@@ -33,9 +33,8 @@ public enum ProducerAccessMode {
*/
Exclusive,
-// TODO
-// /**
-// * Producer creation is pending until it can acquire exclusive access.
-// */
-// WaitForExclusive,
+ /**
+ * Producer creation is pending until it can acquire exclusive access.
+ */
+ WaitForExclusive,
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 3c66065..4f5b3ed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -91,6 +91,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,10 +101,10 @@ public class ClientCnx extends PulsarHandler {
protected final Authentication authentication;
private State state;
- private final ConcurrentLongHashMap<CompletableFuture<? extends Object>> pendingRequests =
+ private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
new ConcurrentLongHashMap<>(16, 1);
// LookupRequests that waiting in client side.
- private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
+ private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
@@ -468,6 +469,19 @@ public class ClientCnx extends PulsarHandler {
success.getRequestId(), success.getProducerName());
}
long requestId = success.getRequestId();
+ if (!success.getProducerReady()) {
+ // We got a success operation but the producer is not ready. This means that the producer has been queued up
+ // in broker. We need to leave the future pending until we get the final confirmation. We just mark that
+ // we have received a response, in order to avoid the timeout.
+ TimedCompletableFuture<?> requestFuture = (TimedCompletableFuture<?>) pendingRequests.get(requestId);
+ if (requestFuture != null) {
+ log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(),
+ success.getProducerName(), requestId);
+ requestFuture.markAsResponded();
+ }
+ return;
+ }
+
CompletableFuture<ProducerResponse> requestFuture = (CompletableFuture<ProducerResponse>) pendingRequests.remove(requestId);
if (requestFuture != null) {
ProducerResponse pr = new ProducerResponse(success.getProducerName(),
@@ -564,7 +578,7 @@ public class ClientCnx extends PulsarHandler {
}
// caller of this method needs to be protected under pendingLookupRequestSemaphore
- private void addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
+ private void addPendingLookupRequests(long requestId, TimedCompletableFuture<LookupDataResult> future) {
pendingRequests.put(requestId, future);
eventLoopGroup.schedule(() -> {
if (!future.isDone()) {
@@ -577,13 +591,13 @@ public class ClientCnx extends PulsarHandler {
private CompletableFuture<LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
CompletableFuture<LookupDataResult> result = (CompletableFuture<LookupDataResult>) pendingRequests.remove(requestId);
if (result != null) {
- Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>> firstOneWaiting = waitingLookupRequests.poll();
+ Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>> firstOneWaiting = waitingLookupRequests.poll();
if (firstOneWaiting != null) {
maxLookupRequestSemaphore.release();
// schedule a new lookup in.
eventLoopGroup.submit(() -> {
long newId = firstOneWaiting.getLeft();
- CompletableFuture<LookupDataResult> newFuture = firstOneWaiting.getRight().getRight();
+ TimedCompletableFuture<LookupDataResult> newFuture = firstOneWaiting.getRight().getRight();
addPendingLookupRequests(newId, newFuture);
ctx.writeAndFlush(firstOneWaiting.getRight().getLeft()).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
@@ -680,7 +694,7 @@ public class ClientCnx extends PulsarHandler {
}
public CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long requestId) {
- CompletableFuture<LookupDataResult> future = new CompletableFuture<>();
+ TimedCompletableFuture<LookupDataResult> future = new TimedCompletableFuture<>();
if (pendingLookupRequestSemaphore.tryAcquire()) {
addPendingLookupRequests(requestId, future);
@@ -789,7 +803,7 @@ public class ClientCnx extends PulsarHandler {
}
private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
- CompletableFuture<T> future = new CompletableFuture<>();
+ TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
pendingRequests.put(requestId, future);
ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
@@ -1064,8 +1078,10 @@ public class ClientCnx extends PulsarHandler {
break;
}
request = requestTimeoutQueue.poll();
- CompletableFuture<?> requestFuture = pendingRequests.remove(request.requestId);
- if (requestFuture != null && !requestFuture.isDone()) {
+ TimedCompletableFuture<?> requestFuture = pendingRequests.remove(request.requestId);
+ if (requestFuture != null
+ && !requestFuture.isDone()
+ && !requestFuture.hasGotResponse()) {
String timeoutMessage = String.format("%d %s timedout after ms %d", request.requestId, request.requestType.getDescription(), operationTimeoutMs);
if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) {
log.warn("{} {}", ctx.channel(), timeoutMessage);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TimedCompletableFuture.java
similarity index 62%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/util/TimedCompletableFuture.java
index 1a69da8..30c1eb3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TimedCompletableFuture.java
@@ -16,26 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pulsar.client.util;
-package org.apache.pulsar.client.api;
+import java.util.concurrent.CompletableFuture;
-/**
- * The type of access to the topic that the producer requires.
- */
-public enum ProducerAccessMode {
- /**
- * By default multiple producers can publish on a topic.
- */
- Shared,
+public class TimedCompletableFuture<T> extends CompletableFuture<T> {
+
+ private volatile boolean hasGotResponse = false;
- /**
- * Require exclusive access for producer. Fail immediately if there's already a producer connected.
- */
- Exclusive,
+ public void markAsResponded() {
+ this.hasGotResponse = true;
+ }
-// TODO
-// /**
-// * Producer creation is pending until it can acquire exclusive access.
-// */
-// WaitForExclusive,
+ public boolean hasGotResponse() {
+ return this.hasGotResponse;
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 3d5d422..aa9f2c1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -25116,6 +25116,10 @@ public final class PulsarApi {
// optional uint64 topic_epoch = 5;
boolean hasTopicEpoch();
long getTopicEpoch();
+
+ // optional bool producer_ready = 6 [default = true];
+ boolean hasProducerReady();
+ boolean getProducerReady();
}
public static final class CommandProducerSuccess extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -25224,12 +25228,23 @@ public final class PulsarApi {
return topicEpoch_;
}
+ // optional bool producer_ready = 6 [default = true];
+ public static final int PRODUCER_READY_FIELD_NUMBER = 6;
+ private boolean producerReady_;
+ public boolean hasProducerReady() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public boolean getProducerReady() {
+ return producerReady_;
+ }
+
private void initFields() {
requestId_ = 0L;
producerName_ = "";
lastSequenceId_ = -1L;
schemaVersion_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
topicEpoch_ = 0L;
+ producerReady_ = true;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -25271,6 +25286,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeUInt64(5, topicEpoch_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBool(6, producerReady_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -25299,6 +25317,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeUInt64Size(5, topicEpoch_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBoolSize(6, producerReady_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -25422,6 +25444,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000008);
topicEpoch_ = 0L;
bitField0_ = (bitField0_ & ~0x00000010);
+ producerReady_ = true;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -25475,6 +25499,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000010;
}
result.topicEpoch_ = topicEpoch_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.producerReady_ = producerReady_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -25496,6 +25524,9 @@ public final class PulsarApi {
if (other.hasTopicEpoch()) {
setTopicEpoch(other.getTopicEpoch());
}
+ if (other.hasProducerReady()) {
+ setProducerReady(other.getProducerReady());
+ }
return this;
}
@@ -25558,6 +25589,11 @@ public final class PulsarApi {
topicEpoch_ = input.readUInt64();
break;
}
+ case 48: {
+ bitField0_ |= 0x00000020;
+ producerReady_ = input.readBool();
+ break;
+ }
}
}
}
@@ -25687,6 +25723,27 @@ public final class PulsarApi {
return this;
}
+ // optional bool producer_ready = 6 [default = true];
+ private boolean producerReady_ = true;
+ public boolean hasProducerReady() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public boolean getProducerReady() {
+ return producerReady_;
+ }
+ public Builder setProducerReady(boolean value) {
+ bitField0_ |= 0x00000020;
+ producerReady_ = value;
+
+ return this;
+ }
+ public Builder clearProducerReady() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ producerReady_ = true;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandProducerSuccess)
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index a287897..9633e0f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -69,6 +69,8 @@ public class TopicStats {
/** List of connected publishers on this topic w/ their stats. */
public List<PublisherStats> publishers;
+ public int waitingPublishers;
+
/** Map of subscriptions with their individual statistics. */
public Map<String, SubscriptionStats> subscriptions;
@@ -107,6 +109,7 @@ public class TopicStats {
this.msgOutCounter = 0;
this.publishers.clear();
this.subscriptions.clear();
+ this.waitingPublishers = 0;
this.replication.clear();
this.deduplicationStatus = null;
this.topicEpoch = null;
@@ -127,6 +130,7 @@ public class TopicStats {
this.msgInCounter += stats.msgInCounter;
this.bytesOutCounter += stats.bytesOutCounter;
this.msgOutCounter += stats.msgOutCounter;
+ this.waitingPublishers += stats.waitingPublishers;
double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count;
this.averageMsgSize = newAverageMsgSize;
this.storageSize += stats.storageSize;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 4e8c058..7ec9132 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -364,20 +364,22 @@ public class Commands {
public static BaseCommand newProducerSuccessCommand(long requestId, String producerName,
SchemaVersion schemaVersion) {
- return newProducerSuccessCommand(requestId, producerName, -1, schemaVersion, Optional.empty());
+ return newProducerSuccessCommand(requestId, producerName, -1, schemaVersion, Optional.empty(), true);
}
public static ByteBuf newProducerSuccess(long requestId, String producerName, SchemaVersion schemaVersion) {
- return newProducerSuccess(requestId, producerName, -1, schemaVersion, Optional.empty());
+ return newProducerSuccess(requestId, producerName, -1, schemaVersion, Optional.empty(), true);
}
public static BaseCommand newProducerSuccessCommand(long requestId, String producerName, long lastSequenceId,
- SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
+ SchemaVersion schemaVersion, Optional<Long> topicEpoch,
+ boolean isProducerReady) {
CommandProducerSuccess.Builder producerSuccessBuilder = CommandProducerSuccess.newBuilder();
producerSuccessBuilder.setRequestId(requestId);
producerSuccessBuilder.setProducerName(producerName);
producerSuccessBuilder.setLastSequenceId(lastSequenceId);
producerSuccessBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.bytes()));
+ producerSuccessBuilder.setProducerReady(isProducerReady);
topicEpoch.ifPresent(producerSuccessBuilder::setTopicEpoch);
CommandProducerSuccess producerSuccess = producerSuccessBuilder.build();
BaseCommand.Builder builder = BaseCommand.newBuilder();
@@ -388,12 +390,14 @@ public class Commands {
}
public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId,
- SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
+ SchemaVersion schemaVersion, Optional<Long> topicEpoch,
+ boolean isProducerReady) {
CommandProducerSuccess.Builder producerSuccessBuilder = CommandProducerSuccess.newBuilder();
producerSuccessBuilder.setRequestId(requestId);
producerSuccessBuilder.setProducerName(producerName);
producerSuccessBuilder.setLastSequenceId(lastSequenceId);
producerSuccessBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.bytes()));
+ producerSuccessBuilder.setProducerReady(isProducerReady);
topicEpoch.ifPresent(producerSuccessBuilder::setTopicEpoch);
CommandProducerSuccess producerSuccess = producerSuccessBuilder.build();
ByteBuf res = serializeWithSize(
@@ -2251,8 +2255,8 @@ public class Commands {
return PulsarApi.ProducerAccessMode.Exclusive;
case Shared:
return PulsarApi.ProducerAccessMode.Shared;
-// case WaitForExclusive:
-// return PulsarApi.ProducerAccessMode.WaitForExclusive;
+ case WaitForExclusive:
+ return PulsarApi.ProducerAccessMode.WaitForExclusive;
default:
throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
}
@@ -2264,8 +2268,8 @@ public class Commands {
return ProducerAccessMode.Exclusive;
case Shared:
return ProducerAccessMode.Shared;
-// case WaitForExclusive:
-// return ProducerAccessMode.WaitForExclusive;
+ case WaitForExclusive:
+ return ProducerAccessMode.WaitForExclusive;
default:
throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index bc5f902..b998828 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -622,6 +622,11 @@ message CommandProducerSuccess {
// The topic epoch assigned by the broker. This field will only be set if we
// were requiring exclusive access when creating the producer.
optional uint64 topic_epoch = 5;
+
+ // If producer is not "ready", the client will avoid to timeout the request
+ // for creating the producer. Instead it will wait indefinitely until it gets
+ // a subsequent `CommandProducerSuccess` with `producer_ready==true`.
+ optional bool producer_ready = 6 [default = true];
}
message CommandError {