You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/03 05:52:30 UTC

[GitHub] [pulsar] 315157973 commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

315157973 commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644498422



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       required = true, it is impossible to be null here, can we use boolean?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                    enabled);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
+                                    topicNamePartition.toString(), subName, enabled));
+                        } catch (Exception e) {
+                            log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                    clientAppId(), enabled, topicNamePartition, subName, e);
+                            resumeAsyncResponseExceptionally(asyncResponse, e);
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse
+                                        .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
+                                return null;
+                            } else if (t instanceof PreconditionFailedException) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Cannot enable/disable replicated subscriptions on non-global topics"));
+                                return null;
+                            } else {
+                                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                        clientAppId(), enabled, topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
+                        }
+
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                            enabled);
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                        topicName, subName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
+            String subName, boolean authoritative, Boolean enabled) {

Review comment:
       Should we use boolean

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {

Review comment:
       Is there missing one `!` 
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org