You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/31 17:11:53 UTC

[GitHub] merlimat closed pull request #1151: Added REST handler to create a subscription on a topic

merlimat closed pull request #1151: Added REST handler to create a subscription on a topic
URL: https://github.com/apache/incubator-pulsar/pull/1151
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of
provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index b7361b027..0e0464b90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -24,14 +24,12 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -66,6 +64,7 @@
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -77,6 +76,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.MessageId;
@@ -106,6 +106,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.zafarkhaja.semver.Version;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -116,7 +117,6 @@
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import com.github.zafarkhaja.semver.Version;
 
 /**
  */
@@ -615,7 +615,7 @@ public void deleteTopic(@PathParam("property") String property, @PathParam("clus
         DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
         if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
             validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
-        } 
+        }
         List<String> subscriptions = Lists.newArrayList();
         PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace,
                 destination, authoritative);
@@ -656,7 +656,7 @@ public PersistentTopicStats getStats(@PathParam("property") String property, @Pa
         validateAdminAndClientPermission(dn);
         if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
             validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
-        } 
+        }
         validateDestinationOwnership(dn, authoritative);
         Topic topic = getTopicReference(dn);
         return topic.getStats();
@@ -676,7 +676,7 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri
         validateAdminAndClientPermission(dn);
         if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
             validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
-        } 
+        }
         validateDestinationOwnership(dn, authoritative);
         Topic topic = getTopicReference(dn);
         return topic.getInternalStats();
@@ -1024,6 +1024,67 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus
         }
     }
 
+    @PUT
+    @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subscriptionName}")
+    @ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 409, message = "Subscription already exists") })
+    public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
+            @PathParam("subscriptionName") String subscriptionName,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) throws PulsarServerException {
+        destination = decode(destination);
+        DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
+        if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
+            validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
+        }
+        log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), destination,
+                subscriptionName, messageId);
+
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace,
+                destination, authoritative);
+
+        try {
+            if (partitionMetadata.partitions > 0) {
+                // Create the subscription on each partition
+                List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                PulsarAdmin admin = pulsar().getAdminClient();
+
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    futures.add(admin.persistentTopics().createSubscriptionAsync(dn.getPartition(i).toString(),
+                            subscriptionName, messageId));
+                }
+                // Block until the combined future completes; a failure is rethrown by join()
+                FutureUtil.waitForAll(futures).join();
+            } else {
+                validateAdminOperationOnDestination(dn, authoritative);
+
+                PersistentTopic topic = (PersistentTopic) getOrCreateTopic(dn);
+                // Never replace an existing subscription: report a conflict instead
+                if (topic.getSubscriptions().containsKey(subscriptionName)) {
+                    throw new RestException(Status.CONFLICT, "Subscription already exists for topic");
+                }
+                // Create the subscription first, then move its cursor to the requested position
+                PersistentSubscription subscription = (PersistentSubscription) topic
+                        .createSubscription(subscriptionName).get();
+                subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
+                log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), dn,
+                        subscriptionName, messageId);
+            }
+        } catch (Exception e) {
+            Throwable t = e.getCause();
+            log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), dn, subscriptionName,
+                    messageId, e);
+            if (t instanceof SubscriptionInvalidCursorPosition) {
+                throw new RestException(Status.PRECONDITION_FAILED,
+                        "Unable to find position for position specified: " + t.getMessage());
+            } else {
+                throw new RestException(e);
+            }
+        }
+    }
+
     @POST
     @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor")
     @ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
@@ -1324,10 +1385,10 @@ public void expireMessages(String property, String cluster, String namespace, St
                         dn.toString(), ex.getMessage(), ex);
                 throw ex;
             }
-            
+
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(),
                     dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName());
-            
+
             // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
             // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
             // producer/consumer
@@ -1361,6 +1422,14 @@ private Topic getTopicReference(DestinationName dn) {
         }
     }
 
