Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/03 02:01:04 UTC

[GitHub] [pulsar] massakam opened a new pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

massakam opened a new pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790


   ### Motivation
   
   Currently, once a replicated subscription is created on a topic, there is no way for users to disable it. I think it is useful to have a REST API that allows users to enable or disable replicated subscriptions.
   
   ### Modifications
   
   Added the following REST API endpoints:
   ```
   /admin/persistent/{tenant}/{cluster}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus
   /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus
   ```
   We can enable/disable a replicated subscription by posting true or false to these endpoints.
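
As a rough illustration of the call described above, the following sketch builds (but does not send) a POST request against the v2 endpoint using the JDK 11+ `java.net.http` API. The class name, the `build` helper, the base URL, and the topic and subscription names are placeholders chosen for this example and are not part of the PR; real code would also need to URL-encode the path segments.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ReplicatedSubscriptionStatusRequest {

    // Hypothetical helper: assembles the v2 endpoint path from the PR description
    // and attaches the bare JSON boolean ("true" or "false") as the request body.
    static HttpRequest build(String baseUrl, String tenant, String namespace,
                             String topic, String subName, boolean enabled) {
        String path = String.format(
                "/admin/v2/persistent/%s/%s/%s/subscription/%s/replicatedSubscriptionStatus",
                tenant, namespace, topic, subName);
        return HttpRequest.newBuilder(URI.create(baseUrl + path))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(Boolean.toString(enabled)))
                .build();
    }

    public static void main(String[] args) {
        // The request is only constructed here; sending it would require a running broker.
        HttpRequest request = build("http://localhost:8080", "public", "default", "my-topic", "sub", true);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Passing `false` instead of `true` produces the request that disables the replicated subscription.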


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644513191



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       `enabled` can be null if the request body is empty.
   ```sh
   $ curl -i \
     -X POST \
     -d '' \
     -H 'Content-Type: application/json' \
     http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub/replicatedSubscriptionStatus
   
   HTTP/1.1 400 Bad Request
   Date: Thu, 03 Jun 2021 06:04:10 GMT
   broker-address: localhost
   Content-Type: application/json
   Content-Length: 50
   Server: Jetty(9.4.39.v20210325)
   
   {"reason":"Boolean type request body is required"}
   ```
   I think the `required = true` setting is only used when generating the Swagger documentation, so it does not prevent an empty request body from reaching this method.
   https://docs.swagger.io/swagger-core/v1.5.0/apidocs/io/swagger/annotations/ApiParam.html
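
The point about the boxed type can be illustrated with plain Java, independent of the broker. In this sketch, `handle` is a hypothetical stand-in for a handler whose request body was bound to a `Boolean`; the returned strings only imitate the status codes seen in this thread. It shows why the null check matters: auto-unboxing a null `Boolean` throws `NullPointerException`.

```java
class NullableBodyCheck {

    // Hypothetical stand-in for a REST handler; not the actual Pulsar method.
    static String handle(Boolean enabled) {
        if (enabled == null) {
            // Mirrors the 400 response quoted in the review comment
            return "400 Boolean type request body is required";
        }
        return "204 status set to " + enabled;
    }

    public static void main(String[] args) {
        System.out.println(handle(null));
        System.out.println(handle(Boolean.TRUE));

        // Without a null check, assigning a null Boolean to a primitive fails at runtime.
        Boolean missing = null;
        try {
            boolean unboxed = missing;
            System.out.println(unboxed);
        } catch (NullPointerException e) {
            System.out.println("unboxing null throws NullPointerException");
        }
    }
}
```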

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {

Review comment:
       No, "!" is not needed here. This method returns true if the topic name suffix is "-partition-x".
   ```java
   System.out.println(TopicName.get("persistent://public/default/massakam").isPartitioned());
   System.out.println(TopicName.get("persistent://public/default/massakam-partition-0").isPartitioned());
   ```
   ```sh
   false
   true
   ```
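
For readers unfamiliar with the naming convention, the suffix check can be approximated with the JDK alone. This is a simplified sketch and not the actual `TopicName` implementation; the class and method names are made up for illustration, and it assumes a partition name always ends with `-partition-` followed by a non-negative decimal index.

```java
class PartitionNameCheck {

    private static final String PARTITION_SUFFIX = "-partition-";

    // Returns the partition index encoded in the topic name, or -1 if there is none.
    static int partitionIndex(String topic) {
        int pos = topic.lastIndexOf(PARTITION_SUFFIX);
        if (pos < 0) {
            return -1;
        }
        String digits = topic.substring(pos + PARTITION_SUFFIX.length());
        // Require at least one character, and ASCII digits only
        if (digits.isEmpty() || !digits.chars().allMatch(c -> c >= '0' && c <= '9')) {
            return -1;
        }
        try {
            return Integer.parseInt(digits);
        } catch (NumberFormatException e) {
            return -1; // index too large to be a valid int
        }
    }

    // True when the name refers to a single partition rather than the parent topic.
    static boolean isPartitionName(String topic) {
        return partitionIndex(topic) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(isPartitionName("persistent://public/default/massakam"));             // false
        System.out.println(isPartitionName("persistent://public/default/massakam-partition-0")); // true
    }
}
```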

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       If `boolean` is used instead of `Boolean`, it seems that an error response is returned whenever the request body cannot be converted to a boolean value.
   ```sh
   $ curl -i -X POST -d '' -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 400 Bad Request
   Date: Thu, 03 Jun 2021 06:56:00 GMT
   broker-address: localhost
   Content-Type: application/json
   Content-Length: 35
   Server: Jetty(9.4.39.v20210325)
   
   The request entity cannot be empty.
   
   $ curl -i -X POST -d '"foo"' -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 400 Bad Request
   Date: Thu, 03 Jun 2021 07:01:15 GMT
   broker-address: localhost
   Content-Type: text/plain
   Content-Length: 248
   Server: Jetty(9.4.39.v20210325)
   
   Cannot deserialize value of type `boolean` from String "foo": only "true"/"True"/"TRUE" or "false"/"False"/"FALSE" recognized
    at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 1]
   
   $ curl -i -X POST -d 0 -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 204 No Content
   Date: Thu, 03 Jun 2021 06:56:21 GMT
   broker-address: localhost
   Server: Jetty(9.4.39.v20210325)
   
   $ curl -i -X POST -d 1 -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 204 No Content
   Date: Thu, 03 Jun 2021 06:56:34 GMT
   broker-address: localhost
   Server: Jetty(9.4.39.v20210325)
   ```
   Since this behavior is acceptable, I changed the type of `enabled` to `boolean`.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                    enabled);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
+                                    topicNamePartition.toString(), subName, enabled));
+                        } catch (Exception e) {
+                            log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                    clientAppId(), enabled, topicNamePartition, subName, e);
+                            resumeAsyncResponseExceptionally(asyncResponse, e);
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse
+                                        .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
+                                return null;
+                            } else if (t instanceof PreconditionFailedException) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Cannot enable/disable replicated subscriptions on non-global topics"));
+                                return null;
+                            } else {
+                                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                        clientAppId(), enabled, topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
+                        }
+
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                            enabled);
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                        topicName, subName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
+            String subName, boolean authoritative, Boolean enabled) {

Review comment:
       Fixed.







[GitHub] [pulsar] massakam removed a comment on pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam removed a comment on pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#issuecomment-857886734


   PTAL





[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r645689015



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -177,12 +178,28 @@ public boolean isReplicated() {
         return replicatedSubscriptionSnapshotCache != null;
     }
 
-    void setReplicated(boolean replicated) {
-        this.replicatedSubscriptionSnapshotCache = replicated
-                ? new ReplicatedSubscriptionSnapshotCache(subName,
-                        topic.getBrokerService().pulsar().getConfiguration()
-                                .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
-                : null;
+    public void setReplicated(boolean replicated) {
+        ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();
+
+        if (!replicated || !config.isEnableReplicatedSubscriptions()) {
+            this.replicatedSubscriptionSnapshotCache = null;
+        } else if (this.replicatedSubscriptionSnapshotCache == null) {
+            this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
+                    config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
+        }
+
+        if (this.cursor != null) {
+            Map<String, Long> properties = this.cursor.getProperties();
+            try {
+                if (replicated) {
+                    properties.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
+                } else {
+                    properties.remove(REPLICATED_SUBSCRIPTION_PROPERTY);
+                }
+            } catch (UnsupportedOperationException e) {

Review comment:
       Even if `lastMarkDeleteEntry` is not null, `lastMarkDeleteEntry.properties` can be `Collections.emptyMap()`, an immutable map object. `UnsupportedOperationException` is thrown when trying to add a property to this map. And I don't think there is a way to check if the map is immutable.
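
The behavior of `Collections.emptyMap()` can be reproduced in isolation. The sketch below is one possible way to react to the exception (retry on a mutable copy); it is not taken from the PR, whose handling of the `catch` branch is not shown in the quoted hunk. The class name, the helper, and the property key are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ImmutablePropertiesDemo {

    // Hypothetical key, used only for this example
    static final String REPLICATED_KEY = "example.replicated";

    // Tries to set the flag in place; if the map rejects writes, works on a mutable copy instead.
    static Map<String, Long> withReplicatedFlag(Map<String, Long> properties) {
        try {
            properties.put(REPLICATED_KEY, 1L);
            return properties;
        } catch (UnsupportedOperationException e) {
            // Collections.emptyMap() and other unmodifiable maps end up here
            Map<String, Long> copy = new HashMap<>(properties);
            copy.put(REPLICATED_KEY, 1L);
            return copy;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> immutable = Collections.emptyMap();
        Map<String, Long> result = withReplicatedFlag(immutable);
        System.out.println("original size: " + immutable.size());
        System.out.println("result: " + result);
    }
}
```

The `Map` interface has no method that reports whether an instance is modifiable, which is why the sketch relies on catching the exception rather than checking up front.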







[GitHub] [pulsar] 315157973 commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644498422



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       Since `required = true` is set, `enabled` should never be null here. Can we use `boolean` instead?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                    enabled);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
+                                    topicNamePartition.toString(), subName, enabled));
+                        } catch (Exception e) {
+                            log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                    clientAppId(), enabled, topicNamePartition, subName, e);
+                            resumeAsyncResponseExceptionally(asyncResponse, e);
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse
+                                        .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
+                                return null;
+                            } else if (t instanceof PreconditionFailedException) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Cannot enable/disable replicated subscriptions on non-global topics"));
+                                return null;
+                            } else {
+                                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                        clientAppId(), enabled, topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
+                        }
+
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                            enabled);
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                        topicName, subName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
+            String subName, boolean authoritative, Boolean enabled) {

Review comment:
       Should we use `boolean` here?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {

Review comment:
       Is a `!` missing here?
   







[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r645684070



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -177,12 +178,28 @@ public boolean isReplicated() {
         return replicatedSubscriptionSnapshotCache != null;
     }
 
-    void setReplicated(boolean replicated) {
-        this.replicatedSubscriptionSnapshotCache = replicated
-                ? new ReplicatedSubscriptionSnapshotCache(subName,
-                        topic.getBrokerService().pulsar().getConfiguration()
-                                .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
-                : null;
+    public void setReplicated(boolean replicated) {
+        ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();
+
+        if (!replicated || !config.isEnableReplicatedSubscriptions()) {
+            this.replicatedSubscriptionSnapshotCache = null;
+        } else if (this.replicatedSubscriptionSnapshotCache == null) {
+            this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
+                    config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
+        }
+
+        if (this.cursor != null) {
+            Map<String, Long> properties = this.cursor.getProperties();

Review comment:
       Fixed.







[GitHub] [pulsar] 315157973 commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644498422



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       Since `required = true` is set, `enabled` should never be null here. Can we use `boolean` instead?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                    enabled);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
+                                    topicNamePartition.toString(), subName, enabled));
+                        } catch (Exception e) {
+                            log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                    clientAppId(), enabled, topicNamePartition, subName, e);
+                            resumeAsyncResponseExceptionally(asyncResponse, e);
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse
+                                        .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
+                                return null;
+                            } else if (t instanceof PreconditionFailedException) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Cannot enable/disable replicated subscriptions on non-global topics"));
+                                return null;
+                            } else {
+                                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                        clientAppId(), enabled, topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
+                        }
+
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                            enabled);
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                        topicName, subName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
+            String subName, boolean authoritative, Boolean enabled) {

Review comment:
       Should we use the primitive `boolean` instead of `Boolean` here?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {

Review comment:
       Is a `!` missing here?
   







[GitHub] [pulsar] eolivelli commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r645342826



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
##########
@@ -230,6 +233,94 @@ public void testReplicationSnapshotStopWhenNoTraffic() throws Exception {
         assertNotEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2);
     }
 
+    @Test(timeOut = 30000)
+    public void testReplicatedSubscriptionRestApi1() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        final String topicName = "persistent://" + namespace + "/topic-rest-api1";
+        final String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        // Create subscription in r1
+        createReplicatedSubscription(client1, topicName, subName, true);
+
+        @Cleanup
+        final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        // Create subscription in r2
+        createReplicatedSubscription(client2, topicName, subName, true);
+
+        TopicStats stats = admin1.topics().getStats(topicName);
+        assertTrue(stats.subscriptions.get(subName).isReplicated);
+
+        // Disable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
+        stats = admin1.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+        stats = admin2.topics().getStats(topicName);
+        assertTrue(stats.subscriptions.get(subName).isReplicated);
+
+        // Disable replicated subscription in r2
+        admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
+        stats = admin2.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+
+        // Unload topic in r1
+        admin1.topics().unload(topicName);
+        stats = admin1.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+
+        // Enable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
+        stats = admin1.topics().getStats(topicName);
+        assertTrue(stats.subscriptions.get(subName).isReplicated);
+        stats = admin2.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+    }
+
+    @Test(timeOut = 30000)
+    public void testReplicatedSubscriptionRestApi2() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        final String topicName = "persistent://" + namespace + "/topic-rest-api2";
+        final String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        @Cleanup
+        final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        // Create subscription in r1
+        createReplicatedSubscription(client1, topicName, subName, true);
+
+        PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
+        for (TopicStats stats : partitionedStats.partitions.values()) {
+            assertTrue(stats.subscriptions.get(subName).isReplicated);
+        }
+
+        // Disable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
+        partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
+        for (TopicStats stats : partitionedStats.partitions.values()) {
+            assertFalse(stats.subscriptions.get(subName).isReplicated);
+        }
+
+        // Enable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
+        partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
+        for (TopicStats stats : partitionedStats.partitions.values()) {
+            assertTrue(stats.subscriptions.get(subName).isReplicated);
+        }

Review comment:
       We are not testing that replication actually starts or stops working.
   We are only testing the value of the internal status.
   
   Can we add an end-to-end test?
   Otherwise replication could look enabled while not actually working.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -177,12 +178,28 @@ public boolean isReplicated() {
         return replicatedSubscriptionSnapshotCache != null;
     }
 
-    void setReplicated(boolean replicated) {
-        this.replicatedSubscriptionSnapshotCache = replicated
-                ? new ReplicatedSubscriptionSnapshotCache(subName,
-                        topic.getBrokerService().pulsar().getConfiguration()
-                                .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
-                : null;
+    public void setReplicated(boolean replicated) {
+        ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();
+
+        if (!replicated || !config.isEnableReplicatedSubscriptions()) {
+            this.replicatedSubscriptionSnapshotCache = null;
+        } else if (this.replicatedSubscriptionSnapshotCache == null) {
+            this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
+                    config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
+        }
+
+        if (this.cursor != null) {
+            Map<String, Long> properties = this.cursor.getProperties();

Review comment:
       We should add a method to the cursor to set this property,
   
   because altering a Map that is returned by a getter method may result in unpredictable behaviour (it may be a defensive copy).
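
The defensive-copy concern can be illustrated with a minimal sketch. `SketchCursor` and its methods are hypothetical stand-ins invented for this example, not the real `ManagedCursor` API; the assumption is only that a getter is free to return a copy of its internal map.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; SketchCursor is a hypothetical stand-in, not ManagedCursor.
class DefensiveCopySketch {
    static final class SketchCursor {
        private final Map<String, Long> properties = new HashMap<>();

        // Returns a defensive copy: callers get a snapshot, not the live state.
        Map<String, Long> getProperties() {
            return new HashMap<>(properties);
        }

        // Dedicated mutators are the only supported write path.
        void putProperty(String key, Long value) {
            properties.put(key, value);
        }

        void removeProperty(String key) {
            properties.remove(key);
        }
    }

    public static void main(String[] args) {
        SketchCursor cursor = new SketchCursor();

        // Writing through the getter only changes the throwaway copy.
        cursor.getProperties().put("replicated", 1L);
        System.out.println(cursor.getProperties().containsKey("replicated"));

        // Writing through the dedicated method changes the cursor itself.
        cursor.putProperty("replicated", 1L);
        System.out.println(cursor.getProperties().containsKey("replicated"));
    }
}
```

With a copy-returning getter the first write is silently lost, which is why an explicit setter on the cursor is the more predictable contract.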

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -177,12 +178,28 @@ public boolean isReplicated() {
         return replicatedSubscriptionSnapshotCache != null;
     }
 
-    void setReplicated(boolean replicated) {
-        this.replicatedSubscriptionSnapshotCache = replicated
-                ? new ReplicatedSubscriptionSnapshotCache(subName,
-                        topic.getBrokerService().pulsar().getConfiguration()
-                                .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
-                : null;
+    public void setReplicated(boolean replicated) {
+        ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();
+
+        if (!replicated || !config.isEnableReplicatedSubscriptions()) {
+            this.replicatedSubscriptionSnapshotCache = null;
+        } else if (this.replicatedSubscriptionSnapshotCache == null) {
+            this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
+                    config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
+        }
+
+        if (this.cursor != null) {
+            Map<String, Long> properties = this.cursor.getProperties();
+            try {
+                if (replicated) {
+                    properties.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
+                } else {
+                    properties.remove(REPLICATED_SUBSCRIPTION_PROPERTY);
+                }
+            } catch (UnsupportedOperationException e) {

Review comment:
       This catch block looks like a code smell.
   UnsupportedOperationException is a very generic RuntimeException in Java.
   
   We should have a safer way to check for this case:
   `ManagedCursorImpl#lastMarkDeleteEntry has not been initialized yet`
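
One possible shape for such a check is sketched below. It assumes the getter falls back to `Collections.emptyMap()` while the cursor is uninitialized, which is what makes `put` throw `UnsupportedOperationException`. All class and method names here are hypothetical and are not the `ManagedCursorImpl` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; names are hypothetical, not the ManagedCursorImpl API.
class ExplicitStateCheckSketch {
    static final class SketchCursor {
        private Map<String, Long> properties; // stays null until initialize() runs

        void initialize() {
            properties = new HashMap<>();
        }

        // Assumed fallback: an immutable empty map while uninitialized.
        Map<String, Long> getProperties() {
            return properties != null ? properties : Collections.emptyMap();
        }

        // Reports "not initialized" through the return value instead of an exception.
        boolean putProperty(String key, Long value) {
            if (properties == null) {
                return false;
            }
            properties.put(key, value);
            return true;
        }
    }

    public static void main(String[] args) {
        SketchCursor cursor = new SketchCursor();
        System.out.println(cursor.putProperty("replicated", 1L)); // cursor not ready yet
        cursor.initialize();
        System.out.println(cursor.putProperty("replicated", 1L)); // cursor ready
    }
}
```

The caller then branches on an explicit result rather than catching a generic runtime exception that could also come from unrelated code.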







[GitHub] [pulsar] sijie merged pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
sijie merged pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644540388



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                    enabled);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
+                                    topicNamePartition.toString(), subName, enabled));
+                        } catch (Exception e) {
+                            log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                    clientAppId(), enabled, topicNamePartition, subName, e);
+                            resumeAsyncResponseExceptionally(asyncResponse, e);
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse
+                                        .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
+                                return null;
+                            } else if (t instanceof PreconditionFailedException) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Cannot enable/disable replicated subscriptions on non-global topics"));
+                                return null;
+                            } else {
+                                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                        clientAppId(), enabled, topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
+                        }
+
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                            enabled);
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                        topicName, subName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
+            String subName, boolean authoritative, Boolean enabled) {

Review comment:
       Fixed.







[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644540164



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       If `boolean` is used instead of `Boolean`, an error response seems to be returned unless the request body can be converted to a boolean value.
   ```sh
   $ curl -i -X POST -d '' -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 400 Bad Request
   Date: Thu, 03 Jun 2021 06:56:00 GMT
   broker-address: localhost
   Content-Type: application/json
   Content-Length: 35
   Server: Jetty(9.4.39.v20210325)
   
   The request entity cannot be empty.
   
   $ curl -i -X POST -d '"foo"' -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 400 Bad Request
   Date: Thu, 03 Jun 2021 07:01:15 GMT
   broker-address: localhost
   Content-Type: text/plain
   Content-Length: 248
   Server: Jetty(9.4.39.v20210325)
   
   Cannot deserialize value of type `boolean` from String "foo": only "true"/"True"/"TRUE" or "false"/"False"/"FALSE" recognized
    at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 1]
   
   $ curl -i -X POST -d 0 -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 204 No Content
   Date: Thu, 03 Jun 2021 06:56:21 GMT
   broker-address: localhost
   Server: Jetty(9.4.39.v20210325)
   
   $ curl -i -X POST -d 1 -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 204 No Content
   Date: Thu, 03 Jun 2021 06:56:34 GMT
   broker-address: localhost
   Server: Jetty(9.4.39.v20210325)
   ```
   Since this behavior is not a problem, I changed the type of `enabled` to boolean.







[GitHub] [pulsar] eolivelli commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r648138748



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
##########
@@ -285,6 +285,37 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
         return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
     }
 
+    @Override
+    public boolean putProperty(String key, Long value) {
+        if (lastMarkDeleteEntry != null) {
+            MarkDeleteEntry currentLastMarkDeleteEntry = lastMarkDeleteEntry;
+            Map<String, Long> properties = currentLastMarkDeleteEntry.properties;
+            if (properties == null || properties.isEmpty()) {

Review comment:
       Do we have concurrency issues here?
   There is no lock or other synchronisation primitive that prevents two calls from accessing this variable at the same time; the same applies to `lastMarkDeleteEntry`.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
##########
@@ -285,6 +285,37 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
         return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
     }
 
+    @Override
+    public boolean putProperty(String key, Long value) {
+        if (lastMarkDeleteEntry != null) {
+            MarkDeleteEntry currentLastMarkDeleteEntry = lastMarkDeleteEntry;
+            Map<String, Long> properties = currentLastMarkDeleteEntry.properties;
+            if (properties == null || properties.isEmpty()) {
+                Map<String, Long> newProperties = Maps.newHashMap();
+                newProperties.put(key, value);
+                lastMarkDeleteEntry = new MarkDeleteEntry(currentLastMarkDeleteEntry.newPosition, newProperties,
+                        currentLastMarkDeleteEntry.callback, currentLastMarkDeleteEntry.ctx);
+                lastMarkDeleteEntry.callbackGroup = currentLastMarkDeleteEntry.callbackGroup;
+            } else {
+                properties.put(key, value);
+            }
+            return true;
+        }
+        return false;

Review comment:
       I do not understand this part.
   I am probably missing some context.
   
   If `lastMarkDeleteEntry` is null, then the command is ultimately ignored and we do not store the information requested by the user.
   
   If this is correct, then I believe we must fail the API call, because otherwise the user will think the change was applied when it was not.
   
   Can you please clarify?







[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r648518432



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
##########
@@ -285,6 +285,37 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
         return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
     }
 
+    @Override
+    public boolean putProperty(String key, Long value) {
+        if (lastMarkDeleteEntry != null) {
+            MarkDeleteEntry currentLastMarkDeleteEntry = lastMarkDeleteEntry;
+            Map<String, Long> properties = currentLastMarkDeleteEntry.properties;
+            if (properties == null || properties.isEmpty()) {

Review comment:
       Fixed this method to update `lastMarkDeleteEntry` using `AtomicReferenceFieldUpdater`.
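
For readers unfamiliar with the pattern, the following is a minimal sketch of a compare-and-set update through `AtomicReferenceFieldUpdater`. It is not the code from this pull request: `Entry`, `lastEntry`, and the copy-on-write strategy are assumptions made for illustration, and the real `MarkDeleteEntry` carries more fields.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Illustrative sketch only; Entry and lastEntry are hypothetical simplifications.
class AtomicEntryUpdateSketch {
    // Treated as immutable: every update publishes a fresh Entry.
    static final class Entry {
        final Map<String, Long> properties;

        Entry(Map<String, Long> properties) {
            this.properties = properties;
        }
    }

    // The updated field must be volatile and non-static.
    private static final AtomicReferenceFieldUpdater<AtomicEntryUpdateSketch, Entry> ENTRY_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(AtomicEntryUpdateSketch.class, Entry.class, "lastEntry");

    private volatile Entry lastEntry;

    void initialize() {
        lastEntry = new Entry(new HashMap<>());
    }

    // Copy-on-write update; retries if another thread swapped the entry in between.
    boolean putProperty(String key, Long value) {
        while (true) {
            Entry current = lastEntry;
            if (current == null) {
                return false; // nothing to update yet
            }
            Map<String, Long> copy = new HashMap<>(current.properties);
            copy.put(key, value);
            if (ENTRY_UPDATER.compareAndSet(this, current, new Entry(copy))) {
                return true;
            }
        }
    }

    Map<String, Long> getProperties() {
        Entry current = lastEntry;
        return current != null ? current.properties : Collections.emptyMap();
    }

    public static void main(String[] args) {
        AtomicEntryUpdateSketch cursor = new AtomicEntryUpdateSketch();
        cursor.initialize();
        cursor.putProperty("replicated", 1L);
        System.out.println(cursor.getProperties());
    }
}
```

Reading the field once into a local and publishing the result with a single compare-and-set avoids both the check-then-act race on the null test and concurrent mutation of a shared map.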







[GitHub] [pulsar] massakam commented on pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#issuecomment-857435724


   @eolivelli PTAL





[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r648015809



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -177,12 +178,28 @@ public boolean isReplicated() {
         return replicatedSubscriptionSnapshotCache != null;
     }
 
-    void setReplicated(boolean replicated) {
-        this.replicatedSubscriptionSnapshotCache = replicated
-                ? new ReplicatedSubscriptionSnapshotCache(subName,
-                        topic.getBrokerService().pulsar().getConfiguration()
-                                .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
-                : null;
+    public void setReplicated(boolean replicated) {
+        ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();
+
+        if (!replicated || !config.isEnableReplicatedSubscriptions()) {
+            this.replicatedSubscriptionSnapshotCache = null;
+        } else if (this.replicatedSubscriptionSnapshotCache == null) {
+            this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
+                    config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
+        }
+
+        if (this.cursor != null) {
+            Map<String, Long> properties = this.cursor.getProperties();
+            try {
+                if (replicated) {
+                    properties.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
+                } else {
+                    properties.remove(REPLICATED_SUBSCRIPTION_PROPERTY);
+                }
+            } catch (UnsupportedOperationException e) {

Review comment:
       Fixed.







[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r648516867



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
##########
@@ -285,6 +285,37 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
         return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
     }
 
+    @Override
+    public boolean putProperty(String key, Long value) {
+        if (lastMarkDeleteEntry != null) {
+            MarkDeleteEntry currentLastMarkDeleteEntry = lastMarkDeleteEntry;
+            Map<String, Long> properties = currentLastMarkDeleteEntry.properties;
+            if (properties == null || properties.isEmpty()) {
+                Map<String, Long> newProperties = Maps.newHashMap();
+                newProperties.put(key, value);
+                lastMarkDeleteEntry = new MarkDeleteEntry(currentLastMarkDeleteEntry.newPosition, newProperties,
+                        currentLastMarkDeleteEntry.callback, currentLastMarkDeleteEntry.ctx);
+                lastMarkDeleteEntry.callbackGroup = currentLastMarkDeleteEntry.callbackGroup;
+            } else {
+                properties.put(key, value);
+            }
+            return true;
+        }
+        return false;

Review comment:
       Changed it to return an error response when `lastMarkDeleteEntry` is null. However, if an instance of `PersistentSubscription` exists, the cursor should already have been initialized and `lastMarkDeleteEntry` should be non-null, so in practice the REST API won't return that error.







[GitHub] [pulsar] eolivelli commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r645342826



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
##########
@@ -230,6 +233,94 @@ public void testReplicationSnapshotStopWhenNoTraffic() throws Exception {
         assertNotEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2);
     }
 
+    @Test(timeOut = 30000)
+    public void testReplicatedSubscriptionRestApi1() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        final String topicName = "persistent://" + namespace + "/topic-rest-api1";
+        final String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        // Create subscription in r1
+        createReplicatedSubscription(client1, topicName, subName, true);
+
+        @Cleanup
+        final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        // Create subscription in r2
+        createReplicatedSubscription(client2, topicName, subName, true);
+
+        TopicStats stats = admin1.topics().getStats(topicName);
+        assertTrue(stats.subscriptions.get(subName).isReplicated);
+
+        // Disable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
+        stats = admin1.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+        stats = admin2.topics().getStats(topicName);
+        assertTrue(stats.subscriptions.get(subName).isReplicated);
+
+        // Disable replicated subscription in r2
+        admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
+        stats = admin2.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+
+        // Unload topic in r1
+        admin1.topics().unload(topicName);
+        stats = admin1.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+
+        // Enable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
+        stats = admin1.topics().getStats(topicName);
+        assertTrue(stats.subscriptions.get(subName).isReplicated);
+        stats = admin2.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+    }
+
+    @Test(timeOut = 30000)
+    public void testReplicatedSubscriptionRestApi2() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        final String topicName = "persistent://" + namespace + "/topic-rest-api2";
+        final String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        @Cleanup
+        final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        // Create subscription in r1
+        createReplicatedSubscription(client1, topicName, subName, true);
+
+        PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
+        for (TopicStats stats : partitionedStats.partitions.values()) {
+            assertTrue(stats.subscriptions.get(subName).isReplicated);
+        }
+
+        // Disable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
+        partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
+        for (TopicStats stats : partitionedStats.partitions.values()) {
+            assertFalse(stats.subscriptions.get(subName).isReplicated);
+        }
+
+        // Enable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
+        partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
+        for (TopicStats stats : partitionedStats.partitions.values()) {
+            assertTrue(stats.subscriptions.get(subName).isReplicated);
+        }

Review comment:
       We are not actually testing that replication starts or stops working.
   We are only testing the value of the internal status.
   
   Can we add some end-to-end tests?
   Otherwise, replication could appear to be enabled while not actually working.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -177,12 +178,28 @@ public boolean isReplicated() {
         return replicatedSubscriptionSnapshotCache != null;
     }
 
-    void setReplicated(boolean replicated) {
-        this.replicatedSubscriptionSnapshotCache = replicated
-                ? new ReplicatedSubscriptionSnapshotCache(subName,
-                        topic.getBrokerService().pulsar().getConfiguration()
-                                .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
-                : null;
+    public void setReplicated(boolean replicated) {
+        ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();
+
+        if (!replicated || !config.isEnableReplicatedSubscriptions()) {
+            this.replicatedSubscriptionSnapshotCache = null;
+        } else if (this.replicatedSubscriptionSnapshotCache == null) {
+            this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
+                    config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
+        }
+
+        if (this.cursor != null) {
+            Map<String, Long> properties = this.cursor.getProperties();

Review comment:
       We should add a method to the cursor to set this property.
   
   Altering a Map that is returned by a getter method may result in unpredictable behaviour (it may be a defensive copy).
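
To make the concern concrete, here is a small self-contained sketch of the two ways a getter can defeat caller-side mutation, next to an explicit setter. `CursorSketch` and its methods are hypothetical and only illustrate the general Java behaviour; they are not the `ManagedCursor` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a cursor; not the real ManagedCursor API.
class CursorSketch {
    private final Map<String, Long> properties = new HashMap<>();

    // Getter returning a defensive copy: caller edits are silently lost.
    Map<String, Long> getPropertiesCopy() {
        return new HashMap<>(properties);
    }

    // Getter returning a read-only view: caller edits throw.
    Map<String, Long> getPropertiesReadOnly() {
        return Collections.unmodifiableMap(properties);
    }

    // Explicit setter: the only path that reliably changes the cursor state.
    void putProperty(String key, Long value) {
        properties.put(key, value);
    }

    boolean hasProperty(String key) {
        return properties.containsKey(key);
    }

    public static void main(String[] args) {
        CursorSketch cursor = new CursorSketch();

        // Writing into the copy does not reach the cursor.
        cursor.getPropertiesCopy().put("replicated", 1L);
        System.out.println(cursor.hasProperty("replicated")); // false

        // Writing into the read-only view fails at runtime.
        try {
            cursor.getPropertiesReadOnly().put("replicated", 1L);
        } catch (UnsupportedOperationException e) {
            System.out.println("read-only view rejected the write");
        }

        // The dedicated method works regardless of what the getter returns.
        cursor.putProperty("replicated", 1L);
        System.out.println(cursor.hasProperty("replicated")); // true
    }
}
```

Either getter is a legitimate implementation choice, which is why the caller cannot rely on mutating the returned map.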

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -177,12 +178,28 @@ public boolean isReplicated() {
         return replicatedSubscriptionSnapshotCache != null;
     }
 
-    void setReplicated(boolean replicated) {
-        this.replicatedSubscriptionSnapshotCache = replicated
-                ? new ReplicatedSubscriptionSnapshotCache(subName,
-                        topic.getBrokerService().pulsar().getConfiguration()
-                                .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
-                : null;
+    public void setReplicated(boolean replicated) {
+        ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();
+
+        if (!replicated || !config.isEnableReplicatedSubscriptions()) {
+            this.replicatedSubscriptionSnapshotCache = null;
+        } else if (this.replicatedSubscriptionSnapshotCache == null) {
+            this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
+                    config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
+        }
+
+        if (this.cursor != null) {
+            Map<String, Long> properties = this.cursor.getProperties();
+            try {
+                if (replicated) {
+                    properties.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
+                } else {
+                    properties.remove(REPLICATED_SUBSCRIPTION_PROPERTY);
+                }
+            } catch (UnsupportedOperationException e) {

Review comment:
       This catch block looks like a code smell.
   UnsupportedOperationException is a very generic RuntimeException in Java.
   
   We should have a safer way to check for this case:
   `ManagedCursorImpl#lastMarkDeleteEntry has not been initialized yet`







[GitHub] [pulsar] sijie commented on pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#issuecomment-863783708


   @codelipenghui @MarvinCai Can you review this PR?





[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644513191



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       `enabled` can be null if the request body is empty.
   ```sh
   $ curl -i \
     -X POST \
     -d '' \
     -H 'Content-Type: application/json' \
     http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub/replicatedSubscriptionStatus
   
   HTTP/1.1 400 Bad Request
   Date: Thu, 03 Jun 2021 06:04:10 GMT
   broker-address: localhost
   Content-Type: application/json
   Content-Length: 50
   Server: Jetty(9.4.39.v20210325)
   
   {"reason":"Boolean type request body is required"}
   ```
   I think the "required = true" specification only affects the generated Swagger documentation; it does not reject an empty request body at runtime.
   https://docs.swagger.io/swagger-core/v1.5.0/apidocs/io/swagger/annotations/ApiParam.html







[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r645684356



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
##########
@@ -230,6 +233,94 @@ public void testReplicationSnapshotStopWhenNoTraffic() throws Exception {
         assertNotEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2);
     }
 
+    @Test(timeOut = 30000)
+    public void testReplicatedSubscriptionRestApi1() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        final String topicName = "persistent://" + namespace + "/topic-rest-api1";
+        final String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        // Create subscription in r1
+        createReplicatedSubscription(client1, topicName, subName, true);
+
+        @Cleanup
+        final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        // Create subscription in r2
+        createReplicatedSubscription(client2, topicName, subName, true);
+
+        TopicStats stats = admin1.topics().getStats(topicName);
+        assertTrue(stats.subscriptions.get(subName).isReplicated);
+
+        // Disable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
+        stats = admin1.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+        stats = admin2.topics().getStats(topicName);
+        assertTrue(stats.subscriptions.get(subName).isReplicated);
+
+        // Disable replicated subscription in r2
+        admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
+        stats = admin2.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+
+        // Unload topic in r1
+        admin1.topics().unload(topicName);
+        stats = admin1.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+
+        // Enable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
+        stats = admin1.topics().getStats(topicName);
+        assertTrue(stats.subscriptions.get(subName).isReplicated);
+        stats = admin2.topics().getStats(topicName);
+        assertFalse(stats.subscriptions.get(subName).isReplicated);
+    }
+
+    @Test(timeOut = 30000)
+    public void testReplicatedSubscriptionRestApi2() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        final String topicName = "persistent://" + namespace + "/topic-rest-api2";
+        final String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        @Cleanup
+        final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        // Create subscription in r1
+        createReplicatedSubscription(client1, topicName, subName, true);
+
+        PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
+        for (TopicStats stats : partitionedStats.partitions.values()) {
+            assertTrue(stats.subscriptions.get(subName).isReplicated);
+        }
+
+        // Disable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
+        partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
+        for (TopicStats stats : partitionedStats.partitions.values()) {
+            assertFalse(stats.subscriptions.get(subName).isReplicated);
+        }
+
+        // Enable replicated subscription in r1
+        admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
+        partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
+        for (TopicStats stats : partitionedStats.partitions.values()) {
+            assertTrue(stats.subscriptions.get(subName).isReplicated);
+        }

Review comment:
       Added some e2e tests.







[GitHub] [pulsar] massakam commented on pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#issuecomment-857886734


   PTAL





[GitHub] [pulsar] massakam commented on pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#issuecomment-858240638


   @eolivelli PTAL





[GitHub] [pulsar] massakam commented on a change in pull request #10790: [broker] Add REST API to enable or disable replicated subscriptions

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644517979



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {

Review comment:
       No, "!" is not needed here. This method returns true if the topic name suffix is "-partition-x".
   ```java
   System.out.println(TopicName.get("persistent://public/default/massakam").isPartitioned());
   System.out.println(TopicName.get("persistent://public/default/massakam-partition-0").isPartitioned());
   ```
   ```sh
   false
   true
   ```
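
For readers without the Pulsar sources at hand, the suffix rule described above can be approximated as follows. This is a simplified sketch under the assumption that a partition name is the base name followed by "-partition-" and a non-negative integer; `PartitionNameSketch` is hypothetical and is not the actual `TopicName` implementation.

```java
// Hypothetical helper approximating the "-partition-x" naming rule.
class PartitionNameSketch {
    private static final String PARTITION_SUFFIX = "-partition-";

    // Returns the partition index encoded in the name, or -1 if there is none.
    static int partitionIndex(String topic) {
        int pos = topic.lastIndexOf(PARTITION_SUFFIX);
        if (pos < 0) {
            return -1;
        }
        String index = topic.substring(pos + PARTITION_SUFFIX.length());
        if (index.isEmpty() || !index.chars().allMatch(Character::isDigit)) {
            return -1;
        }
        try {
            return Integer.parseInt(index);
        } catch (NumberFormatException e) {
            // Digits only, but too large for an int.
            return -1;
        }
    }

    static boolean isPartitioned(String topic) {
        return partitionIndex(topic) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(isPartitioned("persistent://public/default/massakam"));             // false
        System.out.println(isPartitioned("persistent://public/default/massakam-partition-0")); // true
    }
}
```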



