You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/22 20:32:14 UTC
[pulsar] branch master updated: handle SubscriptionBusyException in
resetCursor api (#7335)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aeee10f handle SubscriptionBusyException in resetCursor api (#7335)
aeee10f is described below
commit aeee10fda99b0be57f9e937b472aca9c118a1e8a
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Mon Jun 22 15:32:00 2020 -0500
handle SubscriptionBusyException in resetCursor api (#7335)
---
.../broker/admin/impl/PersistentTopicsBase.java | 6 ++
.../broker/service/SubscriptionSeekTest.java | 70 ++++++++++++++++++++++
2 files changed, 76 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3f350d9..cdbca45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1768,6 +1768,9 @@ public class PersistentTopicsBase extends AdminResource {
"Unable to find position for position specified: " + t.getMessage()));
} else if (e instanceof WebApplicationException) {
asyncResponse.resume(e);
+ } else if (t instanceof SubscriptionBusyException) {
+ asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " + t.getMessage()));
} else {
asyncResponse.resume(new RestException(e));
}
@@ -1811,6 +1814,9 @@ public class PersistentTopicsBase extends AdminResource {
} else if (t instanceof SubscriptionInvalidCursorPosition) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage());
+ } else if (t instanceof SubscriptionBusyException) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Failed for SubscriptionBusy: " + t.getMessage());
} else {
throw new RestException(e);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 54afc0e..0f5986d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -24,18 +24,22 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.testng.annotations.AfterClass;
@@ -44,6 +48,7 @@ import org.testng.annotations.Test;
/**
*/
+@Slf4j
public class SubscriptionSeekTest extends BrokerTestBase {
@BeforeClass
@Override
@@ -93,6 +98,71 @@ public class SubscriptionSeekTest extends BrokerTestBase {
Thread.sleep(500);
consumer.seek(messageIds.get(5));
assertEquals(sub.getNumberOfEntriesInBacklog(false), 5);
+
+ MessageIdImpl messageId = (MessageIdImpl) messageIds.get(5);
+ MessageIdImpl beforeEarliest = new MessageIdImpl(
+ messageId.getLedgerId() - 1, messageId.getEntryId(), messageId.getPartitionIndex());
+ MessageIdImpl afterLatest = new MessageIdImpl(
+ messageId.getLedgerId() + 1, messageId.getEntryId(), messageId.getPartitionIndex());
+
+ log.info("MessageId {}: beforeEarliest: {}, afterLatest: {}", messageId, beforeEarliest, afterLatest);
+
+ Thread.sleep(500);
+ consumer.seek(beforeEarliest);
+ assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
+
+ Thread.sleep(500);
+ consumer.seek(afterLatest);
+ assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
+ }
+
+ @Test
+ public void testConcurrentResetCursor() throws Exception {
+ final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
+ final String subscriptionName = "test-sub-name";
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+
+ admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ assertNotNull(topicRef);
+ assertEquals(topicRef.getProducers().size(), 1);
+
+ List<MessageId> messageIds = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ MessageId msgId = producer.send(message.getBytes());
+ messageIds.add(msgId);
+ }
+
+ List<PulsarAdminException> exceptions = Lists.newLinkedList();
+ class ResetCursorThread extends Thread {
+ public void run() {
+ try {
+ admin.topics().resetCursor(topicName, subscriptionName, messageIds.get(3));
+ } catch (PulsarAdminException e) {
+ exceptions.add(e);
+ }
+ }
+ }
+
+ List<ResetCursorThread> resetCursorThreads = Lists.newLinkedList();
+ for (int i = 0; i < 4; i ++) {
+ ResetCursorThread thread = new ResetCursorThread();
+ resetCursorThreads.add(thread);
+ }
+ for (int i = 0; i < 4; i ++) {
+ resetCursorThreads.get(i).start();
+ }
+ for (int i = 0; i < 4; i ++) {
+ resetCursorThreads.get(i).join();
+ }
+
+ for (int i = 0; i < exceptions.size(); i++) {
+ log.error("Meet Exception", exceptions.get(i));
+ assertTrue(exceptions.get(i).getMessage().contains("Failed to fence subscription"));
+ }
}
@Test