+    private Topic getOrCreateTopic(DestinationName dn) {
+        try {
+            return pulsar().getBrokerService().getTopic(dn.toString()).get(); // blocks on the returned future
+        } catch (InterruptedException | ExecutionException e) {
+           throw new RestException(e); // NOTE(review): interrupt status is not restored on InterruptedException
+        }
+    }
+
     /**
      * Get the Subscription object reference from the Topic reference
      */
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
new file mode 100644
index 000000000..6f759d8a5
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+public class CreateSubscriptionTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void createSubscriptionSingleTopic() throws Exception {
+        String topic = "persistent://prop-xyz/use/ns1/my-topic";
+        admin.persistentTopics().createSubscription(topic, "sub-1", MessageId.latest);
+
+        // Create should fail if the subscription already exists
+        try {
+            admin.persistentTopics().createSubscription(topic, "sub-1", MessageId.latest);
+            fail("Should have failed");
+        } catch (ConflictException e) {
+            assertEquals(((ClientErrorException) e.getCause()).getResponse().getStatus(),
+                    Status.CONFLICT.getStatusCode());
+        }
+        // The rejected duplicate must leave exactly one subscription behind
+        assertEquals(admin.persistentTopics().getSubscriptions(topic), Lists.newArrayList("sub-1"));
+
+        Producer p1 = pulsarClient.createProducer(topic);
+        p1.send("test-1".getBytes());
+        p1.send("test-2".getBytes());
+        MessageId m3 = p1.send("test-3".getBytes());
+        // sub-1 was created before any message was published, so all three messages are backlog
+        assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-1").msgBacklog, 3);
+        // A subscription created at "latest" after publishing is expected to start with an empty backlog
+        admin.persistentTopics().createSubscription(topic, "sub-2", MessageId.latest);
+        assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-2").msgBacklog, 0);
+        // A subscription created at "earliest" is expected to see every message published so far
+        admin.persistentTopics().createSubscription(topic, "sub-3", MessageId.earliest);
+        assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-3").msgBacklog, 3);
+        // A subscription created at m3 is expected to have a backlog of one message
+        admin.persistentTopics().createSubscription(topic, "sub-5", m3);
+        assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-5").msgBacklog, 1);
+    }
+
+    @Test
+    public void createSubscriptionOnPartitionedTopic() throws Exception {
+        String topic = "persistent://prop-xyz/use/ns1/my-partitioned-topic";
+        admin.persistentTopics().createPartitionedTopic(topic, 10);
+
+        admin.persistentTopics().createSubscription(topic, "sub-1", MessageId.latest);
+
+        // Create should fail if the subscription already exists
+        try {
+            admin.persistentTopics().createSubscription(topic, "sub-1", MessageId.latest);
+            fail("Should have failed");
+        } catch (Exception e) {
+            // Expected
+        }
+        // Each of the 10 partitions should list exactly the one subscription
+        for (int i = 0; i < 10; i++) {
+            assertEquals(
+                    admin.persistentTopics().getSubscriptions(DestinationName.get(topic).getPartition(i).toString()),
+                    Lists.newArrayList("sub-1"));
+        }
+    }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
index 093ae3e91..70642fed1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
@@ -196,7 +196,7 @@
      * @return a future that can be used to track when the partitioned topic is created
      */
     CompletableFuture<Void> createPartitionedTopicAsync(String destination, int numPartitions);
-    
+
     /**
      * Update number of partitions of a non-global partitioned topic.
      * <p>
@@ -208,7 +208,7 @@
      *            Destination name
      * @param numPartitions
      *            Number of new partitions of already exist partitioned-topic
-     * 
+     *
      * @return a future that can be used to track when the partitioned topic is updated
      */
     void updatePartitionedTopic(String destination, int numPartitions) throws PulsarAdminException;
@@ -224,7 +224,7 @@
      *            Destination name
      * @param numPartitions
      *            Number of new partitions of already exist partitioned-topic
-     * 
+     *
      * @return a future that can be used to track when the partitioned topic is updated
      */
     CompletableFuture<Void> updatePartitionedTopicAsync(String destination, int numPartitions);
@@ -311,7 +311,7 @@
      * @return a future that can be used to track when the topic is deleted
      */
     CompletableFuture<Void> deleteAsync(String destination);
-    
+
     /**
      * Unload a topic.
      * <p>
@@ -797,6 +797,42 @@
      */
     CompletableFuture<List<Message>> peekMessagesAsync(String destination, String subName, int numMessages);
 
