You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/28 04:19:49 UTC

[GitHub] [pulsar] rdhabalia opened a new pull request #9351: [pulsar-broker] namespace resources use metadata-store api

rdhabalia opened a new pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351


   ### Motivation
   This PR is on top of #9338, with this change, namespace-resources will start using metadata-api and remove zk-dependency.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia closed pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
rdhabalia closed pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia commented on a change in pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#discussion_r571231044



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -856,172 +764,52 @@ protected void internalSetSubscriptionExpirationTime(int expirationTime) {
         if (expirationTime < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for subscription expiration time");
         }
-
-        Entry<Policies, Stat> policiesNode = null;
-
-        try {
-            // Force to read the data s.t. the watch to the cache content is setup.
-            policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
-                    () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
-            policiesNode.getKey().subscription_expiration_time_minutes = expirationTime;
-
-            // Write back the new policies into zookeeper
-            globalZk().setData(path(POLICIES, namespaceName.toString()),
-                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-
-            log.info("[{}] Successfully updated the subscription expiration time on namespace {}", clientAppId(),
-                    namespaceName);
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update the subscription expiration time for namespace {}: does not exist",
-                    clientAppId(), namespaceName);
-            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
-        } catch (KeeperException.BadVersionException e) {
-            log.warn(
-                    "[{}] Failed to update the subscription expiration time on"
-                            + " namespace {} expected policy node version={} : concurrent modification",
-                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
-            throw new RestException(Status.CONFLICT, "Concurrent modification");
-        } catch (Exception e) {
-            log.error("[{}] Failed to update the subscription expiration time on namespace {}", clientAppId(),
-                    namespaceName, e);
-            throw new RestException(e);
-        }
+        updatePolicies(path(POLICIES, namespaceName.toString()), (policies) -> {
+            policies.subscription_expiration_time_minutes = expirationTime;
+            return policies;
+        });
     }
 
     protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
                                                 AutoTopicCreationOverride autoTopicCreationOverride) {
         final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
-
-        if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
-        }
-        if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
+        if (autoTopicCreationOverride != null) {

Review comment:
       yes, I merged two methods which had duplicate code:
   `internalRemoveAutoTopicCreation` and `internalSetAutoSubscriptionCreation`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia commented on a change in pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#discussion_r571231283



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -253,4 +255,23 @@ public void accept(Notification t) {
             break;
         }
     }
