You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:07:06 UTC

[GitHub] [pulsar] congbobo184 commented on a change in pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

congbobo184 commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654312778



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));

Review comment:
       if local exist but remote cluster don't have, do we need to check this logical?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
-            } else {
-
-                try {
-                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    namespaceResources().getPartitionedTopicResources()
-                            .createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
-                                log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-                                tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                    log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    asyncResponse.resume(Response.noContent().build());
-                                }).exceptionally(e -> {
-                                    log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    // The partitioned topic is created but there are some partitions create failed
-                                    asyncResponse.resume(new RestException(e));
-                                    return null;
-                                });
-                            }).exceptionally(ex -> {
-                                if (ex.getCause() instanceof AlreadyExistsException) {
-                                    log.warn("[{}] Failed to create already existing partitioned topic {}",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(
-                                            new RestException(Status.CONFLICT, "Partitioned topic already exists"));
-                                } else if (ex.getCause() instanceof BadVersionException) {
-                                    log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
-                                } else {
-                                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
-                                            ex.getCause());
-                                    asyncResponse.resume(new RestException(ex.getCause()));
-                                }
-                                return null;
-                            });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                }
+                return;
             }
+
+            provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
+                    .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+                    .whenComplete((ignored, ex) -> {
+                        if (ex != null) {
+                            createLocalFuture.completeExceptionally(ex);
+                            return;
+                        }
+                        createLocalFuture.complete(null);

Review comment:
       `createFutureList.add(tryCreatePartitionsAsync(numPartitions))`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org