You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/15 12:41:20 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #9485: [pulsar-broker] topic resources use metadata-store api

eolivelli commented on a change in pull request #9485:
URL: https://github.com/apache/pulsar/pull/9485#discussion_r576162558



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -737,42 +701,33 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
                     return;
                 }
             }
-
             // Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
-
-            globalZk().delete(path, -1, (rc, s, o) -> {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    try {
-                        globalZkCache().invalidate(path);
-                        globalZk().sync(path, (rc2, s2, ctx) -> {
-                            if (KeeperException.Code.OK.intValue() == rc2) {
-                                log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
-                                asyncResponse.resume(Response.noContent().build());
-                            } else {
-                                log.error("[{}] Failed to delete partitioned topic {}", clientAppId(),
-                                        topicName, KeeperException.create(KeeperException.Code.get(rc2)));
-                                asyncResponse.resume(new RestException(
-                                        KeeperException.create(KeeperException.Code.get(rc2))));
-                            }
-                        }, null);
-                    } catch (Exception e) {
-                        log.error("Failed to delete partitioned topic.", e);
-                        asyncResponse.resume(new RestException(e));
+            try {
+                namespaceResources().getPartitionedTopicResouces().deleteAsync(path).thenAccept(r2 -> {

Review comment:
       is this a typo?
   `getPartitionedTopicResouces` -> `getPartitionedTopicResources`

##########
File path: pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
##########
@@ -494,7 +494,11 @@ public void stop() throws Exception {
 
         LOG.debug("Local ZK/BK stopping ...");
         for (BookieServer bookie : bs) {
-            bookie.shutdown();
+            try {
+                bookie.shutdown();
+            } catch (Exception e) {
+                LOG.warn("failed to shutdown bookie", e);

Review comment:
       In case of InterruptedException we should call `Thread.currentThread().interrupt();`

##########
File path: pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
##########
@@ -30,19 +32,19 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class PulsarClientToolTest extends BrokerTestBase {
 
-    @BeforeClass
+    @BeforeMethod
     @Override
     public void setup() throws Exception {
         super.internalSetup();
     }
 
-    @AfterClass
+    @AfterMethod

Review comment:
       don't forget "alwaysRun = true", otherwise resources won't be disposed in case of test failure

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -247,29 +241,31 @@ public void validatePoliciesReadOnlyAccess() {
 
     private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
         CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
-        zkCreateOptimisticAsync(localZk(),
-                ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
-                (rc, s, o, s1) -> {
-                    if (KeeperException.Code.OK.intValue() == rc) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Topic partition {} created.", clientAppId(),
-                                    topicName.getPartition(partition));
-                        }
-                        result.complete(null);
-                    } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+        namespaceResources().getLocalStore()
+                .put(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0], Optional.of(-1L))
+                .thenAccept(r -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Topic partition {} created.", clientAppId(), topicName.getPartition(partition));
+                    }
+                    result.complete(null);
+                }).exceptionally(ex -> {
+                    if (ex.getCause() instanceof AlreadyExistsException) {
                         log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
                                 topicName.getPartition(partition));
                         result.complete(null);
-                } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
-                    log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
-                            clientAppId(), topicName.getPartition(partition));
-                    tryCreatePartitionAsync(partition, result);
-                } else {
-                    log.error("[{}] Fail to create topic partition {}", clientAppId(),
-                        topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
-                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
-                }
-        });
+                    } else if (ex.getCause() instanceof BadVersionException) {
+                        log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",

Review comment:
       Is this message still valid?
   It looks like we are completing the operation with success.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org