You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/09/27 15:42:18 UTC
[pulsar] branch master updated: Allow for topic deletions with
regex consumers (#5230)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 07bb845 Allow for topic deletions with regex consumers (#5230)
07bb845 is described below
commit 07bb84532f3da80d1717c16f377fade16b8c4360
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Sep 27 08:42:11 2019 -0700
Allow for topic deletions with regex consumers (#5230)
* Allow for topic deletions with regex consumers
* Fixed test compilation
* One more compile fix
* Fixed BrokerServiceAutoTopicCreationTest
---
.../broker/service/BrokerServiceException.java | 8 ++
.../apache/pulsar/broker/service/ServerCnx.java | 19 +++-
.../apache/pulsar/client/impl/RawReaderImpl.java | 3 +-
.../BrokerServiceAutoTopicCreationTest.java | 3 +
.../client/impl/PatternTopicsConsumerImplTest.java | 49 ++++++++++
.../pulsar/client/impl/TopicsConsumerImplTest.java | 2 +-
.../pulsar/client/api/PulsarClientException.java | 16 ++++
.../org/apache/pulsar/client/cli/CmdConsume.java | 44 ++++++---
.../org/apache/pulsar/client/impl/ClientCnx.java | 2 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 46 +++++++---
.../client/impl/MultiTopicsConsumerImpl.java | 102 ++++++++++++++++-----
.../impl/PatternMultiTopicsConsumerImpl.java | 7 +-
.../pulsar/client/impl/PulsarClientImpl.java | 7 +-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 6 +-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 13 +--
.../pulsar/client/impl/ConsumerImplTest.java | 4 +-
.../client/impl/MultiTopicsConsumerImplTest.java | 2 +-
.../apache/pulsar/common/api/proto/PulsarApi.java | 57 ++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 8 +-
pulsar-common/src/main/proto/PulsarApi.proto | 9 +-
20 files changed, 326 insertions(+), 81 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 2e28700..3a7c542 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -116,6 +116,12 @@ public class BrokerServiceException extends Exception {
}
}
+ public static class TopicNotFoundException extends BrokerServiceException {
+ public TopicNotFoundException(String msg) {
+ super(msg);
+ }
+ }
+
public static class SubscriptionBusyException extends BrokerServiceException {
public SubscriptionBusyException(String msg) {
super(msg);
@@ -180,6 +186,8 @@ public class BrokerServiceException extends Exception {
} else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
|| t instanceof SubscriptionFencedException) {
return PulsarApi.ServerError.ServiceNotReady;
+ } else if (t instanceof TopicNotFoundException) {
+ return PulsarApi.ServerError.TopicNotFound;
} else if (t instanceof IncompatibleSchemaException
|| t instanceof InvalidSchemaDataException) {
// for backward compatible with old clients, invalid schema data
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1cab8e6..7a3a3d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
@@ -611,6 +612,7 @@ public class ServerCnx extends PulsarHandler {
final InitialPosition initialPosition = subscribe.getInitialPosition();
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
+ final boolean forceTopicCreation = subscribe.getForceTopicCreation();
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
@@ -676,8 +678,18 @@ public class ServerCnx extends PulsarHandler {
}
}
- service.getOrCreateTopic(topicName.toString())
- .thenCompose(topic -> {
+ boolean createTopicIfDoesNotExist = forceTopicCreation
+ && service.pulsar().getConfig().isAllowAutoTopicCreation();
+
+ service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
+ .thenCompose(optTopic -> {
+ if (!optTopic.isPresent()) {
+ return FutureUtil
+ .failedFuture(new TopicNotFoundException("Topic does not exist"));
+ }
+
+ Topic topic = optTopic.get();
+
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(isCompatible -> {
@@ -728,6 +740,9 @@ public class ServerCnx extends PulsarHandler {
remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
}
+ } else if (exception.getCause() instanceof BrokerServiceException) {
+ log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
+ subscriptionName, exception.getCause().getMessage());
} else {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage(), exception);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index a59349d..2f9de09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -112,7 +112,8 @@ public class RawReaderImpl implements RawReader {
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
- Schema.BYTES, null
+ Schema.BYTES, null,
+ true
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index cd1aec9..929f9c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -22,6 +22,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -77,6 +79,7 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
final String subscriptionName = "test-topic-sub";
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ fail("Subscribe operation should have failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 18cf12c..7413825 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -760,4 +762,51 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
producer2.close();
producer3.close();
}
+
+ @Test()
+ public void testTopicDeletion() throws Exception {
+ String baseTopicName = "persistent://my-property/my-ns/pattern-topic-" + System.currentTimeMillis();
+ Pattern pattern = Pattern.compile(baseTopicName + ".*");
+
+ // Create 2 topics
+ Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+ .topic(baseTopicName + "-1")
+ .create();
+ Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+ .topic(baseTopicName + "-2")
+ .create();
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topicsPattern(pattern)
+ .patternAutoDiscoveryPeriod(1)
+ .subscriptionName("sub")
+ .subscribe();
+
+ assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
+ PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;
+
+ // 4. verify consumer get methods
+ assertSame(consumerImpl.getPattern(), pattern);
+ assertEquals(consumerImpl.getTopics().size(), 2);
+
+ producer1.send("msg-1");
+
+ producer1.close();
+
+ Message<String> message = consumer.receive();
+ assertEquals(message.getValue(), "msg-1");
+ consumer.acknowledge(message);
+
+ // Force delete the topic while the regex consumer is connected
+ admin.topics().delete(baseTopicName + "-1", true);
+
+ producer2.send("msg-2");
+
+ message = consumer.receive();
+ assertEquals(message.getValue(), "msg-2");
+ consumer.acknowledge(message);
+
+ assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty());
+ assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent());
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index f2eaca9..bb70d7f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -528,7 +528,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 2);
// 8. re-subscribe topic3
- CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
+ CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3, true);
subFuture.get();
// 9. producer publish messages
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index b125458..9f9c38f 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -180,6 +180,22 @@ public class PulsarClientException extends IOException {
}
/**
+ * Topic does not exist and cannot be created.
+ */
+ public static class TopicDoesNotExistException extends PulsarClientException {
+ /**
+ * Constructs an {@code TopicDoesNotExistException} with the specified detail message.
+ *
+ * @param msg
+ * The detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method)
+ */
+ public TopicDoesNotExistException(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
* Lookup exception thrown by Pulsar client.
*/
public static class LookupException extends PulsarClientException {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 4f86556..cb3463d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -18,6 +18,14 @@
*/
package org.apache.pulsar.client.cli;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
@@ -29,6 +37,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import org.apache.commons.io.HexDump;
import org.apache.commons.lang3.StringUtils;
@@ -36,6 +45,7 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -54,14 +64,6 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.beust.jcommander.Parameters;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
/**
* pulsar-client consume command implementation.
*
@@ -91,7 +93,10 @@ public class CmdConsume {
@Parameter(names = { "-r", "--rate" }, description = "Rate (in msg/sec) at which to consume, "
+ "value 0 means to consume messages as fast as possible.")
private double consumeRate = 0;
-
+
+ @Parameter(names = { "--regex" }, description = "Indicate thetopic name is a regex pattern")
+ private boolean isRegex = false;
+
private ClientBuilder clientBuilder;
private Authentication authentication;
private String serviceURL;
@@ -144,7 +149,7 @@ public class CmdConsume {
throw (new ParameterException("Number of messages should be zero or positive."));
String topic = this.mainOptions.get(0);
-
+
if(this.serviceURL.startsWith("ws")) {
return consumeFromWebSocket(topic);
}else {
@@ -158,8 +163,17 @@ public class CmdConsume {
try {
PulsarClient client = clientBuilder.build();
- Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(this.subscriptionName)
- .subscriptionType(subscriptionType).subscribe();
+ ConsumerBuilder<byte[]> builder = client.newConsumer()
+ .subscriptionName(this.subscriptionName)
+ .subscriptionType(subscriptionType);
+
+ if (isRegex) {
+ builder.topicsPattern(Pattern.compile(topic));
+ } else {
+ builder.topic(topic);
+ }
+
+ Consumer<byte[]> consumer = builder.subscribe();
RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
@@ -197,13 +211,13 @@ public class CmdConsume {
int returnCode = 0;
TopicName topicName = TopicName.get(topic);
-
+
String wsTopic = String.format(
"%s/%s/" + (StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster() + "/")
+ "%s/%s/%s?subscriptionType=%s",
topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName(),
subscriptionName, subscriptionType.toString());
-
+
String consumerBaseUri = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/consumer/" + wsTopic;
URI consumerUri = URI.create(consumerBaseUri);
@@ -252,7 +266,7 @@ public class CmdConsume {
LOG.debug("No message to consume after waiting for 5 seconds.");
} else {
try {
- System.out.println(Base64.getDecoder().decode(msg));
+ System.out.println(Base64.getDecoder().decode(msg));
}catch(Exception e) {
System.out.println(msg);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 32f73c9..c5e0d9d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -911,6 +911,8 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException.TopicTerminatedException(errorMsg);
case IncompatibleSchema:
return new PulsarClientException.IncompatibleSchemaException(errorMsg);
+ case TopicNotFound:
+ return new PulsarClientException.TopicDoesNotExistException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b13e2fa..7a1dc1f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.protocol.Commands;
@@ -141,6 +142,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected volatile boolean paused;
+ private final boolean createTopicIfDoesNotExist;
+
enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
@@ -151,20 +154,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+ ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
+ SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
+ boolean createTopicIfDoesNotExist) {
if (conf.getReceiverQueueSize() == 0) {
- return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
- subscriptionMode, startMessageId, schema, interceptors);
+ return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
+ subscribeFuture,
+ subscriptionMode, startMessageId, schema, interceptors,
+ createTopicIfDoesNotExist);
} else {
- return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
- subscriptionMode, startMessageId, schema, interceptors);
+ return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
+ subscribeFuture,
+ subscriptionMode, startMessageId, schema, interceptors, createTopicIfDoesNotExist);
}
}
protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+ ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
+ SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
+ boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
@@ -179,6 +187,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
this.resetIncludeHead = conf.isResetIncludeHead();
+ this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -510,7 +519,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.isReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
- si);
+ si, createTopicIfDoesNotExist);
if (startMessageIdData != null) {
startMessageIdData.recycle();
}
@@ -548,18 +557,27 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return null;
}
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress());
- if (e.getCause() instanceof PulsarClientException && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
+
+ if (e.getCause() instanceof PulsarClientException
+ && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
&& System.currentTimeMillis() < subscribeTimeout) {
reconnectLater(e.getCause());
- return null;
- }
-
- if (!subscribeFuture.isDone()) {
+ } else if (!subscribeFuture.isDone()) {
// unable to create new consumer, fail operation
setState(State.Failed);
closeConsumerTasks();
subscribeFuture.completeExceptionally(e);
client.cleanupConsumer(this);
+ } else if (e.getCause() instanceof TopicDoesNotExistException) {
+ // The topic was deleted after the consumer was created, and we're
+ // not allowed to recreate the topic. This can happen in few cases:
+ // * Regex consumer getting error after topic gets deleted
+ // * Regular consumer after topic is manually delete and with
+ // auto-topic-creation set to false
+ // No more retries are needed in this case.
+ setState(State.Failed);
+ client.cleanupConsumer(this);
+ log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", topic, subscription, cnx.channel().remoteAddress());
} else {
// consumer was subscribed and connected but we got some error, keep trying
reconnectLater(e.getCause());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3986285..f950f16 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -45,6 +45,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
@@ -101,14 +103,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
- ConsumerInterceptors<T> interceptors) {
+ ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, listenerExecutor,
- subscribeFuture, schema, interceptors);
+ subscribeFuture, schema, interceptors, createTopicIfDoesNotExist);
}
MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
- ConsumerInterceptors<T> interceptors) {
+ ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture,
schema, interceptors);
@@ -152,7 +154,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
this.namespaceName = conf.getTopicNames().stream().findFirst()
.flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get();
- List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(this::subscribeAsync)
+ List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
.collect(Collectors.toList());
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
@@ -661,7 +663,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
// subscribe one more given topic
- public CompletableFuture<Void> subscribeAsync(String topicName) {
+ public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
if (!topicNameValid(topicName)) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topic name not valid"));
@@ -675,12 +677,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
client.getPartitionedTopicMetadata(topicName)
- .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions))
- .exceptionally(ex1 -> {
- log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
- subscribeResult.completeExceptionally(ex1);
- return null;
- });
+ .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions,
+ createTopicIfDoesNotExist))
+ .exceptionally(ex1 -> {
+ log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
+ subscribeResult.completeExceptionally(ex1);
+ return null;
+ });
return subscribeResult;
}
@@ -702,7 +705,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer> future = new CompletableFuture<>();
MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, listenerExecutor,
- future, schema, interceptors);
+ future, schema, interceptors, true /* createTopicIfDoesNotExist */);
future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
.thenRun(()-> subscribeFuture.complete(consumer))
@@ -728,22 +731,24 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
- subscribeTopicPartitions(subscribeResult, topicName, numberPartitions);
+ subscribeTopicPartitions(subscribeResult, topicName, numberPartitions, true /* createTopicIfDoesNotExist */);
return subscribeResult;
}
- private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
+ private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
+ boolean createIfDoesNotExist) {
client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> {
if (null == cause) {
- doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions);
+ doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions, createIfDoesNotExist);
} else {
subscribeResult.completeExceptionally(cause);
}
});
}
- private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
+ private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
+ boolean createIfDoesNotExist) {
if (log.isDebugEnabled()) {
log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
}
@@ -767,8 +772,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture,
- SubscriptionMode.Durable, null, schema, interceptors
- );
+ SubscriptionMode.Durable, null, schema, interceptors,
+ createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
@@ -780,8 +785,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
- schema, interceptors
- );
+ schema, interceptors,
+ createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
futureList = Collections.singletonList(subFuture);
@@ -913,6 +918,59 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
return unsubscribeFuture;
}
+ // Remove a consumer for a topic
+ public CompletableFuture<Void> removeConsumerAsync(String topicName) {
+ checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
+
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+ }
+
+ CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+ String topicPartName = TopicName.get(topicName).getPartitionedTopicName();
+
+
+ List<ConsumerImpl<T>> consumersToClose = consumers.values().stream()
+ .filter(consumer -> {
+ String consumerTopicName = consumer.getTopic();
+ if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName)) {
+ return true;
+ } else {
+ return false;
+ }
+ }).collect(Collectors.toList());
+
+ List<CompletableFuture<Void>> futureList = consumersToClose.stream()
+ .map(ConsumerImpl::closeAsync).collect(Collectors.toList());
+
+ FutureUtil.waitForAll(futureList)
+ .whenComplete((r, ex) -> {
+ if (ex == null) {
+ consumersToClose.forEach(consumer1 -> {
+ consumers.remove(consumer1.getTopic());
+ pausedConsumers.remove(consumer1);
+ allTopicPartitionsNumber.decrementAndGet();
+ });
+
+ topics.remove(topicName);
+ ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
+
+ unsubscribeFuture.complete(null);
+ log.info("[{}] [{}] [{}] Removed Topics Consumer, allTopicPartitionsNumber: {}",
+ topicName, subscription, consumerName, allTopicPartitionsNumber);
+ } else {
+ unsubscribeFuture.completeExceptionally(ex);
+ setState(State.Failed);
+ log.error("[{}] [{}] [{}] Could not remove Topics Consumer",
+ topicName, subscription, consumerName, ex.getCause());
+ }
+ });
+
+ return unsubscribeFuture;
+ }
+
+
// get topics name
public List<String> getTopics() {
return topics.keySet().stream().collect(Collectors.toList());
@@ -997,8 +1055,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
- partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors
- );
+ partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
+ true /* createTopicIfDoesNotExist */);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
if (log.isDebugEnabled()) {
log.debug("[{}] create consumer {} for partitionName: {}",
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 47e3236..0941a58 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -54,7 +54,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
ExecutorService listenerExecutor,
CompletableFuture<Consumer<T>> subscribeFuture,
Schema<T> schema, Mode subscriptionMode, ConsumerInterceptors<T> interceptors) {
- super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors);
+ super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors,
+ false /* createTopicIfDoesNotExist */);
this.topicsPattern = topicsPattern;
this.subscriptionMode = subscriptionMode;
@@ -129,7 +130,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
}
List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
- removedTopics.stream().forEach(topic -> futures.add(unsubscribeAsync(topic)));
+ removedTopics.stream().forEach(topic -> futures.add(removeConsumerAsync(topic)));
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> removeFuture.complete(null))
.exceptionally(ex -> {
@@ -150,7 +151,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
}
List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
- addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic)));
+ addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic, false /* createTopicIfDoesNotExist */)));
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> addFuture.complete(null))
.exceptionally(ex -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ea23f46..0728c66 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -350,8 +350,8 @@ public class PulsarClientImpl implements PulsarClient {
} else {
int partitionIndex = TopicName.getPartitionIndex(topic);
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
- consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors
- );
+ consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
+ true /* createTopicIfDoesNotExist */);
}
synchronized (consumers) {
@@ -370,7 +370,8 @@ public class PulsarClientImpl implements PulsarClient {
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
- externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors);
+ externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors,
+ true /* createTopicIfDoesNotExist */);
synchronized (consumers) {
consumers.put(consumer, Boolean.TRUE);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index fd9db1a..9788d84 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -83,10 +83,8 @@ public class ReaderImpl<T> implements Reader<T> {
final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
- partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null
-
- );
-
+ partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
+ true /* createTopicIfDoesNotExist */);
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 3b0b257..3d669e7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -49,17 +49,10 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
- ConsumerInterceptors<T> interceptors) {
- this(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
- schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
- }
-
- public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
- ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+ ConsumerInterceptors<T> interceptors,
+ boolean createTopicIfDoesNotExist) {
super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
- schema, interceptors);
+ schema, interceptors, createTopicIfDoesNotExist);
}
@Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 34112e5..6a96215 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -60,8 +60,8 @@ public class ConsumerImplTest {
consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
- executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null
- );
+ executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null,
+ true);
}
@Test(invocationTimeOut = 1000)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 816dc85..bc7c264 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -57,7 +57,7 @@ public class MultiTopicsConsumerImplTest {
MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
clientImpl, consumerConfData,
- listenerExecutor, null, null, null);
+ listenerExecutor, null, null, null, true);
impl.getStats();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index d6a5bc6..5bd50c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -8804,6 +8804,10 @@ public final class PulsarApi {
// optional bool replicate_subscription_state = 14;
boolean hasReplicateSubscriptionState();
boolean getReplicateSubscriptionState();
+
+ // optional bool force_topic_creation = 15 [default = true];
+ boolean hasForceTopicCreation();
+ boolean getForceTopicCreation();
}
public static final class CommandSubscribe extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -9145,6 +9149,16 @@ public final class PulsarApi {
return replicateSubscriptionState_;
}
+ // optional bool force_topic_creation = 15 [default = true];
+ public static final int FORCE_TOPIC_CREATION_FIELD_NUMBER = 15;
+ private boolean forceTopicCreation_;
+ public boolean hasForceTopicCreation() {
+ return ((bitField0_ & 0x00002000) == 0x00002000);
+ }
+ public boolean getForceTopicCreation() {
+ return forceTopicCreation_;
+ }
+
private void initFields() {
topic_ = "";
subscription_ = "";
@@ -9160,6 +9174,7 @@ public final class PulsarApi {
schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
replicateSubscriptionState_ = false;
+ forceTopicCreation_ = true;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -9258,6 +9273,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00001000) == 0x00001000)) {
output.writeBool(14, replicateSubscriptionState_);
}
+ if (((bitField0_ & 0x00002000) == 0x00002000)) {
+ output.writeBool(15, forceTopicCreation_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -9322,6 +9340,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeBoolSize(14, replicateSubscriptionState_);
}
+ if (((bitField0_ & 0x00002000) == 0x00002000)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBoolSize(15, forceTopicCreation_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -9463,6 +9485,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00001000);
replicateSubscriptionState_ = false;
bitField0_ = (bitField0_ & ~0x00002000);
+ forceTopicCreation_ = true;
+ bitField0_ = (bitField0_ & ~0x00004000);
return this;
}
@@ -9553,6 +9577,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00001000;
}
result.replicateSubscriptionState_ = replicateSubscriptionState_;
+ if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+ to_bitField0_ |= 0x00002000;
+ }
+ result.forceTopicCreation_ = forceTopicCreation_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -9608,6 +9636,9 @@ public final class PulsarApi {
if (other.hasReplicateSubscriptionState()) {
setReplicateSubscriptionState(other.getReplicateSubscriptionState());
}
+ if (other.hasForceTopicCreation()) {
+ setForceTopicCreation(other.getForceTopicCreation());
+ }
return this;
}
@@ -9764,6 +9795,11 @@ public final class PulsarApi {
replicateSubscriptionState_ = input.readBool();
break;
}
+ case 120: {
+ bitField0_ |= 0x00004000;
+ forceTopicCreation_ = input.readBool();
+ break;
+ }
}
}
}
@@ -10227,6 +10263,27 @@ public final class PulsarApi {
return this;
}
+ // optional bool force_topic_creation = 15 [default = true];
+ private boolean forceTopicCreation_ = true;
+ public boolean hasForceTopicCreation() {
+ return ((bitField0_ & 0x00004000) == 0x00004000);
+ }
+ public boolean getForceTopicCreation() {
+ return forceTopicCreation_;
+ }
+ public Builder setForceTopicCreation(boolean value) {
+ bitField0_ |= 0x00004000;
+ forceTopicCreation_ = value;
+
+ return this;
+ }
+ public Builder clearForceTopicCreation() {
+ bitField0_ = (bitField0_ & ~0x00004000);
+ forceTopicCreation_ = true;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2262225..e3ebde4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -467,13 +467,15 @@ public class Commands {
SubType subType, int priorityLevel, String consumerName) {
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false,
- false /* isReplicated */, InitialPosition.Earliest, null);
+ false /* isReplicated */, InitialPosition.Earliest, null,
+ true /* createTopicIfDoesNotExist */);
}
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
- InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
+ InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo,
+ boolean createTopicIfDoesNotExist) {
CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
subscribeBuilder.setTopic(topic);
subscribeBuilder.setSubscription(subscription);
@@ -486,6 +488,8 @@ public class Commands {
subscribeBuilder.setReadCompacted(readCompacted);
subscribeBuilder.setInitialPosition(subscriptionInitialPosition);
subscribeBuilder.setReplicateSubscriptionState(isReplicated);
+ subscribeBuilder.setForceTopicCreation(createTopicIfDoesNotExist);
+
if (startMessageId != null) {
subscribeBuilder.setStartMessageId(startMessageId);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 9cb510d..f4bcec1 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -268,7 +268,7 @@ message CommandSubscribe {
// Signal wether the subscription should be backed by a
// durable cursor or not
optional bool durable = 8 [default = true];
-
+
// If specified, the subscription will position the cursor
// markd-delete position on the particular message id and
// will send messages from that point
@@ -292,6 +292,13 @@ message CommandSubscribe {
// to periodically sync the state of replicated subscriptions
// across different clusters (when using geo-replication).
optional bool replicate_subscription_state = 14;
+
+ // If true, the subscribe operation will cause a topic to be
+ // created if it does not exist already (and if topic auto-creation
+ // is allowed by broker.
+ // If false, the subscribe operation will fail if the topic
+ // does not exist.
+ optional bool force_topic_creation = 15 [default = true];
}
message CommandPartitionedTopicMetadata {