+
+    private CompletableFuture<Void> executeWithRetry(Supplier<CompletableFuture<Void>> op, String key) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        op.get().thenAccept(r -> result.complete(null)).exceptionally((ex) -> {
+            if (ex.getCause() instanceof BadVersionException) {
+                // if resource is updated by other than metadata-cache then metadata-cache will get bad-version
+                // exception. so, try to invalidate the cache and try one more time.
+                objCache.synchronous().invalidate(key);

Review comment:
       no, it just cleans up the cache synchronously. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Renkai commented on pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
Renkai commented on pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#issuecomment-774639145


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia commented on pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#issuecomment-773870157


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#discussion_r571019653



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -856,172 +764,52 @@ protected void internalSetSubscriptionExpirationTime(int expirationTime) {
         if (expirationTime < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for subscription expiration time");
         }
-
-        Entry<Policies, Stat> policiesNode = null;
-
-        try {
-            // Force to read the data s.t. the watch to the cache content is setup.
-            policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
-                    () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
-            policiesNode.getKey().subscription_expiration_time_minutes = expirationTime;
-
-            // Write back the new policies into zookeeper
-            globalZk().setData(path(POLICIES, namespaceName.toString()),
-                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-
-            log.info("[{}] Successfully updated the subscription expiration time on namespace {}", clientAppId(),
-                    namespaceName);
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update the subscription expiration time for namespace {}: does not exist",
-                    clientAppId(), namespaceName);
-            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
-        } catch (KeeperException.BadVersionException e) {
-            log.warn(
-                    "[{}] Failed to update the subscription expiration time on"
-                            + " namespace {} expected policy node version={} : concurrent modification",
-                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
-            throw new RestException(Status.CONFLICT, "Concurrent modification");
-        } catch (Exception e) {
-            log.error("[{}] Failed to update the subscription expiration time on namespace {}", clientAppId(),
-                    namespaceName, e);
-            throw new RestException(e);
-        }
+        updatePolicies(path(POLICIES, namespaceName.toString()), (policies) -> {
+            policies.subscription_expiration_time_minutes = expirationTime;
+            return policies;
+        });
     }
 
     protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
                                                 AutoTopicCreationOverride autoTopicCreationOverride) {
         final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
-
-        if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
-        }
-        if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
+        if (autoTopicCreationOverride != null) {

Review comment:
       this `if` statement was not present before, can it be `null` here ?

##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -253,4 +255,23 @@ public void accept(Notification t) {
             break;
         }
     }
+
+    private CompletableFuture<Void> executeWithRetry(Supplier<CompletableFuture<Void>> op, String key) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        op.get().thenAccept(r -> result.complete(null)).exceptionally((ex) -> {
+            if (ex.getCause() instanceof BadVersionException) {
+                // if resource is updated by other than metadata-cache then metadata-cache will get bad-version
+                // exception. so, try to invalidate the cache and try one more time.
+                objCache.synchronous().invalidate(key);

Review comment:
       can we call here this `synchronous` operation ?
   do we risk a deadlock ?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
##########
@@ -2041,4 +2044,9 @@ public void testCompactionStatus() throws Exception {
         assertTrue(admin.topics().compactionStatus(topicName)
             .lastError.contains("Failed at something"));
     }
+
+    private void clearCache() {
+        // (MetadataCacheImpl<ClusterData>) pulsar.getPulsarResources().

Review comment:
       nit: remove comment




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia commented on pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#issuecomment-773870157


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia commented on a change in pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#discussion_r571348312



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -418,7 +382,7 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
         try {
             final String namespace = namespaceName.toString();
             final String policyPath = AdminResource.path(POLICIES, namespace);
-            Policies policies = policiesCache().get(policyPath)
+            Policies policies = namespaceResources().get(policyPath)

Review comment:
       yes, make sense. I will try to handle in another PR so, I can handle unit-test failures related to this change separately.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#discussion_r571317435



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -418,7 +382,7 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
         try {
             final String namespace = namespaceName.toString();
             final String policyPath = AdminResource.path(POLICIES, namespace);
-            Policies policies = policiesCache().get(policyPath)
+            Policies policies = namespaceResources().get(policyPath)

Review comment:
       nit: ideally namespace resources should take a namespace name and internally resolve the path on metadata store.

##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -118,7 +120,7 @@ private MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde) {
 
     @Override
     public CompletableFuture<Void> readModifyUpdateOrCreate(String path, Function<Optional<T>, T> modifyFunction) {
-        return objCache.get(path)
+        return executeWithRetry(() -> objCache.get(path)

Review comment:
       This part is already in master, can you merge or rebase?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -1021,4 +1026,75 @@ protected void validateBrokerName(String broker) throws MalformedURLException {
             throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
         }
     }
+
+    /*
+     * Get the list of namespaces (on every cluster) for a given property.
+     *
+     * @param property the property name
+     * @return the list of namespaces
+     */
+    protected List<String> getListOfNamespaces(String property) throws Exception {

Review comment:
       ```suggestion
       protected List<String> getListOfNamespaces(String tenant) throws Exception {
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -132,20 +131,18 @@ protected void internalCreateNamespace(Policies policies) {
 
         try {
             int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
-            //no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
+            // no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
             if (maxNamespacesPerTenant > 0) {
                 List<String> namespaces = getListOfNamespaces(namespaceName.getTenant());
                 if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
                     throw new RestException(Status.PRECONDITION_FAILED,
                             "Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant());
                 }
             }
-            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-
-            zkCreateOptimistic(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policies));
+            namespaceResources().create(path(POLICIES, namespaceName.toString()), policies);
             log.info("[{}] Created namespace {}", clientAppId(), namespaceName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName);
+        } catch (NotFoundException e) {
+            log.error("[{}] namespace already exists {}", clientAppId(), namespaceName, e);

Review comment:
       The exception and the log message seem to have diverged now




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia merged pull request #9351: [pulsar-broker] namespace resources use metadata-store api

Posted by GitBox <gi...@apache.org>.
rdhabalia merged pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org