You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:57:04 UTC

[pulsar] 10/27: makes subscription start from MessageId.latest defaultly (#9444)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a62d34abeb0975e66df28fb30ea59f45fd170883
Author: Aloys <lo...@gmail.com>
AuthorDate: Tue Feb 9 16:18:44 2021 +0800

    makes subscription start from MessageId.latest defaultly (#9444)
    
    ### Motivation
    Currently, if no position is specified when creating a new subscription, the earliest position is used by default. https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L2002
    This PR makes subscriptions start from MessageId.latest by default.
    
    (cherry picked from commit 28b20946107bd4389129e40c44da1e99b37e65a5)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 67 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 16a91f3..d957797 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1894,7 +1894,7 @@ public class PersistentTopicsBase extends AdminResource {
                 return;
             }
         }
-        final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.earliest : messageId;
+        final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.latest : messageId;
         log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName,
                 targetMessageId);
         // If the topic name is a partition name, no need to get partition topic metadata again
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 0c787ee..c31b51a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -34,9 +34,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -44,6 +46,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.zookeeper.KeeperException;
 import org.mockito.ArgumentCaptor;
@@ -208,6 +211,70 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
     }
 
+
+    /** Verifies subscription backlog when created at earliest, at latest, and without a message id. */
+    @Test
+    public void testCreateSubscriptions() throws Exception{
+        final int numberOfMessages = 5;
+        final String SUB_EARLIEST = "sub-earliest";
+        final String SUB_LATEST = "sub-latest";
+        final String SUB_NONE_MESSAGE_ID = "sub-none-message-id";
+
+        String testLocalTopicName = "subWithPositionOrNot";
+        final String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testLocalTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName)
+                .maxPendingMessages(30000).create();
+
+        // 1) Produce numberOfMessages messages to the topic before any subscription exists
+        for (int i = 0; i < numberOfMessages; i++) {
+            System.out.println(producer.send(new byte[10]));
+        }
+
+        // 2) Create a subscription from the earliest position; its backlog should hold every message
+
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, SUB_EARLIEST, true,
+                (MessageIdImpl) MessageId.earliest, false);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+        TopicStats topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true);
+        long msgBacklog = topicStats.subscriptions.get(SUB_EARLIEST).msgBacklog;
+        System.out.println("Message back log for " + SUB_EARLIEST + " is :" + msgBacklog);
+        Assert.assertEquals(msgBacklog, numberOfMessages);
+
+        // 3) Create a subscription from the latest position; its backlog should be empty
+
+        response = mock(AsyncResponse.class);
+        persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, SUB_LATEST, true,
+                (MessageIdImpl) MessageId.latest, false);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+        topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true);
+        msgBacklog = topicStats.subscriptions.get(SUB_LATEST).msgBacklog;
+        System.out.println("Message back log for " + SUB_LATEST + " is :" + msgBacklog);
+        Assert.assertEquals(msgBacklog, 0);
+
+        // 4) Create a subscription without a position (null message id); expected to behave like latest
+
+        response = mock(AsyncResponse.class);
+        persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, SUB_NONE_MESSAGE_ID, true,
+                null, false);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+        topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true);
+        msgBacklog = topicStats.subscriptions.get(SUB_NONE_MESSAGE_ID).msgBacklog;
+        System.out.println("Message back log for " + SUB_NONE_MESSAGE_ID + " is :" + msgBacklog);
+        Assert.assertEquals(msgBacklog, 0);
+
+        producer.close();
+    }
+
     @Test
     public void testCreateSubscriptionForNonPersistentTopic() throws InterruptedException {
         doReturn(TopicDomain.non_persistent.value()).when(persistentTopics).domain();