You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/03/30 11:46:44 UTC

[GitHub] [pulsar] murong00 opened a new pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

murong00 opened a new pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637
 
 
   ### Motivation
   
   Add a new namespace policy `autoSubscriptionCreationOverride` which will enable an override of broker `autoSubscriptionCreation` settings on the namespace level. Users can keep `autoSubscriptionCreation` disabled for the broker and allow it on a specific namespace using this feature.
   
   ### Modifications
   
   * Add a new namespace policy: `autoSubscriptionCreationOverride` and associated API / CLI interface for setting and removing.
   * When checking `autoSubscriptionCreation` configuration, the broker first retrieves namespace policies from zookeeper. If not set, it falls back to broker configuration. 
   * Some minor improvement on `autoTopicCreationOverride` (e.g. v1 Namespaces API & cli md)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405434298
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -571,23 +572,80 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTop
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoTopicCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            String autoOverride = autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
                             asyncResponse.resume(Response.noContent().build());
-                            log.info("[{}] Successfully {} on namespace {}", clientAppId(),
-                                    autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled", namespaceName);
                             return null;
-                        } catch (KeeperException.NoNodeException e) {
-                            log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
-                                    namespaceName);
-                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            asyncResponse.resume(new RestException(e));
                             return null;
-                        } catch (KeeperException.BadVersionException e) {
-                            log.error(
-                                    "[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
-                                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+                        }
+                    } else {
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        return null;
+                    }
+                }
+        ).exceptionally(e -> {
+            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return null;
+        });
+    }
+
+    protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
 
-                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoTopicCreationOverride = null;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoTopicCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            log.info("[{}] Successfully removed autoTopicCreation override on namespace {}", clientAppId(), namespaceName);
+                            asyncResponse.resume(Response.noContent().build());
 
 Review comment:
   The response should complete in the zookeeper callback. Otherwise, the broker always returns success.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405436687
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -615,29 +673,87 @@ protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
                 policies -> {
                     if (policies.isPresent()) {
                         Entry<Policies, Stat> policiesNode = policies.get();
-                        policiesNode.getKey().autoTopicCreationOverride = null;
+                        policiesNode.getKey().autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            String autoOverride = autoSubscriptionCreationOverride.allowAutoSubscriptionCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
                             asyncResponse.resume(Response.noContent().build());
 
 Review comment:
   Should move to the zookeeper callback.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405912737
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -571,23 +572,80 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTop
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoTopicCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            String autoOverride = autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-612776208
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] jiazhai commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-607896659
 
 
   /pulsarbot run-failure-checks
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405435961
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -571,23 +572,80 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTop
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoTopicCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            String autoOverride = autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
 
 Review comment:
   Should move to the zookeeper callback.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405912931
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -615,29 +673,87 @@ protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
                 policies -> {
                     if (policies.isPresent()) {
                         Entry<Policies, Stat> policiesNode = policies.get();
-                        policiesNode.getKey().autoTopicCreationOverride = null;
+                        policiesNode.getKey().autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            String autoOverride = autoSubscriptionCreationOverride.allowAutoSubscriptionCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
                             asyncResponse.resume(Response.noContent().build());
-                            log.info("[{}] Successfully removed override on namespace {}", clientAppId(), namespaceName);
                             return null;
-                        } catch (KeeperException.NoNodeException e) {
-                            log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
-                                    namespaceName);
-                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            asyncResponse.resume(new RestException(e));
                             return null;
-                        } catch (KeeperException.BadVersionException e) {
-                            log.error(
-                                    "[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
-                                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+                        }
+                    } else {
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        return null;
+                    }
+                }
+        ).exceptionally(e -> {
+            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return null;
+        });
+    }
+
+    protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
 
