You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/30 02:27:38 UTC
[pulsar] branch branch-2.7 updated: [Branch-2.7] Fix create
partitioned topic in replicated namespace (#11140)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 0fb9c52 [Branch-2.7] Fix create partitioned topic in replicated namespace (#11140)
0fb9c52 is described below
commit 0fb9c52cf07edc647359fcb84939363e0fc26ca4
Author: ran <ga...@126.com>
AuthorDate: Wed Jun 30 10:26:51 2021 +0800
[Branch-2.7] Fix create partitioned topic in replicated namespace (#11140)
Because there are many conflicts to cherry-pick the PR https://github.com/apache/pulsar/pull/10963 to branch-2.7, so add a new PR for branch-2.7.
Fixes https://github.com/apache/pulsar/issues/10673 Bug-2
### Motivation
Currently, create a partitioned topic in the replicated namespace will not create metadata path `/managed-ledgers` on replicated clusters.
### Modifications
Add a new flag `createLocalTopicOnly` to indicate whether create the partitioned path in replicated clusters or not.
If the flag is false, make remote calls to create partitioned topics on replicated clusters.
---
.../apache/pulsar/broker/admin/AdminResource.java | 130 ++++++++++++++-------
.../broker/admin/v1/NonPersistentTopics.java | 15 ++-
.../pulsar/broker/admin/v1/PersistentTopics.java | 13 ++-
.../broker/admin/v2/NonPersistentTopics.java | 5 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 5 +-
.../org/apache/pulsar/broker/admin/AdminTest.java | 2 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 20 ++--
.../pulsar/broker/service/ReplicatorTest.java | 57 +++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 8 +-
9 files changed, 188 insertions(+), 67 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 5c87776..3e26aa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.Constants;
@@ -811,7 +812,8 @@ public abstract class AdminResource extends PulsarWebResource {
return partitionedTopics;
}
- protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
+ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
+ boolean createLocalTopicOnly) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
try {
validateAdminAccessForTenant(topicName.getTenant());
@@ -828,56 +830,102 @@ public abstract class AdminResource extends PulsarWebResource {
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions));
return;
}
+
+ List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+ CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+ createFutureList.add(createLocalFuture);
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
- asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
- } else {
+ createLocalFuture.completeExceptionally(
+ new RestException(Status.CONFLICT, "This topic already exists"));
+ return;
+ }
- try {
- String path = ZkAdminPaths.partitionedTopicPath(topicName);
- byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
- zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
- if (KeeperException.Code.OK.intValue() == rc) {
- globalZk().sync(path, (rc2, s2, ctx) -> {
- if (KeeperException.Code.OK.intValue() == rc2) {
- log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
- tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
- log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(e -> {
- log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
- // The partitioned topic is created but there are some partitions create failed
- asyncResponse.resume(new RestException(e));
- return null;
- });
- } else {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
- asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
- }
- }, null);
- } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
- log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
- asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists"));
- } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
- log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
- topicName);
- asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
- } else {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
- asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
+ provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly)
+ .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+ .whenComplete((ignored, ex) -> {
+ if (ex != null) {
+ createLocalFuture.completeExceptionally(ex);
+ return;
}
+ createLocalFuture.complete(null);
});
- } catch (Exception e) {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- }
- }
}).exceptionally(ex -> {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
+ createLocalFuture.completeExceptionally(ex);
return null;
});
+
+ if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
+ getNamespaceReplicatedClusters(namespaceName)
+ .stream()
+ .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
+ .forEach(cluster -> createFutureList.add(
+ ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
+ .createPartitionedTopicAsync(
+ topicName.getPartitionedTopicName(), numPartitions, true)));
+ }
+
+ FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());
+ if (ex.getCause() instanceof RestException) {
+ asyncResponse.resume(ex.getCause());
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+ }
+ return;
+ }
+ log.info("[{}] Successfully created partitions for topic {} in cluster {}",
+ clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
+ asyncResponse.resume(Response.noContent().build());
+ });
+ }
+
+ private CompletableFuture<Void> provisionPartitionedTopicPath(int numPartitions, boolean createLocalTopicOnly) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
+ String path = ZkAdminPaths.partitionedTopicPath(topicName);
+ byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
+ zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
+ if (KeeperException.Code.OK.intValue() == rc) {
+ globalZk().sync(path, (rc2, s2, ctx) -> {
+ if (KeeperException.Code.OK.intValue() == rc2) {
+ log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+ future.complete(null);
+ } else {
+ log.error("[{}] Failed to create partitioned topic {}",
+ clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
+ future.completeExceptionally(
+ new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+ }
+ }, null);
+ } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+ if (createLocalTopicOnly) {
+ future.complete(null);
+ return;
+ }
+ log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
+ future.completeExceptionally(
+ new RestException(Status.CONFLICT, "Partitioned topic already exists"));
+ } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
+ log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
+ topicName);
+ future.completeExceptionally(new RestException(Status.CONFLICT, "Concurrent modification"));
+ } else {
+ log.error("[{}] Failed to create partitioned topic {}",
+ clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
+ future.completeExceptionally(
+ new RestException(KeeperException.create(KeeperException.Code.get(rc))));
+ }
+ });
+ } catch (Exception e) {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+ future.completeExceptionally(e);
+ }
+ return future;
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 06a9f92..b872f64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -139,14 +139,17 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal"
+ " to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist")})
- public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
- @PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, @PathParam("topic") @Encoded
- String encodedTopic,
- int numPartitions) {
+ public void createPartitionedTopic(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ int numPartitions,
+ @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalCreatePartitionedTopic(asyncResponse, numPartitions);
+ internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 1ba1467..4038dcb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -150,12 +150,17 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
- public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
- int numPartitions) {
+ public void createPartitionedTopic(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ int numPartitions,
+ @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalCreatePartitionedTopic(asyncResponse, numPartitions);
+ internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 769e60d..26a9fe2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -182,13 +182,14 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
- int numPartitions) {
+ int numPartitions,
+ @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateGlobalNamespaceOwnership(tenant,namespace);
validateTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(topicName.getTenant());
- internalCreatePartitionedTopic(asyncResponse, numPartitions);
+ internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d7a1805..9267c28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -216,12 +216,13 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
- int numPartitions) {
+ int numPartitions,
+ @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateGlobalNamespaceOwnership(tenant,namespace);
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(topicName.getTenant());
- internalCreatePartitionedTopic(asyncResponse, numPartitions);
+ internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index c80e727..2eab88f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -724,7 +724,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
- persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5);
+ persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 0a208a8..485b42c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -172,7 +172,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
// 3) Create the partitioned topic
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -293,7 +293,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
// 3) Create the partitioned topic
AsyncResponse response = mock(AsyncResponse.class);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1, true);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -378,7 +378,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
}
@@ -402,7 +402,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, false, 10);
@@ -431,7 +431,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
// 3) create partitioned topic and unload
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
@@ -461,13 +461,13 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
- nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3);
+ nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -499,7 +499,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
final int numPartitions = 5;
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -539,7 +539,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
final int numPartitions = 5;
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
@@ -582,7 +582,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
// create partitioned topic and compaction on it
response = mock(AsyncResponse.class);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2, true);
persistentTopics.compact(response, testTenant, testNamespace, partitionTopicName, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 75e214f..f3e90b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -35,6 +35,7 @@ import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
@@ -58,9 +59,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -85,6 +88,7 @@ import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -1065,6 +1069,59 @@ public class ReplicatorTest extends ReplicatorTestBase {
consumer.close();
}
+
+ @Test
+ public void createPartitionedTopicTest() throws Exception {
+ final String cluster1 = pulsar1.getConfig().getClusterName();
+ final String cluster2 = pulsar2.getConfig().getClusterName();
+ final String cluster3 = pulsar3.getConfig().getClusterName();
+ final String namespace = "pulsar/ns-" + RandomStringUtils.randomAlphabetic(5).toLowerCase();
+
+ final String persistentPartitionedTopic = "persistent://" + namespace + "/partitioned-topic";
+ final String persistentNonPartitionedTopic = "persistent://" + namespace + "/non-partitioned-topic";
+ final String nonPersistentPartitionedTopic = "non-persistent://" + namespace + "/partitioned-topic";
+ final String nonPersistentNonPartitionedTopic = "non-persistent://" + namespace + "/non-partitioned-topic";
+ final int numPartitions = 3;
+
+ admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3));
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+
+ admin1.topics().createPartitionedTopic(persistentPartitionedTopic, numPartitions);
+ admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, numPartitions);
+ admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+ admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+ List<String> partitionedTopicList = admin1.topics().getPartitionedTopicList(namespace);
+ Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+ Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+ // expected topic list didn't contain non-persistent-non-partitioned topic,
+ // because this model topic didn't create path in local metadata store.
+ List<String> expectedTopicList = Lists.newArrayList(
+ persistentNonPartitionedTopic, nonPersistentNonPartitionedTopic);
+ TopicName pt = TopicName.get(persistentPartitionedTopic);
+ for (int i = 0; i < numPartitions; i++) {
+ expectedTopicList.add(pt.getPartition(i).toString());
+ }
+
+ checkListContainExpectedTopic(admin1, namespace, expectedTopicList);
+ checkListContainExpectedTopic(admin2, namespace, expectedTopicList);
+ checkListContainExpectedTopic(admin3, namespace, expectedTopicList);
+ }
+
+ private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
+ // wait non-partitioned topics replicators created finished
+ final List<String> list = new ArrayList<>();
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+ list.clear();
+ list.addAll(admin.topics().getList(namespace));
+ return list.size() == expectedTopicList.size();
+ });
+ for (String expectTopic : expectedTopicList) {
+ Assert.assertTrue(list.contains(expectTopic));
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 9f2a911..c68b0bc 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -366,9 +366,15 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
+ return createPartitionedTopicAsync(topic, numPartitions, false);
+ }
+
+ public CompletableFuture<Void> createPartitionedTopicAsync(
+ String topic, int numPartitions, boolean createLocalTopicOnly) {
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
TopicName tn = validateTopic(topic);
- WebTarget path = topicPath(tn, "partitions");
+ WebTarget path = topicPath(tn, "partitions")
+ .queryParam("createLocalTopicOnly", Boolean.toString(createLocalTopicOnly));
return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
}