You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/22 20:32:14 UTC

[pulsar] branch master updated: handle SubscriptionBusyException in resetCursor api (#7335)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aeee10f  handle SubscriptionBusyException in resetCursor api (#7335)
aeee10f is described below

commit aeee10fda99b0be57f9e937b472aca9c118a1e8a
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Mon Jun 22 15:32:00 2020 -0500

    handle SubscriptionBusyException in resetCursor api (#7335)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++
 .../broker/service/SubscriptionSeekTest.java       | 70 ++++++++++++++++++++++
 2 files changed, 76 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3f350d9..cdbca45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1768,6 +1768,9 @@ public class PersistentTopicsBase extends AdminResource {
                     "Unable to find position for position specified: " + t.getMessage()));
             } else if (e instanceof WebApplicationException) {
                 asyncResponse.resume(e);
+            } else if (t instanceof SubscriptionBusyException) {
+                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                        "Failed for Subscription Busy: " + t.getMessage()));
             } else {
                 asyncResponse.resume(new RestException(e));
             }
@@ -1811,6 +1814,9 @@ public class PersistentTopicsBase extends AdminResource {
                 } else if (t instanceof SubscriptionInvalidCursorPosition) {
                     throw new RestException(Status.PRECONDITION_FAILED,
                             "Unable to find position for position specified: " + t.getMessage());
+                } else if (t instanceof SubscriptionBusyException) {
+                    throw new RestException(Status.PRECONDITION_FAILED,
+                            "Failed for SubscriptionBusy: " + t.getMessage());
                 } else {
                     throw new RestException(e);
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 54afc0e..0f5986d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -24,18 +24,22 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.testng.annotations.AfterClass;
@@ -44,6 +48,7 @@ import org.testng.annotations.Test;
 
 /**
  */
+@Slf4j
 public class SubscriptionSeekTest extends BrokerTestBase {
     @BeforeClass
     @Override
@@ -93,6 +98,71 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         Thread.sleep(500);
         consumer.seek(messageIds.get(5));
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 5);
+
+        MessageIdImpl messageId = (MessageIdImpl) messageIds.get(5);
+        MessageIdImpl beforeEarliest = new MessageIdImpl(
+                messageId.getLedgerId() - 1, messageId.getEntryId(), messageId.getPartitionIndex());
+        MessageIdImpl afterLatest = new MessageIdImpl(
+                messageId.getLedgerId() + 1, messageId.getEntryId(), messageId.getPartitionIndex());
+
+        log.info("MessageId {}: beforeEarliest: {}, afterLatest: {}", messageId, beforeEarliest, afterLatest);
+
+        Thread.sleep(500);
+        consumer.seek(beforeEarliest);
+        assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
+
+        Thread.sleep(500);
+        consumer.seek(afterLatest);
+        assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
+    }
+
+    /** Verifies that concurrent resetCursor calls only ever fail with the subscription-fence error. */
+    @Test
+    public void testConcurrentResetCursor() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
+        final String subscriptionName = "test-sub-name";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getProducers().size(), 1);
+
+        List<MessageId> messageIds = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            messageIds.add(producer.send(message.getBytes()));
+        }
+
+        // The worker threads append concurrently, so the collecting list has to be synchronized.
+        List<PulsarAdminException> exceptions = java.util.Collections.synchronizedList(Lists.newLinkedList());
+        class ResetCursorThread extends Thread {
+            @Override
+            public void run() {
+                try {
+                    admin.topics().resetCursor(topicName, subscriptionName, messageIds.get(3));
+                } catch (PulsarAdminException e) {
+                    exceptions.add(e);
+                }
+            }
+        }
+
+        List<ResetCursorThread> resetCursorThreads = Lists.newLinkedList();
+        for (int i = 0; i < 4; i++) {
+            resetCursorThreads.add(new ResetCursorThread());
+        }
+        for (ResetCursorThread thread : resetCursorThreads) {
+            thread.start();
+        }
+        for (ResetCursorThread thread : resetCursorThreads) {
+            thread.join();
+        }
+
+        // Every worker has been joined, so no thread is still adding while we iterate.
+        for (PulsarAdminException exception : exceptions) {
+            log.error("Meet Exception", exception);
+            assertTrue(exception.getMessage().contains("Failed to fence subscription"));
+        }
     }
 
     @Test