-                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoSubscriptionCreationOverride = null;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            log.info("[{}] Successfully removed autoSubscriptionCreation override on namespace {}", clientAppId(), namespaceName);
+                            asyncResponse.resume(Response.noContent().build());
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 removed a comment on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 removed a comment on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-612776208
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 removed a comment on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 removed a comment on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-613172673
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-610149365
 
 
   @codelipenghui I hava addressed your comment, please help to take a look, thanks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] jiazhai commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-607560635
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405912715
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -571,23 +572,80 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTop
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405912774
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -615,29 +673,87 @@ protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
                 policies -> {
                     if (policies.isPresent()) {
                         Entry<Policies, Stat> policiesNode = policies.get();
-                        policiesNode.getKey().autoTopicCreationOverride = null;
+                        policiesNode.getKey().autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            String autoOverride = autoSubscriptionCreationOverride.allowAutoSubscriptionCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
                             asyncResponse.resume(Response.noContent().build());
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-613172673
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405436869
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -615,29 +673,87 @@ protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
                 policies -> {
                     if (policies.isPresent()) {
                         Entry<Policies, Stat> policiesNode = policies.get();
-                        policiesNode.getKey().autoTopicCreationOverride = null;
+                        policiesNode.getKey().autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            String autoOverride = autoSubscriptionCreationOverride.allowAutoSubscriptionCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
                             asyncResponse.resume(Response.noContent().build());
-                            log.info("[{}] Successfully removed override on namespace {}", clientAppId(), namespaceName);
                             return null;
-                        } catch (KeeperException.NoNodeException e) {
-                            log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
-                                    namespaceName);
-                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            asyncResponse.resume(new RestException(e));
                             return null;
-                        } catch (KeeperException.BadVersionException e) {
-                            log.error(
-                                    "[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
-                                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+                        }
+                    } else {
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        return null;
+                    }
+                }
+        ).exceptionally(e -> {
+            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return null;
+        });
+    }
+
+    protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
 
-                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoSubscriptionCreationOverride = null;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            log.info("[{}] Successfully removed autoSubscriptionCreation override on namespace {}", clientAppId(), namespaceName);
+                            asyncResponse.resume(Response.noContent().build());
 
 Review comment:
   Should move to the zookeeper callback.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r404510526
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -653,6 +654,101 @@ protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
         });
     }
 
+    protected void internalSetAutoSubscriptionCreation(AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
+                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                            asyncResponse.resume(Response.noContent().build());
+                            String autoOverride = autoSubscriptionCreationOverride.allowAutoSubscriptionCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
+                            return null;
+                        } catch (KeeperException.NoNodeException e) {
+                            log.error("[{}] Failed to modify autoSubscriptionCreation status for namespace {}: does not exist", clientAppId(),
+                                    namespaceName);
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                            return null;
+                        } catch (KeeperException.BadVersionException e) {
+                            log.error(
+                                    "[{}] Failed to modify autoSubscriptionCreation status on namespace {} expected policy node version={} : concurrent modification",
+                                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+
+                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                            return null;
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            asyncResponse.resume(new RestException(e));
+                            return null;
+                        }
+                    } else {
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        return null;
+                    }
+                }
+        ).exceptionally(e -> {
+            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return null;
+        });
+    }
+
+    protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoSubscriptionCreationOverride = null;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r404510481
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -653,6 +654,101 @@ protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
         });
     }
 
+    protected void internalSetAutoSubscriptionCreation(AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-610319377
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-611361110
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-613181339
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405435396
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -571,23 +572,80 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTop
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
 
 Review comment:
   Looks should complete the `asyncResponse` here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-610836697
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405912642
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -571,23 +572,80 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTop
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoTopicCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            String autoOverride = autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
                             asyncResponse.resume(Response.noContent().build());
-                            log.info("[{}] Successfully {} on namespace {}", clientAppId(),
-                                    autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled", namespaceName);
                             return null;
-                        } catch (KeeperException.NoNodeException e) {
-                            log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
-                                    namespaceName);
-                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            asyncResponse.resume(new RestException(e));
                             return null;
-                        } catch (KeeperException.BadVersionException e) {
-                            log.error(
-                                    "[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
-                                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+                        }
+                    } else {
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        return null;
+                    }
+                }
+        ).exceptionally(e -> {
+            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return null;
+        });
+    }
+
+    protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
 
