Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/05 14:53:27 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #9351: [pulsar-broker] namespace resources use metadata-store api

eolivelli commented on a change in pull request #9351:
URL: https://github.com/apache/pulsar/pull/9351#discussion_r571019653



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -856,172 +764,52 @@ protected void internalSetSubscriptionExpirationTime(int expirationTime) {
         if (expirationTime < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for subscription expiration time");
         }
-
-        Entry<Policies, Stat> policiesNode = null;
-
-        try {
-            // Force to read the data s.t. the watch to the cache content is setup.
-            policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
-                    () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
-            policiesNode.getKey().subscription_expiration_time_minutes = expirationTime;
-
-            // Write back the new policies into zookeeper
-            globalZk().setData(path(POLICIES, namespaceName.toString()),
-                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
-            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-
-            log.info("[{}] Successfully updated the subscription expiration time on namespace {}", clientAppId(),
-                    namespaceName);
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update the subscription expiration time for namespace {}: does not exist",
-                    clientAppId(), namespaceName);
-            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
-        } catch (KeeperException.BadVersionException e) {
-            log.warn(
-                    "[{}] Failed to update the subscription expiration time on"
-                            + " namespace {} expected policy node version={} : concurrent modification",
-                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
-            throw new RestException(Status.CONFLICT, "Concurrent modification");
-        } catch (Exception e) {
-            log.error("[{}] Failed to update the subscription expiration time on namespace {}", clientAppId(),
-                    namespaceName, e);
-            throw new RestException(e);
-        }
+        updatePolicies(path(POLICIES, namespaceName.toString()), (policies) -> {
+            policies.subscription_expiration_time_minutes = expirationTime;
+            return policies;
+        });
     }
 
     protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
                                                 AutoTopicCreationOverride autoTopicCreationOverride) {
         final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
-
-        if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
-        }
-        if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
+        if (autoTopicCreationOverride != null) {

Review comment:
       This `if` statement was not present before. Can `autoTopicCreationOverride` be `null` here?

##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -253,4 +255,23 @@ public void accept(Notification t) {
             break;
         }
     }
+
+    private CompletableFuture<Void> executeWithRetry(Supplier<CompletableFuture<Void>> op, String key) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        op.get().thenAccept(r -> result.complete(null)).exceptionally((ex) -> {
+            if (ex.getCause() instanceof BadVersionException) {
+                // if resource is updated by other than metadata-cache then metadata-cache will get bad-version
+                // exception. so, try to invalidate the cache and try one more time.
+                objCache.synchronous().invalidate(key);

Review comment:
       Can we call this `synchronous` operation here? Do we risk a deadlock?
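
To make the question concrete, here is a minimal, self-contained sketch of the retry-once-on-bad-version pattern that the hunk appears to implement. It is not the PR's code: `RetryOnBadVersion`, the nested `BadVersionException`, and the `invalidate` callback are hypothetical stand-ins so that the example compiles without Pulsar or the cache library. The point it illustrates is that the invalidation step runs on whichever thread completes the first attempt, so a blocking call at that point would hold up that thread. Whether that can actually deadlock in `MetadataCacheImpl` is not something this sketch settles.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryOnBadVersion {

    /** Hypothetical stand-in for the metadata store's bad-version error. */
    static class BadVersionException extends RuntimeException {
        BadVersionException(String message) {
            super(message);
        }
    }

    /** Strips the CompletionException wrapper that dependent stages may add. */
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    /**
     * Runs op once. If it fails with BadVersionException, runs the invalidate
     * callback and then runs op exactly one more time. Any other failure, and
     * any failure of the second attempt, is propagated to the caller.
     */
    static CompletableFuture<Void> executeWithRetry(Supplier<CompletableFuture<Void>> op,
                                                    Runnable invalidate) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        op.get().whenComplete((ignored, ex) -> {
            if (ex == null) {
                result.complete(null);
                return;
            }
            Throwable cause = unwrap(ex);
            if (!(cause instanceof BadVersionException)) {
                result.completeExceptionally(cause);
                return;
            }
            // Assumption: invalidate is cheap. It executes on the thread that
            // completed the first attempt, so a blocking call here stalls it.
            invalidate.run();
            op.get().whenComplete((ignored2, ex2) -> {
                if (ex2 == null) {
                    result.complete(null);
                } else {
                    result.completeExceptionally(unwrap(ex2));
                }
            });
        });
        return result;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        AtomicInteger invalidations = new AtomicInteger();

        // Fails with a bad version on the first call, succeeds on the second.
        Supplier<CompletableFuture<Void>> op = () -> {
            CompletableFuture<Void> attempt = new CompletableFuture<>();
            if (attempts.incrementAndGet() == 1) {
                attempt.completeExceptionally(new BadVersionException("stale version"));
            } else {
                attempt.complete(null);
            }
            return attempt;
        };

        executeWithRetry(op, invalidations::incrementAndGet).join();
        System.out.println("attempts=" + attempts.get() + ", invalidations=" + invalidations.get());
    }
}
```

If the blocking call does turn out to be a concern, one option under the same assumptions is to hand the invalidation and the second attempt to a separate executor instead of running them inline in the callback.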

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
##########
@@ -2041,4 +2044,9 @@ public void testCompactionStatus() throws Exception {
         assertTrue(admin.topics().compactionStatus(topicName)
             .lastError.contains("Failed at something"));
     }
+
+    private void clearCache() {
+        // (MetadataCacheImpl<ClusterData>) pulsar.getPulsarResources().

Review comment:
       nit: remove the commented-out code



