You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/03/06 08:50:23 UTC
[pulsar] branch master updated: Fix create partitioned topic with a
substring of an existing topic name. (#6478)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19ccfd5 Fix create partitioned topic with a substring of an existing topic name. (#6478)
19ccfd5 is described below
commit 19ccfd5c60020a32bceeca128a9846ca006f0dc7
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Mar 6 16:50:10 2020 +0800
Fix create partitioned topic with a substring of an existing topic name. (#6478)
Fixes #6468
Fix creating a partitioned topic whose name is a substring of an existing topic name, and make partitioned topic creation asynchronous.
---
.../apache/pulsar/broker/admin/AdminResource.java | 114 ++++++++++++++++++++-
.../broker/admin/impl/PersistentTopicsBase.java | 97 +++---------------
.../broker/admin/v1/NonPersistentTopics.java | 34 +-----
.../pulsar/broker/admin/v1/PersistentTopics.java | 16 ++-
.../broker/admin/v2/NonPersistentTopics.java | 37 ++-----
.../pulsar/broker/admin/v2/PersistentTopics.java | 29 ++++--
.../apache/pulsar/broker/admin/AdminApiTest.java | 8 +-
.../org/apache/pulsar/broker/admin/AdminTest.java | 6 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 57 ++++++++---
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 4 +-
10 files changed, 227 insertions(+), 175 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 722da4f..a216982 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import java.net.MalformedURLException;
import java.net.URI;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -36,6 +37,7 @@ import java.util.stream.Collectors;
import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
@@ -46,6 +48,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -255,16 +258,19 @@ public abstract class AdminResource extends PulsarWebResource {
return namespaces;
}
- protected void tryCreatePartitionsAsync(int numPartitions) {
+ protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
if (!topicName.isPersistent()) {
- return;
+ return CompletableFuture.completedFuture(null);
}
+ List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
- tryCreatePartitionAsync(i);
+ futures.add(tryCreatePartitionAsync(i, null));
}
+ return FutureUtil.waitForAll(futures);
}
- private void tryCreatePartitionAsync(final int partition) {
+ private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
+ CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
@@ -272,18 +278,22 @@ public abstract class AdminResource extends PulsarWebResource {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
+ result.complete(null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
+ result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
- tryCreatePartitionAsync(partition);
+ tryCreatePartitionAsync(partition, result);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
+ result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
});
+ return result;
}
protected NamespaceName namespaceName;
@@ -707,4 +717,98 @@ public abstract class AdminResource extends PulsarWebResource {
partitionedTopics.sort(null);
return partitionedTopics;
}
+
+ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
+ try {
+ validateAdminAccessForTenant(topicName.getTenant());
+ } catch (Exception e) {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ if (numPartitions <= 0) {
+ asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
+ return;
+ }
+ checkTopicExistsAsync(topicName).thenAccept(exists -> {
+ if (exists) {
+ log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
+ asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
+ } else {
+ try {
+ String path = ZkAdminPaths.partitionedTopicPath(topicName);
+ byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
+ zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
+ if (KeeperException.Code.OK.intValue() == rc) {
+ globalZk().sync(path, (rc2, s2, ctx) -> {
+ if (KeeperException.Code.OK.intValue() == rc2) {
+ log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+ tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
+ log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(e -> {
+ log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+ // The partitioned topic is created but there are some partitions create failed
+ asyncResponse.resume(new RestException(e));
+ return null;
+ });
+ } else {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
+ asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+ }
+ }, null);
+ } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+ log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
+ asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists"));
+ } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
+ log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
+ topicName);
+ asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+ } else {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
+ asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
+ }
+ });
+ } catch (Exception e) {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ /**
+ * Check the exists topics contains the given topic.
+ * Since there are topic partitions and non-partitioned topics in Pulsar, must ensure both partitions
+ * and non-partitioned topics are not duplicated. So, if compare with a partition name, we should compare
+ * to the partitioned name of this partition.
+ *
+ * @param topicName given topic name
+ */
+ protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName) {
+ return pulsar().getNamespaceService().getListOfTopics(topicName.getNamespaceObject(),
+ PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+ .thenCompose(topics -> {
+ boolean exists = false;
+ for (String topic : topics) {
+ if (topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName())) {
+ exists = true;
+ break;
+ }
+ }
+ return CompletableFuture.completedFuture(exists);
+ });
+ }
+
+ protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
+ if (throwable instanceof WebApplicationException) {
+ asyncResponse.resume(throwable);
+ } else {
+ asyncResponse.resume(new RestException(throwable));
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index e37f09d..f2a95d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import org.apache.pulsar.common.api.proto.PulsarApi;
+
import static org.apache.pulsar.common.util.Codec.decode;
import com.github.zafarkhaja.semver.Version;
@@ -390,46 +390,6 @@ public class PersistentTopicsBase extends AdminResource {
revokePermissions(topicName.toString(), role);
}
- protected void internalCreatePartitionedTopic(int numPartitions) {
- validateAdminAccessForTenant(topicName.getTenant());
- if (numPartitions <= 0) {
- throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
- }
- validatePartitionTopicName(topicName.getLocalName());
- try {
- boolean topicExist = pulsar().getNamespaceService()
- .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
- .join()
- .contains(topicName.toString());
- if (topicExist) {
- log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
- throw new RestException(Status.CONFLICT, "This topic already exists");
- }
- } catch (Exception e) {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
- throw new RestException(e);
- }
- try {
- String path = ZkAdminPaths.partitionedTopicPath(topicName);
- byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
- zkCreateOptimistic(path, data);
- tryCreatePartitionsAsync(numPartitions);
- // Sync data to all quorums and the observers
- zkSync(path);
- log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
- } catch (KeeperException.NodeExistsException e) {
- log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
- throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
- } catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
- topicName);
- throw new RestException(Status.CONFLICT, "Concurrent modification");
- } catch (Exception e) {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
- throw new RestException(e);
- }
- }
-
protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateNonPartitionTopicName(topicName.getLocalName());
@@ -540,11 +500,22 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- protected void internalCreateMissedPartitions() {
- PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false);
- if (metadata != null) {
- tryCreatePartitionsAsync(metadata.partitions);
- }
+ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
+ getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
+ if (metadata != null) {
+ tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(e -> {
+ log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
+ }
+ }).exceptionally(e -> {
+ log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
}
private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
@@ -2072,40 +2043,6 @@ public class PersistentTopicsBase extends AdminResource {
}
/**
- * Validate partitioned topic name.
- * Validation will fail and throw RestException if
- * 1) There's already a partitioned topic with same topic name and have some of its partition created.
- * 2) There's already non partition topic with same name and contains partition suffix "-partition-"
- * followed by numeric value. In this case internal created partition of partitioned topic could override
- * the existing non partition topic.
- *
- * @param topicName
- */
- private void validatePartitionTopicName(String topicName) {
- List<String> existingTopicList = internalGetList();
- String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
- for (String existingTopicName : existingTopicList) {
- if (existingTopicName.contains(prefix)) {
- try {
- Long.parseLong(existingTopicName.substring(
- existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
- + TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
- log.warn("[{}] Already have topic {} which contains partition " +
- "suffix '-partition-' and end with numeric value. Creation of partitioned topic {}"
- + "could cause conflict.", clientAppId(), existingTopicName, topicName);
- throw new RestException(Status.PRECONDITION_FAILED,
- "Already have topic " + existingTopicName + " which contains partition suffix '-partition-' " +
- "and end with numeric value, Creation of partitioned topic " + topicName +
- " could cause conflict.");
- } catch (NumberFormatException e) {
- // Do nothing, if value after partition suffix is not pure numeric value,
- // as it can't conflict with internal created partitioned topic's name.
- }
- }
- }
- }
-
- /**
* Validate non partition topic name,
* Validation will fail and throw RestException if
* 1) Topic name contains partition suffix "-partition-" and the remaining part follow the partition
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 4bc0ddf..2338b0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -124,41 +124,15 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
- public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
int numPartitions) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- validateAdminAccessForTenant(topicName.getTenant());
- if (numPartitions <= 0) {
- throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
- }
try {
- boolean topicExist = pulsar().getNamespaceService()
- .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
- .join()
- .contains(topicName.toString());
- if (topicExist) {
- log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
- throw new RestException(Status.CONFLICT, "This topic already exists");
- }
- } catch (Exception e) {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
- throw new RestException(e);
- }
- try {
- String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
- topicName.getEncodedLocalName());
- byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
- zkCreateOptimistic(path, data);
- // Sync data to all quorums and the observers
- zkSync(path);
- log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
- } catch (KeeperException.NodeExistsException e) {
- log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
- throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalCreatePartitionedTopic(asyncResponse, numPartitions);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
- throw new RestException(e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 836ca14..362adc8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -57,7 +57,8 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
-import javax.ws.rs.container.AsyncResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*/
@@ -66,7 +67,7 @@ import javax.ws.rs.container.AsyncResponse;
@Api(value = "/persistent", description = "Persistent topic admin apis", tags = "persistent topic", hidden = true)
@SuppressWarnings("deprecation")
public class PersistentTopics extends PersistentTopicsBase {
-
+ private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
@GET
@Path("/{property}/{cluster}/{namespace}")
@ApiOperation(hidden = true, value = "Get the list of topics under a namespace.", response = String.class, responseContainer = "List")
@@ -147,11 +148,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
- public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
int numPartitions) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalCreatePartitionedTopic(numPartitions);
+ try {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalCreatePartitionedTopic(asyncResponse, numPartitions);
+ } catch (Exception e) {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 7e88eed..3756f82 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -164,6 +164,7 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
})
public void createPartitionedTopic(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -172,39 +173,15 @@ public class NonPersistentTopics extends PersistentTopics {
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
int numPartitions) {
- validateGlobalNamespaceOwnership(tenant,namespace);
- validateTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(topicName.getTenant());
- if (numPartitions <= 0) {
- throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
- }
- try {
- boolean topicExist = pulsar().getNamespaceService()
- .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
- .join()
- .contains(topicName.toString());
- if (topicExist) {
- log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
- throw new RestException(Status.CONFLICT, "This topic already exists");
- }
- } catch (Exception e) {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
- throw new RestException(e);
- }
+
try {
- String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
- topicName.getEncodedLocalName());
- byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
- zkCreateOptimistic(path, data);
- // Sync data to all quorums and the observers
- zkSync(path);
- log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
- } catch (KeeperException.NodeExistsException e) {
- log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
- throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
+ validateGlobalNamespaceOwnership(tenant,namespace);
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateAdminAccessForTenant(topicName.getTenant());
+ internalCreatePartitionedTopic(asyncResponse, numPartitions);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
- throw new RestException(e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 8c59fa5..b2fc28b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -55,6 +55,9 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.ApiParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static org.apache.pulsar.common.util.Codec.decode;
/**
@@ -192,6 +195,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createPartitionedTopic(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -200,9 +204,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
int numPartitions) {
- validateGlobalNamespaceOwnership(tenant,namespace);
- validatePartitionedTopicName(tenant, namespace, encodedTopic);
- internalCreatePartitionedTopic(numPartitions);
+ try {
+ validateGlobalNamespaceOwnership(tenant,namespace);
+ validatePartitionedTopicName(tenant, namespace, encodedTopic);
+ validateAdminAccessForTenant(topicName.getTenant());
+ internalCreatePartitionedTopic(asyncResponse, numPartitions);
+ } catch (Exception e) {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
}
@PUT
@@ -276,7 +286,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@POST
@Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
- @ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "This is a best-effort operation for create missed partitions of existing non-global partitioned-topic and does't throw any exceptions when create failed")
+ @ApiOperation(value = "Create missed partitions of an existing partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@@ -287,6 +297,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 500, message = "Internal server error")
})
public void createMissedPartitions(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -294,8 +305,12 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic) {
- validatePartitionedTopicName(tenant, namespace, encodedTopic);
- internalCreateMissedPartitions();
+ try {
+ validatePartitionedTopicName(tenant, namespace, encodedTopic);
+ internalCreateMissedPartitions(asyncResponse);
+ } catch (Exception e) {
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
}
@GET
@@ -1072,4 +1087,6 @@ public class PersistentTopics extends PersistentTopicsBase {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetLastMessageId(authoritative);
}
+
+ private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index f0ee4f6..d832628 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -919,9 +919,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
try {
admin.topics().createPartitionedTopic(partitionedTopicName, 32);
fail("Should have failed as the partitioned topic already exists");
- } catch (PreconditionFailedException e) {
- // Expecting PreconditionFailedException instead of ConflictException as it'll
- // fail validation before actually try to create metadata in ZK.
+ } catch (ConflictException ignore) {
}
producer = client.newProducer(Schema.BYTES)
@@ -2010,6 +2008,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
}
+
+ // Check create partitioned topic with substring topic name
+ admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/create_substring_topic", 1);
+ admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/substring_topic", 1);
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 00d1a31..3cae752 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -677,7 +677,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
verify(response, times(1)).resume(Lists.newArrayList());
// create topic
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
- persistentTopics.createPartitionedTopic(property, cluster, namespace, topic, 5);
+ response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
.newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 1825d31..cac35a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -56,6 +56,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -157,13 +158,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
"Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions");
// 3) Create the partitioned topic
- persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3);
+ response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 4) Create a subscription
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true,
(MessageIdImpl) MessageId.earliest, false);
- ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -239,7 +244,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
}
- @Test(expectedExceptions = RestException.class)
+ @Test
public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException {
// Test the case in which user already has topic like topic-name-partition-123 created before we enforce the validation.
final String nonPartitionTopicName1 = "standard-topic";
@@ -250,7 +255,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
- persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5);
+ doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+ verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+ Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
}
@Test(expectedExceptions = RestException.class)
@@ -263,13 +273,18 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
+ doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
doAnswer(invocation -> {
persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace");
persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname");
return null;
}).when(persistentTopics).validatePartitionedTopicName(any(), any(), any());
doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
- persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5);
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, 10);
}
@@ -295,7 +310,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
// 3) create partitioned topic and unload
response = mock(AsyncResponse.class);
- persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionTopicName, 6);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+ response = mock(AsyncResponse.class);
persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -320,10 +339,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
@Test
public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
- persistentTopics.createPartitionedTopic(testTenant, testNamespace, "test-topic1", 3);
-
- nonPersistentTopic.createPartitionedTopic(testTenant, testNamespace, "test-topic2", 3);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace);
@@ -351,7 +377,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
public void testGrantPartitionedTopic() {
final String partitionedTopicName = "partitioned-topic";
final int numPartitions = 5;
- persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, numPartitions);
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
@@ -387,8 +417,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
public void testRevokePartitionedTopic() {
final String partitionedTopicName = "partitioned-topic";
final int numPartitions = 5;
- persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, numPartitions);
-
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 373a6b7..78e3dc1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -879,9 +879,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
try {
admin.topics().createPartitionedTopic(partitionedTopicName, 32);
fail("Should have failed as the partitioned topic exists with its partition created");
- } catch (PreconditionFailedException e) {
- // Expecting PreconditionFailedException instead of ConflictException as it'll
- // fail validation before actually try to create metadata in ZK.
+ } catch (ConflictException ignore) {
}
producer = client.newProducer(Schema.BYTES)