You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/21 05:06:11 UTC

[GitHub] merlimat closed pull request #1397: Issue 1069: Provide a setting in consumer configuration to specify where to start consuming messages

merlimat closed pull request #1397: Issue 1069: Provide a setting in consumer configuration to specify where to start consuming messages
URL: https://github.com/apache/incubator-pulsar/pull/1397
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 9149bb9f9..8dbb846d1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -26,6 +26,7 @@
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 
 /**
  * A ManagedLedger it's a superset of a BookKeeper ledger concept.
@@ -132,11 +133,28 @@
      *
      * @param name
      *            the name associated with the ManagedCursor
+     * @param initializeOnLatest
+     *            the flag tells the method whether it should initialize the cursor at the latest position or not.
      * @return the ManagedCursor
      * @throws ManagedLedgerException
      */
     ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException;
 
+    /**
+     * Open a ManagedCursor in this ManagedLedger.
+     * <p>
+     * If the cursor doesn't exist, a new one will be created and positioned according to initialPosition.
+     *
+     * @param name
+     *            the name associated with the ManagedCursor
+     * @param initialPosition
+     *            where to place the cursor when it is first created: InitialPosition.Latest starts at the
+     *            end of the ManagedLedger, otherwise the cursor starts at the first available position
+     * @return the ManagedCursor
+     * @throws ManagedLedgerException
+     */
+    public ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException;
+
     /**
      * Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
      * exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
@@ -194,7 +212,23 @@
     void asyncOpenCursor(String name, OpenCursorCallback callback, Object ctx);
 
     /**
-     * Get a list of all the cursors reading from this ManagedLedger.
+     * Open a ManagedCursor asynchronously.
+     *
+     * @see #openCursor(String, InitialPosition)
+     * @param name
+     *            the name associated with the ManagedCursor
+     * @param initialPosition
+     *            where to place the cursor when it is first created: InitialPosition.Latest starts at the
+     *            end of the ManagedLedger, otherwise the cursor starts at the first available position
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     */
+    public void asyncOpenCursor(String name, InitialPosition initialPosition, OpenCursorCallback callback, Object ctx);
+
+    /**
+     * Get a list of all the cursors reading from this ManagedLedger.
      *
      * @return a list of cursors
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ac030e52b..34283ed0d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -78,6 +78,7 @@
 import org.apache.bookkeeper.mledger.util.Pair;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
@@ -546,7 +547,12 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback
     }
 
     @Override
-    public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException {
+    public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException{
+        return openCursor(cursorName, InitialPosition.Latest);
+    }
+
+    @Override
+    public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         class Result {
             ManagedCursor cursor = null;
@@ -554,7 +560,7 @@ public ManagedCursor openCursor(String cursorName) throws InterruptedException,
         }
         final Result result = new Result();
 
-        asyncOpenCursor(cursorName, new OpenCursorCallback() {
+        asyncOpenCursor(cursorName, initialPosition, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 result.cursor = cursor;
@@ -582,9 +588,12 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
     }
 
     @Override
-    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback,
-            final Object ctx) {
+    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx){
+        this.asyncOpenCursor(cursorName, InitialPosition.Latest, callback, ctx);
+    }
 
+    @Override
+    public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, final OpenCursorCallback callback, final Object ctx){
         try {
             checkManagedLedgerIsOpen();
             checkFenced();
@@ -624,7 +633,7 @@ public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
                 cursor.setActive();
                 // Update the ack position (ignoring entries that were written while the cursor was being created)
-                cursor.initializeCursorPosition(getLastPositionAndCounter());
+                cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter() : getFirstPositionAndCounter());
 
                 synchronized (this) {
                     cursors.add(cursor);
@@ -2030,6 +2039,22 @@ PositionImpl getMarkDeletePositionOfSlowestConsumer() {
         return Pair.create(pos, count);
     }
 
+    /**
+     * Get the first position written in the managed ledger, along with its associated counter.
+     */
+    Pair<PositionImpl, Long> getFirstPositionAndCounter() {
+        PositionImpl pos;
+        long count;
+        Pair<PositionImpl, Long> lastPositionAndCounter;
+
+        do {
+            pos = getFirstPosition();
+            lastPositionAndCounter = getLastPositionAndCounter();
+            count = lastPositionAndCounter.second - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.first));
+        } while (pos.compareTo(getFirstPosition()) != 0 || lastPositionAndCounter.first.compareTo(getLastPosition()) != 0);
+        return Pair.create(pos, count);
+    }
+
     public void activateCursor(ManagedCursor cursor) {
         if (activeCursors.get(cursor.getName()) == null) {
             activeCursors.add(cursor);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 460e5227d..f6cbb12f6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -66,6 +66,7 @@
 import org.apache.bookkeeper.mledger.util.Pair;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.common.api.ByteBufPair;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.apache.zookeeper.CreateMode;
@@ -2152,4 +2153,35 @@ public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
         return ByteBufPair.coalesce(ByteBufPair.get(headers, payload));
     }
 
+    @Test
+    public void testConsumerSubscriptionInitializePosition() throws Exception{
+        final int MAX_ENTRY_PER_LEDGER = 2;
+        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(MAX_ENTRY_PER_LEDGER);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("lastest_earliest_ledger", config);
+
+        final int totalInsertedEntries = 20;
+        for (int i = 0; i < totalInsertedEntries; i++) {
+            String content = "entry" + i; // "entry0" .. "entry19" (6-7 bytes)
+            ledger.addEntry(content.getBytes());
+        }
+        // Open Cursor also adds cursor into activeCursor-container
+        ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest);
+        ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest);
+
+        // getReadPosition returns the next position to read, so compare against getNext() of the expected positions
+        PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition();
+        PositionImpl p2 = (PositionImpl) earliestCursor.getReadPosition();
+
+        Pair<PositionImpl, Long> latestPositionAndCounter = ledger.getLastPositionAndCounter();
+        Pair<PositionImpl, Long> earliestPositionAndCounter = ledger.getFirstPositionAndCounter();
+
+        assertEquals(latestPositionAndCounter.first.getNext(), p1);
+        assertEquals(earliestPositionAndCounter.first.getNext(), p2);
+
+        assertEquals(latestPositionAndCounter.second.longValue(), totalInsertedEntries);
+        assertEquals(earliestPositionAndCounter.second.longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog());
+
+        ledger.close();
+
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index fed1da635..04addf5cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -72,6 +72,7 @@
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -818,7 +819,7 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl
                 }
 
                 PersistentSubscription subscription = (PersistentSubscription) topic