+    /**
+     * Create a new subscription on a topic
+     *
+     * @param destination
+     *            Destination name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            The {@link MessageId} at which to initialize the subscription. It can be {@link MessageId#latest},
+     *            {@link MessageId#earliest} or a specific message id.
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws ConflictException
+     *             Subscription already exists
+     * @throws NotAllowedException
+     *             Command disallowed for requested resource
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createSubscription(String destination, String subscriptionName, MessageId messageId)
+            throws PulsarAdminException;
+
+    /**
+     * Create a new subscription on a topic asynchronously
+     *
+     * @param destination
+     *            Destination name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            Initial position: {@link MessageId#latest}, {@link MessageId#earliest} or a specific message id.
+     * @return a future that can be used to track when the subscription is created
+     */
+    CompletableFuture<Void> createSubscriptionAsync(String destination, String subscriptionName, MessageId messageId);
+
     /**
      * Reset cursor position on a topic subscription
      *
@@ -829,7 +865,7 @@
      *            reset subscription to position closest to time in ms since epoch
      */
     CompletableFuture<Void> resetCursorAsync(String destination, String subName, long timestamp);
-    
+
     /**
      * Reset cursor position on a topic subscription
      *
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
index cc1d96c41..01ede00ee 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
@@ -181,7 +181,7 @@ public void updatePartitionedTopic(String destination, int numPartitions) throws
                 persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
                 Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
     }
-    
+
     @Override
     public PartitionedTopicMetadata getPartitionedTopicMetadata(String destination) throws PulsarAdminException {
         try {
@@ -586,8 +586,8 @@ public void failed(Throwable throwable) {
         peekMessagesAsync(destination, subName, numMessages, Lists.newArrayList(), future, 1);
         return future;
     }
-    
-    
+
+
     private void peekMessagesAsync(String destination, String subName, int numMessages,
             List<Message> messages, CompletableFuture<List<Message>> future, int nthMessage) {
         if (numMessages <= 0) {
@@ -598,8 +598,8 @@ private void peekMessagesAsync(String destination, String subName, int numMessag
         // if peeking first message succeeds, we know that the topic and subscription exists
         peekNthMessage(destination, subName, nthMessage).handle((r, ex) -> {
             if (ex != null) {
-                // if we get a not found exception, it means that the position for the message we are trying to get        
-                // does not exist. At this point, we can return the already found messages.       
+                // if we get a not found exception, it means that the position for the message we are trying to get
+                // does not exist. At this point, we can return the already found messages.
                 if (ex instanceof NotFoundException) {
                     log.warn("Exception '{}' occured while trying to peek Messages.", ex.getMessage());
                     future.complete(messages);
@@ -616,6 +616,28 @@ private void peekMessagesAsync(String destination, String subName, int numMessag
         });
     }
 
+    @Override
+    public void createSubscription(String destination, String subscriptionName, MessageId messageId)
+            throws PulsarAdminException {
+        try {
+            DestinationName ds = validateTopic(destination);
+            String encodedSubName = Codec.encode(subscriptionName);
+            request(persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("subscription")
+                    .path(encodedSubName)).put(Entity.entity(messageId, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> createSubscriptionAsync(String destination, String subscriptionName,
+            MessageId messageId) {
+        DestinationName ds = validateTopic(destination);
+        String encodedSubName = Codec.encode(subscriptionName);
+        return asyncPutRequest(persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName())
+                .path("subscription").path(encodedSubName), Entity.entity(messageId, MediaType.APPLICATION_JSON));
+    }
+
     @Override
     public void resetCursor(String destination, String subName, long timestamp) throws PulsarAdminException {
         try {
@@ -769,7 +791,7 @@ private DestinationName validateTopic(String destination) {
                 log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
             }
             buf.release();
-            singleMessageMetadataBuilder.recycle();          
+            singleMessageMetadataBuilder.recycle();
         }
         return ret;
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index d7f7d51d8..5027e6959 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -62,6 +62,7 @@ public CmdPersistentTopics(PulsarAdmin admin) {
         jcommander.addCommand("unload", new UnloadCmd());
         jcommander.addCommand("subscriptions", new ListSubscriptions());
         jcommander.addCommand("unsubscribe", new DeleteSubscription());
+        jcommander.addCommand("create-subscription", new CreateSubscription());
         jcommander.addCommand("stats", new GetStats());
         jcommander.addCommand("stats-internal", new GetInternalStats());
         jcommander.addCommand("info-internal", new GetInternalInfo());
@@ -418,6 +419,35 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Create a new subscription on a topic")
+    private class CreateSubscription extends CliCommand {
+        @Parameter(description = "persistent://property/cluster/namespace/destination", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-s",
+                "--subscription" }, description = "Name of the subscription to create", required = true)
+        private String subscriptionName;
+
+        @Parameter(names = { "--messageId",
+                "-m" }, description = "messageId where to create the subscription. It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false)
+        private String messageIdStr = "latest";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            MessageId messageId;
+            if (messageIdStr.equals("latest")) {
+                messageId = MessageId.latest;
+            } else if (messageIdStr.equals("earliest")) {
+                messageId = MessageId.earliest;
+            } else {
+                messageId = validateMessageIdString(messageIdStr);
+            }
+            // Issue the create request; failures propagate to the caller as PulsarAdminException
+            persistentTopics.createSubscription(persistentTopic, subscriptionName, messageId);
+        }
+    }
+
     @Parameters(commandDescription = "Reset position for subscription to position closest to timestamp or messageId")
     private class ResetCursor extends CliCommand {
         @Parameter(description = "persistent://property/cluster/namespace/destination", required = true)
@@ -430,7 +460,7 @@ void run() throws PulsarAdminException {
         @Parameter(names = { "--time",
                 "-t" }, description = "time in minutes to reset back to (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = false)
         private String resetTimeStr;
-        
+
         @Parameter(names = { "--messageId",
                 "-m" }, description = "messageId to reset back to (ledgerId:entryId)", required = false)
         private String resetMessageIdStr;
@@ -534,7 +564,7 @@ private static int validateTimeString(String s) {
             return Integer.parseInt(s);
         }
     }
-    
+
     private MessageId validateMessageIdString(String resetMessageIdStr) throws PulsarAdminException {
         String[] messageId = resetMessageIdStr.split(":");
         try {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 411ef7ef5..00f18a977 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -36,6 +36,8 @@
 import org.apache.pulsar.client.admin.Properties;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.ResourceQuotas;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
@@ -96,7 +98,7 @@ void brokerStats() throws Exception {
         brokerStats.run(split("monitoring-metrics"));
         verify(mockBrokerStats).getMetrics();
     }
-    
+
     @Test
     void getOwnedNamespaces() throws Exception {
         PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
@@ -132,7 +134,7 @@ void clusters() throws Exception {
 
         clusters.run(split("delete use"));
         verify(mockClusters).deleteCluster("use");
-        
+
         clusters.run(split("list-failure-domains use"));
         verify(mockClusters).getFailureDomains("use");
 
@@ -303,19 +305,19 @@ void namespaces() throws Exception {
 
         namespaces.run(split("get-message-ttl myprop/clust/ns1"));
         verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
-        
+
         namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g group"));
         verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", "group");
 
         namespaces.run(split("get-anti-affinity-group myprop/clust/ns1"));
         verify(mockNamespaces).getNamespaceAntiAffinityGroup("myprop/clust/ns1");
-        
+
         namespaces.run(split("get-anti-affinity-namespaces -p dummy -c cluster -g group"));
         verify(mockNamespaces).getAntiAffinityNamespaces("dummy", "cluster", "group");
 
         namespaces.run(split("delete-anti-affinity-group myprop/clust/ns1 "));
         verify(mockNamespaces).deleteNamespaceAntiAffinityGroup("myprop/clust/ns1");
-        
+
 
         namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M"));
         verify(mockNamespaces).setRetention("myprop/clust/ns1", new RetentionPolicies(60, 1));
@@ -444,6 +446,9 @@ void persistentTopics() throws Exception {
         topics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
         verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
 
+        topics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
+        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest);
+
         topics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
         verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32);
 
@@ -494,10 +499,10 @@ void nonPersistentTopics() throws Exception {
 
         topics.run(split("create-partitioned-topic non-persistent://myprop/clust/ns1/ds1 --partitions 32"));
         verify(mockTopics).createPartitionedTopic("non-persistent://myprop/clust/ns1/ds1", 32);
-        
+
         topics.run(split("list myprop/clust/ns1"));
         verify(mockTopics).getList("myprop/clust/ns1");
-        
+
         topics.run(split("list-in-bundle myprop/clust/ns1 --bundle 0x23d70a30_0x26666658"));
         verify(mockTopics).getListInBundle("myprop/clust/ns1", "0x23d70a30_0x26666658");
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services