Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/08/12 17:02:17 UTC

[GitHub] [pulsar] kimcs opened a new issue #4941: Unable to delete topic when both consumer (subscription) and reader has been connected

URL: https://github.com/apache/pulsar/issues/4941
 
 
   **Describe the bug**
   After connecting and then closing one consumer (subscription) and one reader on a topic, the topic can no longer be deleted through the Java admin client. The bug does not manifest when only subscriptions or only readers have been connected; it appears only when both have been.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Start a local standalone Pulsar instance and make sure the admin and broker ports in the test code below match those of the instance.
   1. Run the following Java code to reproduce the error:
   ```java
   package sample;
   
   import org.apache.pulsar.client.admin.PulsarAdmin;
   import org.apache.pulsar.client.admin.PulsarAdminException;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.Reader;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   import org.apache.pulsar.client.api.SubscriptionType;
   import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
   import org.apache.pulsar.common.policies.data.TenantInfo;
   import org.testng.annotations.Test;
   
   import java.util.Collections;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.concurrent.TimeUnit;
   
   public class ConsumerAndReaderCombinationTest {
   
       void deleteTenantIfPresent(PulsarAdmin admin, String tenant) throws PulsarAdminException {
           if (admin.tenants().getTenants().contains(tenant)) {
               for (String namespace : admin.namespaces().getNamespaces(tenant)) {
                   List<String> existingTopics = admin.namespaces().getTopics(namespace);
                   for (String topic : existingTopics) {
                       admin.topics().delete(topic);
                   }
                   admin.namespaces().deleteNamespace(namespace);
               }
               admin.tenants().deleteTenant(tenant);
           }
       }
   
       @Test
       public void provokeBug() throws Exception {
           final String adminUrl = "http://localhost:8080";
           final String brokerUrl = "pulsar://localhost:6650";
   
           final String tenant = "delete-topic-test-magic48295";
           final String namespace = "ns";
           final String topic = "the-topic";
           final String qualifiedTopic = "persistent://" + tenant + "/" + namespace + "/" + topic;
   
           /*
            * Ensure a clean tenant
            */
           try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).authentication(new AuthenticationDisabled()).build()) {
               deleteTenantIfPresent(admin, tenant);
               Set<String> allowedClusters = new LinkedHashSet<>();
               allowedClusters.add("standalone");
               admin.tenants().createTenant(tenant, new TenantInfo(Collections.emptySet(), allowedClusters));
               admin.namespaces().createNamespace(tenant + "/" + namespace);
           }
   
           /*
            * Open and close both a consumer on a subscription and a reader on the same topic
            */
           try (PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl).build()) {
               try (Consumer<byte[]> consumer = client.newConsumer()
                       .topic(qualifiedTopic)
                       .subscriptionType(SubscriptionType.Exclusive)
                       .consumerName("testng")
                       .subscriptionName("s1")
                       .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                       .subscribe()) {
                   // no need to do anything with the consumer, just close it again
               }
               try (Reader<byte[]> reader = client.newReader().topic(qualifiedTopic).startMessageId(MessageId.earliest).create()) {
                   // no need to do anything with the reader, just close it again
               }
           }
   
           try (PulsarAdmin admin = PulsarAdmin.builder()
                   .serviceHttpUrl(adminUrl)
                   .readTimeout(3, TimeUnit.SECONDS)
                   .authentication(new AuthenticationDisabled())
                   .build()) {
   
               /*
                * When bug is present, this call will throw a TimeoutException (read-timeout) after 3 seconds
                */
               admin.topics().delete(qualifiedTopic);
           }
       }
   }
   ```
   1. Observe lines similar to the following in the broker log:
   ```
   16:48:45.579 [pulsar-web-57-3] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://delete-topic-test-magic48295/ns/the-topic][s1] Unsubscribing
   16:48:45.579 [pulsar-web-57-3] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://delete-topic-test-magic48295/ns/the-topic][s1] Successfully closed subscription [ManagedCursorImpl{ledger=delete-topic-test-magic48295/ns/persistent/the-topic, name=s1, ackPos=10:-1, readPos=10:0}]
   16:48:45.583 [pulsar-web-57-3] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper - [delete-topic-test-magic48295/ns/persistent/the-topic] Remove consumer=s1
   16:48:45.592 [ProcessThread(sid:0 cport:2181):] INFO  org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x10000f989b00009 type:delete cxid:0xe3 zxid:0xba txntype:-1 reqpath:n/a Error Path:/ledgers/00/0000 Error:KeeperErrorCode = Directory not empty for /ledgers/00/0000
   16:48:45.591 [bookkeeper-ml-workers-OrderedExecutor-2-0] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught 
   java.lang.IllegalArgumentException: inconsistent range
   	at java.util.concurrent.ConcurrentSkipListMap$SubMap.<init>(ConcurrentSkipListMap.java:2620) ~[?:1.8.0_212]
   	at java.util.concurrent.ConcurrentSkipListMap.subMap(ConcurrentSkipListMap.java:2078) ~[?:1.8.0_212]
   	at org.apache.bookkeeper.mledger.util.RangeCache.removeRange(RangeCache.java:132) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
   	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.invalidateEntries(EntryCacheImpl.java:151) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
   	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$7.operationComplete(ManagedLedgerImpl.java:776) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
   	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$7.operationComplete(ManagedLedgerImpl.java:764) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
   	at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$13(MetaStoreImplZookeeper.java:308) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
   	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
   	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.9.2.jar:4.9.2]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
   16:48:45.610 [BookKeeperClientWorker-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [delete-topic-test-magic48295/ns/persistent/the-topic][s1] Deleted cursor ledger 11
   16:48:47.557 [pulsar-web-57-11] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.2 - - [12/Aug/2019:16:48:47 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 813 "-" "Pulsar-Java-v2.4.0" 5
   ```
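   For readers unfamiliar with the `inconsistent range` message in the trace above: it is what `ConcurrentSkipListMap.subMap` throws when the lower bound sorts after the upper bound. The following is a minimal sketch that is independent of Pulsar. The class name `InconsistentRangeDemo`, the helper `subMapError`, and the keys are hypothetical, and it makes no claim about which positions the broker actually passed to `RangeCache.removeRange`.

   ```java
   import java.util.concurrent.ConcurrentSkipListMap;

   // Hypothetical stand-alone demo; not part of Pulsar or BookKeeper.
   class InconsistentRangeDemo {

       // Returns the exception message if subMap rejects the bounds,
       // or null if the range is accepted.
       static String subMapError(long from, long to) {
           ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
           map.put(1L, "a");
           map.put(2L, "b");
           try {
               map.subMap(from, true, to, true);
               return null;
           } catch (IllegalArgumentException e) {
               return e.getMessage();
           }
       }

       public static void main(String[] args) {
           // Lower bound <= upper bound: the range is accepted.
           System.out.println(subMapError(1L, 2L));
           // Lower bound > upper bound: subMap throws IllegalArgumentException.
           System.out.println(subMapError(2L, 1L));
       }
   }
   ```

   This only illustrates the JDK behaviour; whether the broker really ends up with a start position after the end position in this scenario would have to be confirmed in the managed-ledger code.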
   **Expected behavior**
   It should always be possible to delete a topic that has no connected clients.
   
   **Desktop (please complete the following information):**
    - OS: macOS
   