-                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoTopicCreationOverride = null;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoTopicCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            log.info("[{}] Successfully removed autoTopicCreation override on namespace {}", clientAppId(), namespaceName);
+                            asyncResponse.resume(Response.noContent().build());
 
 Review comment:
   @codelipenghui Thanks for pointing out this, hava fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-611320421
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-610700161
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r404022980
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -653,6 +654,101 @@ protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
         });
     }
 
+    protected void internalSetAutoSubscriptionCreation(AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
 
 Review comment:
   It's better to call `setData()` asynchronously.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] jiazhai commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-606340974
 
 
   @murong00 thanks for this feature. Please check the CI failures. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] sijie commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
sijie commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-611709776
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] sijie commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
sijie commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-612676385
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r405912642
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -571,23 +572,80 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTop
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoTopicCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            String autoOverride = autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
                             asyncResponse.resume(Response.noContent().build());
-                            log.info("[{}] Successfully {} on namespace {}", clientAppId(),
-                                    autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled", namespaceName);
                             return null;
-                        } catch (KeeperException.NoNodeException e) {
-                            log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
-                                    namespaceName);
-                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            asyncResponse.resume(new RestException(e));
                             return null;
-                        } catch (KeeperException.BadVersionException e) {
-                            log.error(
-                                    "[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
-                                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+                        }
+                    } else {
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        return null;
+                    }
+                }
+        ).exceptionally(e -> {
+            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return null;
+        });
+    }
+
+    protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
 
-                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoTopicCreationOverride = null;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    (rc, path1, ctx, stat) -> {
+                                        if (rc == KeeperException.Code.OK.intValue()) {
+                                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                                        } else {
+                                            String errorMsg = String.format(
+                                                    "[%s] Failed to modify autoTopicCreation status for namespace %s",
+                                                    clientAppId(), namespaceName);
+                                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                                log.warn("{} : does not exist", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                                log.warn("{} : concurrent modification", errorMsg);
+                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                            } else {
+                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                            }
+                                        }
+                                    }, null);
+                            log.info("[{}] Successfully removed autoTopicCreation override on namespace {}", clientAppId(), namespaceName);
+                            asyncResponse.resume(Response.noContent().build());
 
 Review comment:
   @codelipenghui Thanks for pointing out this, have fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
murong00 commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-611415900
 
 
   /pulsarbot run-failure-checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] sijie merged pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
sijie merged pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#issuecomment-612811834
 
 
   @murong00 Could you please rebase your branch since some test-related fixes merged into the master branch.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6637: [pulsar-broker] Implement AutoSubscriptionCreation by namespace override
URL: https://github.com/apache/pulsar/pull/6637#discussion_r404023277
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 ##########
 @@ -653,6 +654,101 @@ protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
         });
     }
 
+    protected void internalSetAutoSubscriptionCreation(AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
+                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                            asyncResponse.resume(Response.noContent().build());
+                            String autoOverride = autoSubscriptionCreationOverride.allowAutoSubscriptionCreation ? "enabled" : "disabled";
+                            log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
+                            return null;
+                        } catch (KeeperException.NoNodeException e) {
+                            log.error("[{}] Failed to modify autoSubscriptionCreation status for namespace {}: does not exist", clientAppId(),
+                                    namespaceName);
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                            return null;
+                        } catch (KeeperException.BadVersionException e) {
+                            log.error(
+                                    "[{}] Failed to modify autoSubscriptionCreation status on namespace {} expected policy node version={} : concurrent modification",
+                                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+
+                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                            return null;
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            asyncResponse.resume(new RestException(e));
+                            return null;
+                        }
+                    } else {
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        return null;
+                    }
+                }
+        ).exceptionally(e -> {
+            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return null;
+        });
+    }
+
+    protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoSubscriptionCreationOverride = null;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
 
 Review comment:
   Same as the above comment.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services