-                        .createSubscription(subscriptionName).get();
+                        .createSubscription(subscriptionName, InitialPosition.Latest).get();
                 subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
                 log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(),
                         topicName, subscriptionName, messageId);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index dc2e54ed1..4c5dbbca9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -37,10 +37,12 @@
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
@@ -68,6 +70,7 @@
     private final String appId;
     private AuthenticationDataSource authenticationData;
     private final String topicName;
+    private final InitialPosition subscriptionInitialPosition;
 
     private final long consumerId;
     private final int priorityLevel;
@@ -104,7 +107,7 @@
     public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     int maxUnackedMessages, ServerCnx cnx, String appId,
-                    Map<String, String> metadata, boolean readCompacted) throws BrokerServiceException {
+                    Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition) throws BrokerServiceException {
 
         this.subscription = subscription;
         this.subType = subType;
@@ -114,6 +117,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
         this.readCompacted = readCompacted;
         this.consumerName = consumerName;
         this.maxUnackedMessages = maxUnackedMessages;
+        this.subscriptionInitialPosition = subscriptionInitialPosition;
         this.cnx = cnx;
         this.msgOut = new Rate();
         this.msgRedeliver = new Rate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e0ff275f6..bada69ca2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -77,6 +77,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -559,6 +560,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
         final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
         final boolean readCompacted = subscribe.getReadCompacted();
         final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
+        final InitialPosition initialPosition = subscribe.getInitialPosition();
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
@@ -622,7 +624,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                         service.getTopic(topicName.toString())
                                 .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                                                       subType, priorityLevel, consumerName, isDurable,
-                                                                      startMessageId, metadata, readCompacted))
+                                                                      startMessageId, metadata, readCompacted, initialPosition))
                                 .thenAccept(consumer -> {
                                     if (consumerFuture.complete(consumer)) {
                                         log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index be60116a4..1b20ebdaf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -25,6 +25,7 @@
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -80,9 +81,9 @@ default long getOriginalSequenceId() {
 
     CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
             int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted);
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition);
 
-    CompletableFuture<Subscription> createSubscription(String subscriptionName);
+    CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition);
 
     CompletableFuture<Void> unsubscribe(String subName);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 4a6de05e0..056062a74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -64,6 +64,7 @@
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -322,7 +323,7 @@ public void removeProducer(Producer producer) {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
 
@@ -366,7 +367,7 @@ public void removeProducer(Producer producer) {
 
         try {
             Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
-                                             cnx.getRole(), metadata, readCompacted);
+                                             cnx.getRole(), metadata, readCompacted, initialPosition);
             subscription.addConsumer(consumer);
             if (!cnx.isActive()) {
                 consumer.close();
@@ -396,7 +397,7 @@ public void removeProducer(Producer producer) {
     }
 
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName) {
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
         return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cdca1cd8f..c179aef8c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -80,6 +80,7 @@
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -442,7 +443,7 @@ public void removeProducer(Producer producer) {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
 
@@ -490,7 +491,7 @@ public void removeProducer(Producer producer) {
         }
 
         CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
-                getDurableSubscription(subscriptionName) //
+                getDurableSubscription(subscriptionName, initialPosition) //
                 : getNonDurableSubscription(subscriptionName, startMessageId);
 
         int maxUnackedMessages  = isDurable ? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() :0;
@@ -498,7 +499,7 @@ public void removeProducer(Producer producer) {
         subscriptionFuture.thenAccept(subscription -> {
             try {
                 Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
-                                                 maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted);
+                                                 maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition);
                 subscription.addConsumer(consumer);
                 if (!cnx.isActive()) {
                     consumer.close();
@@ -533,9 +534,9 @@ public void removeProducer(Producer producer) {
         return future;
     }
 
-    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName) {
+    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName, InitialPosition initialPosition) {
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
-        ledger.asyncOpenCursor(Codec.encode(subscriptionName), new OpenCursorCallback() {
+        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 if (log.isDebugEnabled()) {
@@ -598,8 +599,8 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
 
     @SuppressWarnings("unchecked")
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName) {
-        return getDurableSubscription(subscriptionName);
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
+        return getDurableSubscription(subscriptionName, initialPosition);
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 61a1fc267..3dc4e5612 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -102,7 +103,7 @@ public String toString() {
         RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
                 CompletableFuture<Consumer<byte[]>> consumerFuture) {
             super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
-                    consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY);
+                    consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY, SubscriptionInitialPosition.Earliest);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 50fe3c333..ccd32e1e5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -70,6 +70,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.naming.TopicName;
@@ -222,10 +223,10 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
         doAnswer(new Answer<Object>() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                ((OpenCursorCallback) invocationOnMock.getArguments()[1]).openCursorComplete(cursorMock, null);
+                ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
 
         // call deleteLedgerComplete on ledger asyncDelete
         doAnswer(new Answer<Object>() {
@@ -267,7 +268,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
 
         // 2. Add old consumer
         Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
-                "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false);
+                "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest);
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
@@ -278,7 +279,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
 
         // 3. Add new consumer
         Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0,
-                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false);
+                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest);
         pdfc.addConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
@@ -307,7 +308,7 @@ public void testAddRemoveConsumer() throws Exception {
         // 2. Add consumer
         Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                false /* read compacted */));
+                false /* read compacted */, InitialPosition.Latest));
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
@@ -331,7 +332,7 @@ public void testAddRemoveConsumer() throws Exception {
 
         // 5. Add another consumer which does not change active consumer
         Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
-                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */));
+                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest));
         pdfc.addConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
@@ -345,7 +346,7 @@ public void testAddRemoveConsumer() throws Exception {
         // 6. Add a consumer which changes active consumer
         Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
                 "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                false /* read compacted */));
+                false /* read compacted */, InitialPosition.Latest));
         pdfc.addConsumer(consumer0);
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
@@ -579,7 +580,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche
     private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
         Consumer consumer =
                 new Consumer(null, SubType.Shared, null, id, priority, ""+id, 5000,
-                        serverCnx, "appId", Collections.emptyMap(), false /* read compacted */);
+                        serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
         try {
             consumer.flowPermits(permit);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 52c976def..86f2a4ac8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -49,6 +49,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -122,7 +123,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -180,7 +181,7 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -242,7 +243,7 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -300,7 +301,7 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index ce053fe02..8cabd8331 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -91,6 +91,7 @@
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -422,7 +423,7 @@ public void testSubscribeFail() throws Exception {
                 .setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
         try {
             f1.get();
             fail("should fail with exception");
@@ -441,12 +442,12 @@ public void testSubscribeUnsubscribe() throws Exception {
 
         // 1. simple subscribe
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
         f1.get();
 
         // 2. duplicate subscribe
         Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
 
         try {
             f2.get();
@@ -470,7 +471,7 @@ public void testAddRemoveConsumer() throws Exception {
 
         // 1. simple add consumer
         Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */);
+                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
         sub.addConsumer(consumer);
         assertTrue(sub.getDispatcher().isConsumerConnected());
 
@@ -511,14 +512,14 @@ public void testMaxConsumersShared() throws Exception {
         // 1. add consumer1
         Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */, InitialPosition.Latest);
         sub.addConsumer(consumer);
         assertEquals(sub.getConsumers().size(), 1);
 
         // 2. add consumer2
         Consumer consumer2 = new Consumer(sub, SubType.Shared, topic.getName(), 2 /* consumer id */, 0,
                 "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */, InitialPosition.Latest);
         sub.addConsumer(consumer2);
         assertEquals(sub.getConsumers().size(), 2);
 
@@ -526,7 +527,7 @@ public void testMaxConsumersShared() throws Exception {
         try {
             Consumer consumer3 = new Consumer(sub, SubType.Shared, topic.getName(), 3 /* consumer id */, 0,
                     "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                    false /* read compacted */);
+                    false /* read compacted */, InitialPosition.Latest);
             sub.addConsumer(consumer3);
             fail("should have failed");
         } catch (BrokerServiceException e) {
@@ -539,7 +540,7 @@ public void testMaxConsumersShared() throws Exception {
         // 4. add consumer4 to sub2
         Consumer consumer4 = new Consumer(sub2, SubType.Shared, topic.getName(), 4 /* consumer id */, 0,
                 "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */, InitialPosition.Latest);
         sub2.addConsumer(consumer4);
         assertEquals(sub2.getConsumers().size(), 1);
 
@@ -550,7 +551,7 @@ public void testMaxConsumersShared() throws Exception {
         try {
             Consumer consumer5 = new Consumer(sub2, SubType.Shared, topic.getName(), 5 /* consumer id */, 0,
                     "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                    false /* read compacted */);
+                    false /* read compacted */, InitialPosition.Latest);
             sub2.addConsumer(consumer5);
             fail("should have failed");
         } catch (BrokerServiceException e) {
@@ -602,14 +603,14 @@ public void testMaxConsumersFailover() throws Exception {
         // 1. add consumer1
         Consumer consumer = new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */, InitialPosition.Latest);
         sub.addConsumer(consumer);
         assertEquals(sub.getConsumers().size(), 1);
 
         // 2. add consumer2
         Consumer consumer2 = new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 0,
                 "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */, InitialPosition.Latest);
         sub.addConsumer(consumer2);
         assertEquals(sub.getConsumers().size(), 2);
 
@@ -617,7 +618,7 @@ public void testMaxConsumersFailover() throws Exception {
         try {
             Consumer consumer3 = new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0,
                     "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                    false /* read compacted */);
+                    false /* read compacted */, InitialPosition.Latest);
             sub.addConsumer(consumer3);
             fail("should have failed");
         } catch (BrokerServiceException e) {
@@ -630,7 +631,7 @@ public void testMaxConsumersFailover() throws Exception {
         // 4. add consumer4 to sub2
         Consumer consumer4 = new Consumer(sub2, SubType.Failover, topic.getName(), 4 /* consumer id */, 0,
                 "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */, InitialPosition.Latest);
         sub2.addConsumer(consumer4);
         assertEquals(sub2.getConsumers().size(), 1);
 
@@ -641,7 +642,7 @@ public void testMaxConsumersFailover() throws Exception {
         try {
             Consumer consumer5 = new Consumer(sub2, SubType.Failover, topic.getName(), 5 /* consumer id */, 0,
                     "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
-                    false /* read compacted */);
+                    false /* read compacted */, InitialPosition.Latest);
             sub2.addConsumer(consumer5);
             fail("should have failed");
         } catch (BrokerServiceException e) {
@@ -681,7 +682,7 @@ public void testUbsubscribeRaceConditions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
         Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */);
+                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
         sub.addConsumer(consumer1);
 
         doAnswer(new Answer<Object>() {
@@ -703,7 +704,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
         try {
             Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */
             new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
-                    50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */);
+                    50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
         } catch (BrokerServiceException e) {
             assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException);
         }
@@ -733,7 +734,7 @@ public void testDeleteTopic() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
         f1.get();
 
         assertTrue(topic.delete().isCompletedExceptionally());
@@ -748,7 +749,7 @@ public void testDeleteAndUnsubscribeTopic() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -802,7 +803,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -889,7 +890,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
 
         try {
             f.get();
@@ -968,10 +969,10 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
         doAnswer(new Answer<Object>() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                ((OpenCursorCallback) invocationOnMock.getArguments()[1]).openCursorComplete(cursorMock, null);
+                ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
 
         // call deleteLedgerComplete on ledger asyncDelete
         doAnswer(new Answer<Object>() {
@@ -1007,7 +1008,7 @@ public void testFailoverSubscription() throws Exception {
         // 1. Subscribe with non partition topic
         Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
                 cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(),
-                cmd1.getReadCompacted());
+                cmd1.getReadCompacted(), InitialPosition.Latest);
         f1.get();
 
         // 2. Subscribe with partition topic
@@ -1019,7 +1020,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
                 cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(),
-                cmd2.getReadCompacted());
+                cmd2.getReadCompacted(), InitialPosition.Latest);
         f2.get();
 
         // 3. Subscribe and create second consumer
@@ -1029,7 +1030,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
                 cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(),
-                cmd3.getReadCompacted());
+                cmd3.getReadCompacted(), InitialPosition.Latest);
         f3.get();
 
         assertEquals(
@@ -1050,7 +1051,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
                 cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(),
-                cmd4.getReadCompacted());
+                cmd4.getReadCompacted(), InitialPosition.Latest);
         f4.get();
 
         assertEquals(
@@ -1076,7 +1077,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
                 cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(),
-                cmd5.getReadCompacted());
+                cmd5.getReadCompacted(), InitialPosition.Latest);
 
         try {
             f5.get();
@@ -1093,7 +1094,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
                 cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(),
-                cmd6.getReadCompacted());
+                cmd6.getReadCompacted(), InitialPosition.Latest);
         f6.get();
 
         // 7. unsubscribe exclusive sub
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 9fb02ea43..fe933bbb3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -89,6 +89,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
@@ -1452,20 +1453,20 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 Thread.sleep(300);
-                ((OpenCursorCallback) invocationOnMock.getArguments()[1]).openCursorComplete(cursorMock, null);
+                ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
 
         doAnswer(new Answer<Object>() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 Thread.sleep(300);
-                ((OpenCursorCallback) invocationOnMock.getArguments()[1])
+                ((OpenCursorCallback) invocationOnMock.getArguments()[2])
                         .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
 
         doAnswer(new Answer<Object>() {
             @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 83ae0ba40..feabc5d3b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -292,4 +292,8 @@
      */
     ConsumerBuilder<T> properties(Map<String, String> properties);
 
+    /**
+     * Set the initial position at which the cursor is placed when first subscribing to the topic.
+     */
+    ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index c526af5b9..50df8a533 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -26,7 +26,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 /**
  * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
  * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
@@ -46,6 +46,7 @@
 
     private final ConsumerConfigurationData<byte[]> conf = new ConsumerConfigurationData<>();
 
+    private boolean initializeSubscriptionOnLatest = true;
     /**
      * @return the configured timeout in milliseconds for unacked messages.
      */
@@ -339,4 +340,21 @@ public ConsumerConfiguration setProperties(Map<String, String> properties) {
     public ConsumerConfigurationData<byte[]> getConfigurationData() {
         return conf;
     }
+    
+     /** 
+     * @param subscriptionInitialPosition the initial position at which to set
+     * the cursor when subscribing to the topic for the first time.
+     * Default is {@link SubscriptionInitialPosition#Latest}
+     */
+    public ConsumerConfiguration setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+        conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
+        return this;
+    }   
+
+    /** 
+     * @return the configured {@link SubscriptionInitialPosition} for the consumer
+     */
+    public SubscriptionInitialPosition getSubscriptionInitialPosition(){
+        return conf.getSubscriptionInitialPosition();
+    }   
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
new file mode 100644
index 000000000..05deee3b0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Initial position at which the cursor will be set when subscribing.
+ *
+ *
+ */
+public enum SubscriptionInitialPosition {
+    /**
+     * the latest position which means the start consuming position will be the last message
+     */
+    Latest,
+
+    /**
+     * the earliest position which means the start consuming position will be the first message
+     */
+    Earliest,
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index dc435e71b..e1d5884ab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -37,6 +37,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -222,4 +223,10 @@ private ConsumerBuilderImpl(PulsarClientImpl client, ConsumerConfigurationData<T
         conf.setPatternAutoDiscoveryPeriod(periodInMinutes);
         return this;
     }
+
+	@Override
+	public ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+        conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
+		return this;
+	}
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d631fd619..067a67770 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -56,6 +56,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -123,6 +124,7 @@
 
     private final boolean readCompacted;
 
+    private final SubscriptionInitialPosition subscriptionInitialPosition;
     private final ConnectionHandler connectionHandler;
 
     enum SubscriptionMode {
@@ -136,12 +138,12 @@
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema);
+        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema, SubscriptionInitialPosition.Latest);
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
-                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema) {
+                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, SubscriptionInitialPosition subscriptionInitialPosition) {
         super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
@@ -154,6 +156,7 @@
         this.priorityLevel = conf.getPriorityLevel();
         this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
         this.readCompacted = conf.isReadCompacted();
+        this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index ed374f663..881a69c26 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -82,7 +83,7 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
         }
 
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema);
+                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, SubscriptionInitialPosition.Latest);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 77f2e9736..3f2eb2b02 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 
 @Data
 public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
@@ -73,6 +74,8 @@
 
     private boolean readCompacted = false;
 
+    private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
     private int patternAutoDiscoveryPeriod = 1;
 
     @JsonIgnore
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 4776ee1f8..23587b564 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -6741,6 +6741,10 @@ public Builder clearProtocolVersion() {
     // optional .pulsar.proto.Schema schema = 12;
     boolean hasSchema();
     org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema();
+    
+    // optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest];
+    boolean hasInitialPosition();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition();
   }
   public static final class CommandSubscribe extends
       com.google.protobuf.GeneratedMessageLite
@@ -6822,6 +6826,47 @@ private SubType(int index, int value) {
       // @@protoc_insertion_point(enum_scope:pulsar.proto.CommandSubscribe.SubType)
     }
     
+    public enum InitialPosition
+        implements com.google.protobuf.Internal.EnumLite {
+      Latest(0, 0),
+      Earliest(1, 1),
+      ;
+      
+      public static final int Latest_VALUE = 0;
+      public static final int Earliest_VALUE = 1;
+      
+      
+      public final int getNumber() { return value; }
+      
+      public static InitialPosition valueOf(int value) {
+        switch (value) {
+          case 0: return Latest;
+          case 1: return Earliest;
+          default: return null;
+        }
+      }
+      
+      public static com.google.protobuf.Internal.EnumLiteMap<InitialPosition>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static com.google.protobuf.Internal.EnumLiteMap<InitialPosition>
+          internalValueMap =
+            new com.google.protobuf.Internal.EnumLiteMap<InitialPosition>() {
+              public InitialPosition findValueByNumber(int number) {
+                return InitialPosition.valueOf(number);
+              }
+            };
+      
+      private final int value;
+      
+      private InitialPosition(int index, int value) {
+        this.value = value;
+      }
+      
+      // @@protoc_insertion_point(enum_scope:pulsar.proto.CommandSubscribe.InitialPosition)
+    }
+    
     private int bitField0_;
     // required string topic = 1;
     public static final int TOPIC_FIELD_NUMBER = 1;
@@ -7020,6 +7065,16 @@ public boolean hasSchema() {
       return schema_;
     }
     
+    // optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest];
+    public static final int INITIALPOSITION_FIELD_NUMBER = 13;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition initialPosition_;
+    public boolean hasInitialPosition() {
+      return ((bitField0_ & 0x00000800) == 0x00000800);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition() {
+      return initialPosition_;
+    }
+    
     private void initFields() {
       topic_ = "";
       subscription_ = "";
@@ -7033,6 +7088,7 @@ private void initFields() {
       metadata_ = java.util.Collections.emptyList();
       readCompacted_ = false;
       schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -7125,6 +7181,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000400) == 0x00000400)) {
         output.writeMessage(12, schema_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        output.writeEnum(13, initialPosition_.getNumber());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -7181,6 +7240,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(12, schema_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeEnumSize(13, initialPosition_.getNumber());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -7318,6 +7381,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000400);
         schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x00000800);
+        initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
+        bitField0_ = (bitField0_ & ~0x00001000);
         return this;
       }
       
@@ -7400,6 +7465,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000400;
         }
         result.schema_ = schema_;
+        if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+          to_bitField0_ |= 0x00000800;
+        }
+        result.initialPosition_ = initialPosition_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -7449,6 +7518,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandSub
         if (other.hasSchema()) {
           mergeSchema(other.getSchema());
         }
+        if (other.hasInitialPosition()) {
+          setInitialPosition(other.getInitialPosition());
+        }
         return this;
       }
       
@@ -7591,6 +7663,15 @@ public Builder mergeFrom(
               subBuilder.recycle();
               break;
             }
+            case 104: {
+              int rawValue = input.readEnum();
+              org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition value = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.valueOf(rawValue);
+              if (value != null) {
+                bitField0_ |= 0x00001000;
+                initialPosition_ = value;
+              }
+              break;
+            }
           }
         }
       }
@@ -8009,6 +8090,30 @@ public Builder clearSchema() {
         return this;
       }
       
+      // optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest];
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
+      public boolean hasInitialPosition() {
+        return ((bitField0_ & 0x00001000) == 0x00001000);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition() {
+        return initialPosition_;
+      }
+      public Builder setInitialPosition(org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00001000;
+        initialPosition_ = value;
+        
+        return this;
+      }
+      public Builder clearInitialPosition() {
+        bitField0_ = (bitField0_ & ~0x00001000);
+        initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 0c0a50135..505d7eda5 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -216,6 +216,13 @@ message CommandSubscribe {
     optional bool read_compacted = 11;
 
 	optional Schema schema = 12;
+	enum InitialPosition {
+		Latest   = 0;
+		Earliest = 1;
+	}
+	// Signal whether the subscription will initialize on latest
+	// or not -- earliest
+	optional InitialPosition initialPosition = 13 [default = Latest];
 }
 
 message CommandPartitionedTopicMetadata {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services