You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/09/17 14:01:17 UTC
[pulsar] branch master updated: Add more config for
auto-topic-creation (#4963)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 547c421 Add more config for auto-topic-creation (#4963)
547c421 is described below
commit 547c4218124a6055a468152304d539d587071e2d
Author: Xiaobing Fang <bi...@qq.com>
AuthorDate: Tue Sep 17 09:01:12 2019 -0500
Add more config for auto-topic-creation (#4963)
Master Issue: #4926
### Motivation
Curently the partitioned-topic and non-partitioned topic is a little confuse for users. in PR #3450 we add config for auto-topic-creation.
We could leverage this config to provide some more config for auto-topic-creation.
### Modifications
- Add `allowAutoTopicCreationType` and `allowAutoTopicCreationNumPartitions` to configuration.
- Users can use both configurations when they decide to create a topic automatically.
- Add test.
- Update doc.
---
conf/broker.conf | 6 ++
conf/standalone.conf | 9 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 13 ++-
.../apache/pulsar/broker/admin/AdminResource.java | 116 +++++++++++++++++++-
.../pulsar/broker/admin/impl/BrokersBase.java | 7 ++
.../broker/admin/impl/PersistentTopicsBase.java | 52 +++++----
.../broker/admin/v1/NonPersistentTopics.java | 18 +++-
.../pulsar/broker/admin/v1/PersistentTopics.java | 5 +-
.../broker/admin/v2/NonPersistentTopics.java | 19 +++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 6 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 6 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 1 +
.../broker/service/BacklogQuotaManagerTest.java | 1 +
.../broker/service/BrokerBkEnsemblesTests.java | 1 +
.../broker/service/BrokerBookieIsolationTest.java | 4 +
.../BrokerServiceAutoTopicCreationTest.java | 120 +++++++++++++++++++++
.../pulsar/broker/service/ReplicatorTestBase.java | 3 +
.../pulsar/client/api/NonPersistentTopicTest.java | 3 +
.../client/api/SimpleProducerConsumerTest.java | 4 +-
.../client/api/v1/V1_ProducerConsumerTest.java | 4 +-
.../worker/PulsarFunctionE2ESecurityTest.java | 3 +-
.../worker/PulsarFunctionLocalRunTest.java | 3 +-
.../worker/PulsarFunctionPublishTest.java | 3 +-
.../functions/worker/PulsarFunctionStateTest.java | 3 +-
.../worker/PulsarWorkerAssignmentTest.java | 2 +-
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 2 +-
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 3 +-
.../configurations/pulsar_broker_test.conf | 3 +
pulsar-client-cpp/test-conf/standalone-ssl.conf | 9 ++
pulsar-client-cpp/test-conf/standalone.conf | 9 ++
pulsar-client-cpp/tests/standalone.conf | 9 ++
.../pulsar/client/impl/HttpLookupService.java | 3 +-
.../pulsar/functions/worker/WorkerService.java | 3 +
site2/docs/reference-configuration.md | 2 +
.../pulsar/tests/integration/cli/CLITest.java | 6 ++
.../integration/functions/PulsarFunctionsTest.java | 27 ++++-
36 files changed, 442 insertions(+), 46 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index b491336..dcd39f3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -88,6 +88,12 @@ ttlDurationDefaultInSeconds=0
# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
allowAutoTopicCreation=true
+# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
+allowAutoTopicCreationType=partitioned
+
+# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
+
# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 388c185..b25f21f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -592,3 +592,12 @@ allowLoopback=true
# will heart performance. It is better to give a higher number of gc
# interval if there is enough disk capacity.
gcWaitTime=300000
+
+# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
+allowAutoTopicCreation=true
+
+# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
+allowAutoTopicCreationType=non-partitioned
+
+# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
\ No newline at end of file
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index bff2b46..ffea06d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -895,10 +895,21 @@ public class ServiceConfiguration implements PulsarConfiguration {
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
@FieldContext(
category = CATEGORY_STORAGE_ML,
- doc = "Allow automated creation of non-partition topics if set to true (default value)."
+ doc = "Allow automated creation of topics if set to true (default value)."
)
private boolean allowAutoTopicCreation = true;
@FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
+ )
+ private String allowAutoTopicCreationType = "partitioned";
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "The number of partitioned topics that is allowed to be automatically created"
+ + "if allowAutoTopicCreationType is partitioned."
+ )
+ private int defaultNumPartitions = 1;
+ @FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Number of threads to be used for managed ledger tasks dispatching"
)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 94409e1..ed497e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,8 +18,10 @@
*/
package org.apache.pulsar.broker.admin;
+import com.fasterxml.jackson.core.JsonProcessingException;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import static org.apache.pulsar.common.util.Codec.decode;
import java.net.MalformedURLException;
@@ -81,6 +83,7 @@ import com.google.common.collect.Lists;
public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
+ private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
protected ZooKeeper globalZk() {
@@ -500,7 +503,7 @@ public abstract class AdminResource extends PulsarWebResource {
}
protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
- boolean authoritative) {
+ boolean authoritative, boolean checkAllowAutoCreation) {
validateClusterOwnership(topicName.getCluster());
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
@@ -519,7 +522,12 @@ public abstract class AdminResource extends PulsarWebResource {
}
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), topicName.getEncodedLocalName());
- PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
+ PartitionedTopicMetadata partitionMetadata;
+ if (checkAllowAutoCreation) {
+ partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), path, topicName);
+ } else {
+ partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
+ }
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), topicName,
@@ -566,6 +574,96 @@ public abstract class AdminResource extends PulsarWebResource {
return metadataFuture;
}
+ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation(
+ PulsarService pulsar, String path, TopicName topicName) {
+ try {
+ return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName)
+ .get();
+ } catch (Exception e) {
+ if (e.getCause() instanceof RestException) {
+ throw (RestException) e;
+ }
+ throw new RestException(e);
+ }
+ }
+
+ protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
+ PulsarService pulsar, String path, TopicName topicName) {
+ CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
+ try {
+ boolean allowAutoTopicCreation = pulsar.getConfiguration().isAllowAutoTopicCreation();
+ String topicType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
+ boolean topicExist;
+ try {
+ topicExist = pulsar.getNamespaceService()
+ .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+ .contains(topicName.toString());
+ } catch (Exception e) {
+ log.warn("Unexpected error while getting list of topics. topic={}. Error: {}",
+ topicName, e.getMessage(), e);
+ throw new RestException(e);
+ }
+ fetchPartitionedTopicMetadataAsync(pulsar, path).whenCompleteAsync((metadata, ex) -> {
+ if (ex != null) {
+ metadataFuture.completeExceptionally(ex);
+ // If topic is already exist, creating partitioned topic is not allowed.
+ } else if (metadata.partitions == 0 && !topicExist && allowAutoTopicCreation &&
+ TopicType.PARTITIONED.toString().equals(topicType)) {
+ createDefaultPartitionedTopicAsync(pulsar, path).whenComplete((defaultMetadata, e) -> {
+ if (e == null) {
+ metadataFuture.complete(defaultMetadata);
+ } else if (e instanceof KeeperException) {
+ try {
+ Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+ if (!pulsar.getGlobalZkCache().exists(path)){
+ metadataFuture.completeExceptionally(e);
+ return;
+ }
+ } catch (InterruptedException | KeeperException exc) {
+ metadataFuture.completeExceptionally(exc);
+ return;
+ }
+ fetchPartitionedTopicMetadataAsync(pulsar, path).whenComplete((metadata2, ex2) -> {
+ if (ex2 != null) {
+ metadataFuture.completeExceptionally(ex2);
+ } else {
+ metadataFuture.complete(metadata2);
+ }
+ });
+ } else {
+ metadataFuture.completeExceptionally(e);
+ }
+ });
+ } else {
+ metadataFuture.complete(metadata);
+ }
+ });
+ } catch (Exception e) {
+ metadataFuture.completeExceptionally(e);
+ }
+ return metadataFuture;
+ }
+
+ protected static CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(
+ PulsarService pulsar, String path) {
+ int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+ checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
+ PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
+ CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>();
+ try {
+ byte[] content = jsonMapper().writeValueAsBytes(configMetadata);
+ ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), path, content,
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ // we wait for the data to be synced in all quorums and the observers
+ Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+ partitionedTopicFuture.complete(configMetadata);
+ } catch (JsonProcessingException | KeeperException | InterruptedException e) {
+ log.error("Failed to create default partitioned topic.", e);
+ partitionedTopicFuture.completeExceptionally(e);
+ }
+ return partitionedTopicFuture;
+ }
+
protected void validateClusterExists(String cluster) {
try {
if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
@@ -627,4 +725,18 @@ public abstract class AdminResource extends PulsarWebResource {
partitionedTopics.sort(null);
return partitionedTopics;
}
+
+ enum TopicType {
+ PARTITIONED("partitioned"),
+ NON_PARTITIONED("non-partitioned");
+ private String type;
+
+ TopicType(String type) {
+ this.type = type;
+ }
+
+ public String toString() {
+ return type;
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 63e392a..fc529e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -262,6 +262,13 @@ public class BrokersBase extends AdminResource {
PulsarClient client = pulsar().getClient();
String messageStr = UUID.randomUUID().toString();
+ // create non-partitioned topic manually
+ try {
+ pulsar().getBrokerService().getTopic(topic, true).get();
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
CompletableFuture<Producer<String>> producerFuture =
client.newProducer(Schema.STRING).topic(topic).createAsync();
CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9150f99..509b41d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import static org.apache.pulsar.common.util.Codec.decode;
import com.github.zafarkhaja.semver.Version;
@@ -375,6 +376,18 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
+ boolean topicExist = pulsar().getNamespaceService()
+ .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+ .contains(topicName.toString());
+ if (topicExist) {
+ log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
+ throw new RestException(Status.CONFLICT, "This topic already exists");
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+ throw new RestException(e);
+ }
+ try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
@@ -444,8 +457,8 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative) {
- PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, authoritative);
+ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative, boolean checkAllowAutoCreation) {
+ PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
if (metadata.partitions > 1) {
validateClientVersion();
}
@@ -457,7 +470,7 @@ public class PersistentTopicsBase extends AdminResource {
final CompletableFuture<Void> future = new CompletableFuture<>();
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final AtomicInteger count = new AtomicInteger(numPartitions);
@@ -590,7 +603,7 @@ public class PersistentTopicsBase extends AdminResource {
final List<String> subscriptions = Lists.newArrayList();
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
try {
// get the subscriptions only from the 1st partition since all the other partitions will have the same
@@ -685,7 +698,7 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
boolean perPartition) {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
}
@@ -743,7 +756,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
}
@@ -786,7 +799,7 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -855,7 +868,7 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -920,7 +933,7 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
}
@@ -952,7 +965,7 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -1027,7 +1040,7 @@ public class PersistentTopicsBase extends AdminResource {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1141,7 +1154,7 @@ public class PersistentTopicsBase extends AdminResource {
log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName,
targetMessageId);
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1249,7 +1262,7 @@ public class PersistentTopicsBase extends AdminResource {
log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName,
subName, messageId);
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName,
@@ -1288,7 +1301,7 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
}
@@ -1413,7 +1426,7 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
}
@@ -1433,7 +1446,7 @@ public class PersistentTopicsBase extends AdminResource {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -1489,7 +1502,7 @@ public class PersistentTopicsBase extends AdminResource {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
String msg = "This method should not be called for partitioned topic";
log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
@@ -1595,7 +1608,8 @@ public class PersistentTopicsBase extends AdminResource {
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
- .thenCompose(res -> fetchPartitionedTopicMetadataAsync(pulsar, path)).thenAccept(metadata -> {
+ .thenCompose(res -> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName))
+ .thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
@@ -1632,7 +1646,7 @@ public class PersistentTopicsBase extends AdminResource {
}
PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(
- TopicName.get(topicName.getPartitionedTopicName()), false);
+ TopicName.get(topicName.getPartitionedTopicName()), false, false);
if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
final String topicErrorType = partitionedTopicMetadata == null ?
"has no metadata" : "has zero partitions";
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index f1347e3..96bc084 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -76,9 +77,10 @@ public class NonPersistentTopics extends PersistentTopics {
public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return getPartitionedTopicMetadata(topicName, authoritative);
+ return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
}
@GET
@@ -125,6 +127,18 @@ public class NonPersistentTopics extends PersistentTopics {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
+ boolean topicExist = pulsar().getNamespaceService()
+ .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+ .contains(topicName.toString());
+ if (topicExist) {
+ log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
+ throw new RestException(Status.CONFLICT, "This topic already exists");
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+ throw new RestException(e);
+ }
+ try {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index f895ce1..ebece5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -181,9 +181,10 @@ public class PersistentTopics extends PersistentTopicsBase {
public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return internalGetPartitionedMetadata(authoritative);
+ return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
}
@DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 8125f5b..10dc5ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -86,9 +87,11 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @ApiParam(value = "Is check configuration required to automatically create topic")
+ @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
validateTopicName(tenant, namespace, encodedTopic);
- return getPartitionedTopicMetadata(topicName, authoritative);
+ return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
}
@GET
@@ -169,6 +172,18 @@ public class NonPersistentTopics extends PersistentTopics {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
+ boolean topicExist = pulsar().getNamespaceService()
+ .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+ .contains(topicName.toString());
+ if (topicExist) {
+ log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
+ throw new RestException(Status.CONFLICT, "This topic already exists");
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+ throw new RestException(e);
+ }
+ try {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index e09c012..70e0624 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -288,9 +288,11 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @ApiParam(value = "Is check configuration required to automatically create topic")
+ @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalGetPartitionedMetadata(authoritative);
+ return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
}
@DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fc89acc..3d8a29a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -211,8 +211,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true);
- Assert.assertEquals(
- persistentTopics.getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true) .partitions,
+ Assert.assertEquals(persistentTopics
+ .getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true, false).partitions,
0);
}
@@ -221,7 +221,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
final String topicName = "standard-topic";
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
- testTenant, testNamespace, topicName, true);
+ testTenant, testNamespace, topicName, true, false);
Assert.assertEquals(pMetadata.partitions, 0);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 341b96a..868d591 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -104,6 +104,7 @@ public abstract class MockedPulsarServiceBaseTest {
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setZookeeperServers("localhost:2181");
this.conf.setConfigurationStoreServers("localhost:3181");
+ this.conf.setAllowAutoTopicCreationType("non-persistent");
}
protected final void internalSetup() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index c4a387c..88a6603 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -88,6 +88,7 @@ public class BacklogQuotaManagerTest {
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setManagedLedgerMaxEntriesPerLedger(5);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ config.setAllowAutoTopicCreationType("non-partitioned");
pulsar = new PulsarService(config);
pulsar.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 940002d..f880734 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -104,6 +104,7 @@ public class BrokerBkEnsemblesTests {
config.setManagedLedgerMaxEntriesPerLedger(5);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAdvertisedAddress("127.0.0.1");
+ config.setAllowAutoTopicCreationType("non-partitioned");
pulsar = new PulsarService(config);
pulsar.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 3b9edc9..68f72b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -158,6 +158,8 @@ public class BrokerBookieIsolationTest {
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);
+ config.setAllowAutoTopicCreationType("non-partitioned");
+
int totalEntriesPerLedger = 20;
int totalLedgers = totalPublish / totalEntriesPerLedger;
config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger);
@@ -288,6 +290,7 @@ public class BrokerBookieIsolationTest {
config.setManagedLedgerDefaultEnsembleSize(2);
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);
+ config.setAllowAutoTopicCreationType("non-partitioned");
int totalEntriesPerLedger = 20;
int totalLedgers = totalPublish / totalEntriesPerLedger;
@@ -410,6 +413,7 @@ public class BrokerBookieIsolationTest {
config.setManagedLedgerDefaultEnsembleSize(2);
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);
+ config.setAllowAutoTopicCreationType("non-partitioned");
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
pulsarService = new PulsarService(config);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
new file mode 100644
index 0000000..cd1aec9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testAutoNonPartitionedTopicCreation() throws Exception{
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+
+ final String topicName = "persistent://prop/ns-abc/non-partitioned-topic";
+ final String subscriptionName = "non-partitioned-topic-sub";
+ pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+ assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+ }
+
+ @Test
+ public void testAutoPartitionedTopicCreation() throws Exception{
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ pulsar.getConfiguration().setDefaultNumPartitions(3);
+
+ final String topicName = "persistent://prop/ns-abc/partitioned-topic";
+ final String subscriptionName = "partitioned-topic-sub";
+ pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+ assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+ for (int i = 0; i < 3; i++) {
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
+ }
+ }
+
+ @Test
+ public void testAutoTopicCreationDisable() throws Exception{
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+
+ final String topicName = "persistent://prop/ns-abc/test-topic";
+ final String subscriptionName = "test-topic-sub";
+ try {
+ pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ } catch (Exception e) {
+ assertTrue(e instanceof PulsarClientException);
+ }
+ assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+ }
+
+ @Test
+ public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() throws Exception{
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ pulsar.getConfiguration().setDefaultNumPartitions(3);
+
+ final String topicName = "persistent://prop/ns-abc/test-topic-2";
+ final String subscriptionName = "partitioned-topic-sub";
+ admin.topics().createNonPartitionedTopic(topicName);
+ pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+ assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+ for (int i = 0; i < 3; i++) {
+ assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
+ }
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+ }
+
+ /**
+ * CheckAllowAutoCreation's default value is false.
+ * So using getPartitionedTopicMetadata() directly will not produce partitioned topic
+ * even if the option to automatically create partitioned topic is configured
+ */
+ @Test
+ public void testGetPartitionedMetadataWithoutCheckAllowAutoCreation() throws Exception{
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ pulsar.getConfiguration().setDefaultNumPartitions(3);
+
+ final String topicName = "persistent://prop/ns-abc/test-topic-3";
+ int partitions = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
+ assertEquals(partitions, 0);
+ assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index ca5ba68..5864f48 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -132,6 +132,7 @@ public class ReplicatorTestBase {
config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config1.setDefaultNumberOfNamespaceBundles(1);
+ config1.setAllowAutoTopicCreationType("non-partitioned");
pulsar1 = new PulsarService(config1);
pulsar1.start();
ns1 = pulsar1.getBrokerService();
@@ -165,6 +166,7 @@ public class ReplicatorTestBase {
config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config2.setDefaultNumberOfNamespaceBundles(1);
+ config2.setAllowAutoTopicCreationType("non-partitioned");
pulsar2 = new PulsarService(config2);
pulsar2.start();
ns2 = pulsar2.getBrokerService();
@@ -198,6 +200,7 @@ public class ReplicatorTestBase {
config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setDefaultNumberOfNamespaceBundles(1);
+ config3.setAllowAutoTopicCreationType("non-partitioned");
pulsar3 = new PulsarService(config3);
pulsar3.start();
ns3 = pulsar3.getBrokerService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index f70e6a1..b54265a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -891,6 +891,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config1.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+ config1.setAllowAutoTopicCreationType("non-partitioned");
pulsar1 = new PulsarService(config1);
pulsar1.start();
ns1 = pulsar1.getBrokerService();
@@ -917,6 +918,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+ config2.setAllowAutoTopicCreationType("non-partitioned");
pulsar2 = new PulsarService(config2);
pulsar2.start();
ns2 = pulsar2.getBrokerService();
@@ -942,6 +944,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config3.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config3.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
+ config3.setAllowAutoTopicCreationType("non-partitioned");
pulsar3 = new PulsarService(config3);
pulsar3.start();
ns3 = pulsar3.getBrokerService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 35d4c77..c2fad94 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2309,7 +2309,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// (1) simple consumers
Consumer<byte[]> consumer = pulsarClient.newConsumer()
- .topic("persistent://my-property/my-ns/failAsyncReceive").subscriptionName("my-subscriber-name")
+ .topic("persistent://my-property/my-ns/failAsyncReceive-1").subscriptionName("my-subscriber-name")
.subscribe();
consumer.close();
// receive messages
@@ -2322,7 +2322,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// (2) Partitioned-consumer
int numPartitions = 4;
- TopicName topicName = TopicName.get("persistent://my-property/my-ns/failAsyncReceive");
+ TopicName topicName = TopicName.get("persistent://my-property/my-ns/failAsyncReceive-2");
admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
Consumer<byte[]> partitionedConsumer = pulsarClient.newConsumer().topic(topicName.toString())
.subscriptionName("my-partitioned-subscriber").subscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 24fb5a7..7d9a8cd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -2040,7 +2040,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// (1) simple consumers
Consumer<byte[]> consumer = pulsarClient.newConsumer()
- .topic("persistent://my-property/use/my-ns/failAsyncReceive")
+ .topic("persistent://my-property/use/my-ns/failAsyncReceive-1")
.subscriptionName("my-subscriber-name")
.subscribe();
consumer.close();
@@ -2054,7 +2054,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// (2) Partitioned-consumer
int numPartitions = 4;
- TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/failAsyncReceive");
+ TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/failAsyncReceive-2");
admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
Consumer<byte[]> partitionedConsumer = pulsarClient.newConsumer().topic(topicName.toString())
.subscriptionName("my-partitioned-subscriber")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 54f05d8..3782286 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -89,7 +89,7 @@ public class PulsarFunctionE2ESecurityTest {
final String TENANT2 = "tenant2";
final String NAMESPACE = "test-ns";
- String pulsarFunctionsNamespace = TENANT + "/use/pulsar-function-admin";
+ String pulsarFunctionsNamespace = TENANT + "/pulsar-function-admin";
String primaryHost;
String workerId;
@@ -132,6 +132,7 @@ public class PulsarFunctionE2ESecurityTest {
config.setBrokerServicePort(Optional.of(brokerServicePort));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setAdvertisedAddress("localhost");
+ config.setAllowAutoTopicCreationType("non-partitioned");
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderToken.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 97cabc3..8c2525e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -102,7 +102,7 @@ public class PulsarFunctionLocalRunTest {
BrokerStats brokerStatsClient;
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
- String pulsarFunctionsNamespace = tenant + "/" + CLUSTER + "/pulsar-function-admin";
+ String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
String primaryHost;
String workerId;
@@ -180,6 +180,7 @@ public class PulsarFunctionLocalRunTest {
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
config.setBrokerClientTlsEnabled(true);
+ config.setAllowAutoTopicCreationType("non-partitioned");
functionsWorkerService = createPulsarFunctionWorker(config);
urlTls = new URL(brokerServiceUrl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 9c2b7b7..efa0695 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -91,7 +91,7 @@ public class PulsarFunctionPublishTest {
BrokerStats brokerStatsClient;
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
- String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+ String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
String primaryHost;
String workerId;
@@ -169,6 +169,7 @@ public class PulsarFunctionPublishTest {
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
config.setBrokerClientTlsEnabled(true);
+ config.setAllowAutoTopicCreationType("non-partitioned");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
index 809fca7..d0b85c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
@@ -90,7 +90,7 @@ public class PulsarFunctionStateTest {
BrokerStats brokerStatsClient;
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
- String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+ String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
String primaryHost;
String workerId;
@@ -168,6 +168,7 @@ public class PulsarFunctionStateTest {
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
config.setBrokerClientTlsEnabled(true);
+ config.setAllowAutoTopicCreationType("non-partitioned");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 55e4a70..27a7d69 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -70,7 +70,7 @@ public class PulsarWorkerAssignmentTest {
BrokerStats brokerStatsClient;
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
- final String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+ final String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
String primaryHost;
String workerId;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index ecfce1c..0c5db19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -72,7 +72,7 @@ public class PulsarFunctionAdminTest {
WorkerServer functionsWorkerServer;
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
- String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+ String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
String primaryHost;
private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 847958f..d8f9cec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -118,7 +118,7 @@ public class PulsarFunctionE2ETest {
BrokerStats brokerStatsClient;
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
- String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+ String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
String primaryHost;
String workerId;
@@ -194,6 +194,7 @@ public class PulsarFunctionE2ETest {
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
config.setBrokerClientTlsEnabled(true);
+ config.setAllowAutoTopicCreationType("non-partitioned");
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index c55955f..75b6d03 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -33,6 +33,9 @@ backlogQuotaCheckIntervalInSeconds=60
backlogQuotaDefaultLimitGB=50
brokerDeleteInactiveTopicsEnabled=true
brokerDeleteInactiveTopicsFrequencySeconds=60
+allowAutoTopicCreation=true
+allowAutoTopicCreationType=non-partitioned
+defaultNumPartitions=1
messageExpiryCheckIntervalInMinutes=5
clientLibraryVersionCheckEnabled=false
clientLibraryVersionCheckAllowUnversioned=true
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index 426ba43..6ab4406 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -278,3 +278,12 @@ keepAliveIntervalSeconds=30
# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
brokerServicePurgeInactiveFrequencyInSeconds=60
+
+# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
+allowAutoTopicCreation=true
+
+# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
+allowAutoTopicCreationType=non-partitioned
+
+# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
\ No newline at end of file
diff --git a/pulsar-client-cpp/test-conf/standalone.conf b/pulsar-client-cpp/test-conf/standalone.conf
index 6f799c1..2de6a37 100644
--- a/pulsar-client-cpp/test-conf/standalone.conf
+++ b/pulsar-client-cpp/test-conf/standalone.conf
@@ -263,3 +263,12 @@ keepAliveIntervalSeconds=30
# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
brokerServicePurgeInactiveFrequencyInSeconds=60
+
+# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
+allowAutoTopicCreation=true
+
+# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
+allowAutoTopicCreationType=non-partitioned
+
+# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/standalone.conf b/pulsar-client-cpp/tests/standalone.conf
index 8a01642..857285a 100644
--- a/pulsar-client-cpp/tests/standalone.conf
+++ b/pulsar-client-cpp/tests/standalone.conf
@@ -266,3 +266,12 @@ keepAliveIntervalSeconds=30
# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
brokerServicePurgeInactiveFrequencyInSeconds=60
+
+# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
+allowAutoTopicCreation=true
+
+# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
+allowAutoTopicCreationType=non-partitioned
+
+# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 4fe030d..e5334df 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -103,7 +103,8 @@ class HttpLookupService implements LookupService {
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
- return httpClient.get(String.format(format, topicName.getLookupName()), PartitionedTopicMetadata.class);
+ return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true",
+ PartitionedTopicMetadata.class);
}
public String getServiceUrl() {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 9ec9688..0d73915 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -148,6 +148,9 @@ public class WorkerService {
}
log.info("Created Pulsar client");
+ brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic());
+ brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
+ brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());
//create scheduler manager
this.schedulerManager = new SchedulerManager(this.workerConfig, this.client, this.brokerAdmin,
this.executor);
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index bb549ae..5261bae 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -127,6 +127,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the quota |60|
|backlogQuotaDefaultLimitGB| Default per-topic backlog quota limit |10|
|allowAutoTopicCreation| Enable topic auto creation if new producer or consumer connected |true|
+|allowAutoTopicCreationType| The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) |non-partitioned|
+|defaultNumPartitions| The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned |1|
|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics |true|
|brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60|
|messageExpiryCheckIntervalInMinutes| How frequently to proactively check and purge expired messages |5|
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 3af34d7..ca39cd4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -93,6 +93,12 @@ public class CLITest extends PulsarTestSuite {
public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Exception {
String topicName = "persistent://public/default/test-topic-termination";
BrokerContainer container = pulsarCluster.getAnyBroker();
+ container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "create",
+ topicName);
+
ContainerExecResult result = container.execCmd(
PulsarCluster.CLIENT_SCRIPT,
"produce",
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 23beec8..f1e2d1a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -547,6 +547,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
.build();
@Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+ admin.topics().createNonPartitionedTopic(outputTopicName);
+ @Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopicName)
.subscriptionName("source-tester")
@@ -869,6 +872,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+ admin.topics().createNonPartitionedTopic(inputTopicName);
+ admin.topics().createNonPartitionedTopic(outputTopicName);
retryStrategically((test) -> {
try {
return admin.topics().getStats(inputTopicName).subscriptions.size() == 1;
@@ -957,7 +962,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String inputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-input-" + randomName(8);
String outputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-output-" + randomName(8);
-
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+ admin.topics().createNonPartitionedTopic(inputTopicName);
+ admin.topics().createNonPartitionedTopic(outputTopicName);
+ }
CommandGenerator generator = CommandGenerator.createDefaultGenerator(
inputTopicName,
@@ -1114,6 +1122,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String inputTopicName = "persistent://public/default/test-neg-ack-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-neg-ack-" + runtime + "-output-" + randomName(8);
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+ admin.topics().createNonPartitionedTopic(inputTopicName);
+ admin.topics().createNonPartitionedTopic(outputTopicName);
+ }
String functionName = "test-neg-ack-fn-" + randomName(8);
final int numMessages = 20;
@@ -1290,6 +1302,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String inputTopicName = "persistent://public/default/test-publish-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-publish-" + runtime + "-output-" + randomName(8);
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+ admin.topics().createNonPartitionedTopic(inputTopicName);
+ admin.topics().createNonPartitionedTopic(outputTopicName);
+ }
String functionName = "test-publish-fn-" + randomName(8);
final int numMessages = 10;
@@ -1416,6 +1432,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+ admin.topics().createNonPartitionedTopic(inputTopicName);
+ admin.topics().createNonPartitionedTopic(outputTopicName);
+ }
if (isTopicPattern) {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
@@ -1928,6 +1948,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
.build();
@Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+ admin.topics().createNonPartitionedTopic(consumeTopicName);
+ admin.topics().createNonPartitionedTopic(outputTopicName);
+
+ @Cleanup
Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
.topic(consumeTopicName)
.subscriptionName("debezium-source-tester")