Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/11 09:47:01 UTC

[GitHub] XiaoZYang closed pull request #1218: set initial cursor position when subscribe

XiaoZYang closed pull request #1218: set initial cursor position when subscribe
URL: https://github.com/apache/incubator-pulsar/pull/1218
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/.gitignore b/.gitignore
index 1e194b2a7..a5811fb91 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,6 +23,9 @@ pulsar-broker/src/test/resources/log4j.properties
 *.iml
 *.iws
 
+# Vscode
+.vscode/
+
 # Mac
 .DS_Store
 
diff --git a/faq.md b/faq.md
new file mode 100644
index 000000000..1ff70938b
--- /dev/null
+++ b/faq.md
@@ -0,0 +1,273 @@
+# Frequently Asked Questions
+- Getting Started
+- Concepts and Design
+- Usage and Configuration
+
+---
+
+## Getting Started
+
+### What are the minimum requirements for Apache Pulsar?
+You need 3 kinds of clusters: bookie, broker, and ZooKeeper. If you don't have enough resources, it's OK to run them on the same machine.
+
+---
+
+## Concepts and Design
+
+### Is ack tied to subscription?
+Yes, ack is tied to a particular subscription.
+
+### Where should I look to tweak load balancing?
+There are a few parameters to look at:
+1. Topic assignments to brokers are done in terms of "bundles", that is, in groups of topics
+2. Topics are matched to bundles by hashing on the name
+3. Effectively, a bundle is a hash range that topics fall into
+4. Initially, the default is to have 4 "bundles" for a namespace
+5. When the traffic increases on a given bundle, it will be split in 2 and reassigned to a different broker
+6. There are some adjustable thresholds that control when the split happens, based on the number of topics/partitions, messages in/out, bytes in/out, etc.
+7. It's also possible to specify a higher number of bundles when creating a namespace
+8. There are load-manager thresholds that control when a broker should offload some of its bundles to other brokers
+
+### What is the lifecycle of subscription?
+Once it's created, it retains all messages published after that (minus explicit TTL). Subscriptions can be dropped by explicitly unsubscribing (in the `Consumer` API) or through the REST API/CLI.
+
+### What is a bundle?
+In Pulsar, "namespaces" are the administrative unit: you can configure most options on a namespace and they will be applied to the topics contained in the namespace. This makes it convenient to apply settings and operations to a group of topics rather than once per topic.
+
+In general, the pattern is to use a namespace for each user application, so a single user/tenant can create multiple namespaces to manage its own applications.
+
+When it comes to topics, we need a way to assign topics to brokers, control the load, and move topics if a broker becomes overloaded. Rather than doing these operations (ownership, load monitoring, assignment) for each single topic, we do them in bundles, or "groups of topics".
+
+In practical terms, the number of bundles determines "across how many brokers I can spread the topics of a given namespace".
+
+The client API and implementation have no concept of bundles: clients look up each topic they want to publish to or consume from individually.
+
+On the broker side, the namespace is broken down into multiple bundles, and each bundle can be assigned to a different broker. Effectively, bundles are the "unit of assignment" for topics into brokers and this is what the load-manager uses to track the traffic and decide where to place "bundles" and whether to offload them to other brokers.
+
+A bundle is represented by a hash-range. The 32-bit hash space is initially divided equally into the requested bundles. Topics are matched to a bundle by hashing on the topic name.
+
+Default number of bundles is configured in `broker.conf`: `defaultNumberOfNamespaceBundles=4`
+
+When the traffic increases on a given bundle, it will be split in 2 and reassigned to a different broker.
+
+To enable auto-split, set `loadBalancerAutoBundleSplitEnable=true`; to trigger unload and reassignment after splitting, set `loadBalancerAutoUnloadSplitsEnable=true`.
+
+If you expect high traffic on a particular namespace, it's a good practice to specify a higher number of bundles when creating the namespace: `bin/pulsar-admin namespaces create $NS --bundles 64`. This avoids the initial auto-adjustment phase.
+
+All the thresholds for auto-splitting can be configured in `broker.conf`, e.g., number of topics/partitions, messages in/out, bytes in/out, etc.
+
+### How does the design deal with isolation between tenants? Which concepts enable it and to what extent, and how large a difference between tenants is needed before their impact on each other becomes noticeable as degraded latency?
+Isolation between tenants (and between topics of the same tenant) happens at many different points. I'll start from the bottom up.
+
+#### Storage
+You're probably familiar with BookKeeper, but one of its main strengths is that each bookie can efficiently serve many different ledgers (segments of topic data). We tested with hundreds of thousands per node.
+
+This is because there is a single journal (on its own device) where all write operations get appended, and the entries are then periodically flushed in the background to the storage device.
+
+This gives isolation between writes and reads in a bookie. You can read as fast as you can, maxing out the IO on the storage device, but your write throughput and latency are going to be unaffected.
+
+#### Broker
+Everything in the broker happens asynchronously. The amount of memory that is used is also capped per broker.
+
+Whenever a broker is marked as overloaded, traffic can be quickly shifted (manually or automatically) to less loaded brokers. The LoadManager component in the brokers is dedicated to that.
+
+There are several points of flow control:
+- On the producer side, there are limits on in-flight messages between brokers and bookies, which slow down users trying to publish faster than the system can absorb
+- On the consumer side, it's possible to throttle the delivery to a certain rate
+
+#### Quotas
+You can configure different storage quotas for different tenants/namespaces and take different actions when the quotas are filled up (block the producer, return an exception, drop older messages).
+
+#### Broker level isolation
+There is the option to isolate certain tenants/namespaces to a particular set of brokers. Typical reasons for using it are experimenting with different configurations, debugging, and quickly reacting to unexpected situations.
+
+For example, a particular user might be triggering a bad behavior in the broker that can impact performance for other tenants.
+
+In this case, the particular user can be "isolated" to a subset of brokers that will not serve any other traffic, until a proper fix that correctly handles the condition can be deployed.
+
+This is a lightweight alternative to having multiple clusters for different users, since most of the other parts are still shared (ZK, BK, ...).
+
+
+### Is there "regex" topic in Pulsar?
+There is regex subscription coming up in Pulsar 2.0. See [PIP-13](https://github.com/apache/incubator-pulsar/wiki/PIP-13:-Subscribe-to-topics-represented-by-regular-expressions).
+
+### Does Pulsar have, or plan to have, a concept of log compaction where only the latest message with the same key will be kept?
+Yes, see [PIP-9](https://github.com/ivankelly/incubator-pulsar-wiki/pull/1/files) for more details.
+
+### When I use an exclusive subscription to a partitioned topic, is the subscription attached to the "whole topic" or to a "topic partition"? 
+On a partitioned topic, you can use all 3 supported subscription types (exclusive, failover, shared), same as with non-partitioned topics.
+The "subscription" concept is roughly similar to a "consumer group" in Kafka. You can have multiple of them on the same topic, with different names.
+
+If you use "exclusive", a consumer will try to consume from all partitions, or fail if any partition is already being consumed.
+
+The mode similar to Kafka is the "failover" subscription. In this case, you have 1 active consumer per partition, the active/stand-by decision is made at the partition level, and Pulsar makes sure to spread the partition assignments evenly across consumers.
+
+### What is the proxy component?
+It's a component that was introduced recently. Essentially, it's a stateless proxy that speaks the Pulsar binary protocol. The motivation is to avoid the need for (or overcome the impossibility of) direct connections between clients and brokers.
+
+--- 
+
+## Usage and Configuration
+### Can I manually change the number of bundles after creating namespaces?
+Yes, you can split a given bundle manually.
+
+### Is the Kafka producer wrapper thread-safe?
+The producer wrapper should be thread-safe.
+
+### Can I just remove a subscription?
+Yes, you can use the CLI tool `bin/pulsar-admin persistent unsubscribe $TOPIC -s $SUBSCRIPTION`.
+
+### How are subscription modes set? Can I create new subscriptions over the WebSocket API?
+Yes, you can set most of the producer/consumer configuration options in WebSocket by passing them as HTTP query parameters, like:
+`ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub?subscriptionType=Shared`
+
+See [the docs](http://pulsar.apache.org/docs/latest/clients/WebSocket/#RunningtheWebSocketservice-1fhsvp).
+
+### Is there any sort of order of operations or best practices on the upgrade procedure for a geo-replicated Pulsar cluster?
+In general, updating the Pulsar brokers is an easy operation, since the brokers don't have local state. The typical rollout is a rolling upgrade, either doing 1 broker at a time or some percentage of them in parallel.
+
+There are no complicated requirements for upgrading geo-replicated clusters, since we take particular care to ensure backward and forward compatibility.
+
+Both the client and the brokers are reporting their own protocol version and they're able to disable newer features if the other side doesn't support them yet.
+
+Additionally, when making breaking changes to the metadata format (if the need arises), we make sure to spread the changes across at least 2 releases.
+
+This is to always allow the possibility to downgrade a running cluster to a previous version, in case any server problem is identified in production.
+
+So, one release will understand the new format while the next one will actually start using it.
+
+### Since Pulsar has configurable retention per namespace, can I set a "forever" value, i.e., always keep all data in the namespaces?
+Retention applies to "consumed" messages, i.e., the ones the consumer has already acknowledged. By default, retention is 0, which means data is deleted as soon as all consumers acknowledge it. You can set retention to delay the deletion.
+
+That also means that, by default, data is kept forever if the consumers are not acknowledging it.
+
+There is currently no "infinite" retention, other than setting a very high value.
+
+### How can a consumer "replay" a topic from the beginning, i.e., where can I set an offset for the consumer?
+1. Use admin API (or CLI tool):
+    - Reset to a specific point in time (3h ago)
+    - Reset to a message id
+2. You can use the client API `seek`.
+
+### When I create a consumer, does this affect other consumers?
+The key is that you should use different subscriptions for each consumer. Each subscription is completely independent from others.
+
+### The default when creating a consumer, is it to "tail" from "now" on the topic, or from the "last acknowledged" or something else?
+When you spin up a consumer, it tries to subscribe to the topic. If the subscription doesn't exist, a new one is created and positioned at the end of the topic ("now").
+
+Once you reconnect, the subscription will still be there, positioned at the last acknowledged message from the previous session.
+
+### I want some kind of producer lock, i.e., to pessimistically or optimistically lock a specified topic so only one producer can write at a time, and all further producers know they have to reprocess data before trying to write to the topic again.
+To ensure only one producer is connected, you just need to use the same "producerName": the broker will ensure that no two producers with the same name are publishing on a given topic.
+
+### I tested performance using PerformanceProducer between two server nodes with 10,000 Mbit/s NICs (and I verified that TCP throughput can exceed 1GB/s). The max message throughput is around 1,000,000 msg/s with small messages (such as 64/128 bytes), but when I increase the message size to 1028 bytes or larger, the rate drops sharply to 150,000 msg/s, and both cases max out at around 1600Mbit/s, which is far from 1GB/s. Why can't the throughput between producer and broker exceed 1600Mbit/s? It seems that the producer executor only uses one thread; is this the reason? When I started two producer client JVMs, the throughput increased only slightly, to a little beyond 1600Mbit/s. Any other reasons?
+Most probably, when increasing the payload size, you're reaching the disk max write rate on a single bookie.
+
+There are a few tricks that can be used to increase throughput (other than just partitioning):
+
+1. Enable striping in BK by setting the ensemble size larger than the write quorum, e.g., e=5 w=2 a=2: write 2 copies of each message, but stripe them across 5 bookies.
+
+2. If there are already multiple topics/partitions, you can try to configure the bookies with multiple journals (e.g., 4). This should increase the throughput when the journal is on SSDs, since the controller has multiple IO queues and can efficiently sustain multiple threads, each doing sequential writes.
+
+- Option (1) is configured on a given Pulsar namespace; see the `namespaces set-persistence` command.
+
+- Option (2) needs to be configured in the bookies.
+
+### Is there any work on a Mesos framework for Pulsar/BookKeeper at this point? Would it be useful?
+We don't have anything readily available for Mesos/DCOS, though there should be nothing preventing it.
+
+It would surely be useful.
+
+
+### Is there an HDFS like interface?
+Not for Pulsar. There was some work in the BK / DistributedLog community to have one, but not at the messaging layer.
+
+### Where can I find information about `receiveAsync` parameters? In particular, is there a timeout as in `receive`?
+There's no other info about `receiveAsync()`. The method doesn't take any parameters, and currently there's no timeout on it. You can always set a timeout on the `CompletableFuture` itself, but the problem is how to cancel the future and avoid "getting" the message.
+
+What's your use case for a timeout on `receiveAsync()`? Could it be achieved more easily by using a `MessageListener`?
+
+### Why do we choose to use BookKeeper to store consumer offsets instead of ZooKeeper? What are the benefits?
+ZooKeeper is a "consensus" system that, while it exposes a key/value interface, is not meant to support a large volume of writes per second.
+
+ZK is not a "horizontally scalable" system, because every node receives every transaction and keeps the whole data set. Effectively, ZK is based on a single "log" that is replicated consistently across the participants.
+
+The max throughput we have observed on a well-configured ZK on good hardware was around 10K writes/s. If you want to do more than that, you would have to shard it.
+
+To store consumers' cursor positions, we potentially need to write a large number of updates per second. Typically we persist the cursor every 1 second, though the rate is configurable, and if you want to reduce the amount of potential duplicates, you can increase the persistence frequency.
+
+With BookKeeper it's very efficient to have a large throughput across a huge number of different "logs". In our case, we use 1 log per cursor, and it becomes feasible to persist every single cursor update.
+
+### I'm facing an issue using `.receiveAsync` that seems to be related to `UnAckedMessageTracker` and `PartitionedConsumerImpl`. We are consuming messages with `receiveAsync` and calling `acknowledgeAsync` immediately when a message is received, after which the process delays its next execution. In this scenario we are consuming many more (repeated) messages than were produced. We are using partitioned topics with `setAckTimeout` of 30 seconds, and I believe this issue could be related to `PartitionedConsumerImpl`, because the same test on a non-partitioned topic does not generate any repeated messages.
+PartitionedConsumer is composed of a set of regular consumers, one per partition. To have a single `receive()` abstraction, messages from all partitions are then pushed into a shared queue. 
+
+The thing is that the unacked message tracker works at the partition level. So when the timeout happens, it's able to request redelivery for the messages and clear them from the partition's queue,
+but if the messages were already pushed into the shared queue, the "clearing" part will not happen.
+
+- The only quick workaround I can think of is to increase the "ack-timeout" to a level at which the timeout doesn't occur during processing
+- Another option would be to reduce the receiver queue size, so that fewer messages are sitting in the queue
+
+### Can I use the newer BookKeeper v3 wire protocol in Pulsar? How can I enable it?
+Currently, no: the broker is forced to use the v2 protocol, and that's not configurable at the moment.
+
+### Is "kubernetes/generic/proxy.yaml" meant to be used whenever we want to expose a Pulsar broker outside the Kubernetes cluster?
+Yes, the "proxy" is an additional component to deploy a stateless proxy frontend that can be exposed through a load balancer and that doesn't require direct connectivity to the actual brokers. There's no need to use it from within the Kubernetes cluster. Also, in some cases it's simpler to expose the brokers through `NodePort` or `ClusterIp` for outside producers/consumers.
+
+### Is there a way of having both authentication and the Pulsar dashboard working at same time?
+The key is that with authorization, the stats collector needs to access the APIs that require the credentials. That's not a problem for stats collected through Prometheus, but it is for the "Pulsar dashboard", which is where the per-topic stats are shown. I think that should be quite easy to fix.
+
+### How can I know when I've reached the end of the stream during replay?
+There is no direct way because messages can still be published in the topic, and relying on the `readNext(timeout)` is not precise because the client might be temporarily disconnected from broker in that moment.
+
+One option is to use the `publishTimestamp` of messages. When you start replaying, record the current "now", then replay until you hit a message with timestamp >= now.
+
+Another option is to "terminate" the topic. Once a topic is "terminated", no more messages can be published on it, and a reader/consumer can check the `hasReachedEndOfTopic()` condition to know when it has reached the end.
+
+A final option is to check the topic stats. This is a bit more involved, because it requires using the admin client (or REST) to get the stats for the topic and check the "backlog". If the backlog is 0, we've hit the end.
+
+### How can I prevent an inactive topic from being deleted under any circumstances? I want to set no time or space limit for a certain namespace.
+There's currently no option for "infinite" retention (though it sounds like a good idea! maybe we could use `-1` for that). The only option now is to use INT_MAX for `retentionTimeInMinutes` and LONG_MAX for `retentionSizeInMB`. It's not "infinite", but 4085 years of retention should probably be enough!
+
+### Is there a profiling option in Pulsar, so that we can break down the time spent in every stage? For instance, message A stays in the queue for 1ms, BK write time is 2ms (interval between sending to BK and receiving the ack from BK), and so on.
+There are latency stats at different stages: in the client (e.g., reported every minute in the info logs),
+in the broker (accessible through the broker metrics), and finally in the bookies, where there are several different latency metrics.
+
+In the broker there's just the write latency on BK, because there is no other queuing involved in the write path.
+
+### How can I have multiple readers that each get all the messages from a topic from the beginning concurrently? I.e., no round-robin, no exclusivity
+You can create each reader with `MessageId.earliest`.
+
+
+### Does the broker validate whether a property exists when a producer/consumer connects?
+Yes, the broker performs authentication and authorization while creating the producer/consumer, and this information is present in the namespace policies. So, if auth is enabled, the broker does the validation.
+
+### From what I've seen so far, it seems that I'd want a partitioned topic when I want a firehose/mix of data, and to split that firehose into specific topics per entity when I'd have more discrete consumers. Is that accurate?
+Yes, you can use either approach, or even combine them, depending on what is more convenient for the use case. The general traits for choosing one or the other are:
+
+- Partitions -> Maintain a single "logical" topic but scale throughput to multiple machines. Also, the ability to consume in order for a "partition" of the keys. In general, consumers are assigned a partition (and thus a subset of keys) without specifying anything.
+
+- Multiple topics -> When each topic represents some concrete existing "concept" in the application and the number of topics is "finite", within tens or hundreds of thousands (e.g., using a topic per user when the number of users is unbounded and can be in the hundreds of millions is not a good idea). Having multiple topics makes it easier for a consumer to consume a specific portion of messages.
+
+### For subscribing to a large number of topics like that, would I need to call `subscribe` for each one individually, or is there some sort of wildcard capability?
+Currently you can only subscribe individually (though you can automate it by getting the list of topics and iterating through it), but we're working on wildcard subscribe and we're targeting it for the next release.
+
+### Is the difference between a consumer and a reader documented somewhere?
+Main difference: a reader can be used when manually managing the offset/messageId, rather than relying on Pulsar to keep track of it with the acknowledgments
+- consumer -> managed subscriptions with acks and auto retention
+- reader -> always specify start message id on creation
+
+
+### What is the default routing mode for partitioned topics, and what is used in the Kafka adaptor?
+The default is to use the hash of the key on a message. If the message has no key, the producer will use a "default" partition (it picks 1 random partition and uses it for all the messages it publishes).
+
+This is to maintain the same ordering guarantee when no partitions are there: per-producer ordering.
+
+The same applies when using the Kafka wrapper.
+
+### I'm setting up bookies on AWS d2.4xlarge instances (16 cores, 122G memory, 12x2TB RAID-0 HDDs). Do you have any recommendations for memory configuration for this kind of setup? For configurations like Java heap, direct memory and dbStorage_writeCacheMaxSizeMb, dbStorage_readAheadCacheMaxSizeMb, dbStorage_rocksDB_blockCacheSize. BTW, I'm going to use journalSyncData=false since we cannot recover machines when they shut down, so no fsync is required for every message.
+Since the VM has a lot of RAM, you can increase these well beyond the defaults and leave the rest to the page cache. For the JVM heap I'd say ~24g. WriteCacheMaxSize and ReadAheadCacheMaxSize are both allocated from JVM direct memory; I'd start with 16g for each. The RocksDB block cache is allocated in JNI, so it's completely outside the JVM configuration; ideally you want to cache most of the indexes. I'd say 4GB should be enough to index all the data in the 24TB storage space.
+
+### When there are multiple consumers for a topic, does the broker read once from the bookies and send the messages to all consumers from a buffer, or does it read from the bookies each time for each consumer?
+In general, all dispatching is done directly from broker memory. We only read from bookies when consumers are falling behind.
+
+
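A minimal sketch, not part of the pull request: the faq.md "What is a bundle?" answer in this diff describes a namespace's bundles as equal slices of the 32-bit hash space, with each topic assigned to the slice that contains the hash of its name, and an overloaded bundle split in 2. The Java below illustrates only that idea. The hash function (CRC32 from `java.util.zip`) and the names `BundleSketch`, `boundaries`, `bundleFor` and `split` are assumptions for illustration, not Pulsar's actual bundle code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.CRC32;

public class BundleSketch {
    // The full 32-bit hash space is [0, 2^32).
    static final long HASH_SPACE = 1L << 32;

    // Boundaries of numBundles equal ranges: bundle i covers [b[i], b[i + 1]).
    // Any remainder from the integer division goes to the last bundle.
    static long[] boundaries(int numBundles) {
        long[] b = new long[numBundles + 1];
        long step = HASH_SPACE / numBundles;
        for (int i = 0; i < numBundles; i++) {
            b[i] = i * step;
        }
        b[numBundles] = HASH_SPACE;
        return b;
    }

    // Hypothetical topic hash: CRC32 of the UTF-8 topic name, always in [0, 2^32).
    static long hash(String topic) {
        CRC32 crc = new CRC32();
        crc.update(topic.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Returns the index of the bundle whose range contains the topic's hash.
    static int bundleFor(String topic, long[] b) {
        long h = hash(topic);
        for (int i = 0; i < b.length - 1; i++) {
            if (h >= b[i] && h < b[i + 1]) {
                return i;
            }
        }
        throw new IllegalStateException("hash outside the hash space: " + h);
    }

    // Splitting a bundle "in 2" halves its hash range at the midpoint.
    static long[][] split(long lower, long upper) {
        long mid = lower + (upper - lower) / 2;
        return new long[][] {{lower, mid}, {mid, upper}};
    }

    public static void main(String[] args) {
        long[] b = boundaries(4); // mirrors defaultNumberOfNamespaceBundles=4
        for (String topic : new String[] {"topic-a", "topic-b", "topic-c"}) {
            System.out.println(topic + " -> bundle " + bundleFor(topic, b));
        }
        System.out.println(Arrays.deepToString(split(b[0], b[1])));
    }
}
```

Because a topic's bundle depends only on its name hash, clients never need to know about bundles; only the broker-side ownership of each range changes when a bundle is split or moved.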
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 0d2c41b4b..2b43a7997 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -138,6 +138,18 @@
      */
     public ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException;
 
+    /**
+     * Open a ManagedCursor in this ManagedLedger.
+     * <p>
+     * If the cursor doesn't exist, a new one will be created, positioned at the end of the ManagedLedger if {@code initializeOnLatest} is true, or at its first position otherwise.
+     *
+     * @param name
+     *            the name associated with the ManagedCursor
+     * @return the ManagedCursor
+     * @throws ManagedLedgerException
+     */
+    public ManagedCursor openCursor(String name, boolean initializeOnLatest) throws InterruptedException, ManagedLedgerException;
+
     /**
      * Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
      * exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
@@ -193,6 +205,22 @@
      */
     public void asyncOpenCursor(String name, OpenCursorCallback callback, Object ctx);
 
+    /**
+     * Open a ManagedCursor asynchronously.
+     *
+     * @see #openCursor(String)
+     * @param name
+     *            the name associated with the ManagedCursor
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     * @param initializeOnLatest
+     *            whether a newly created cursor is positioned at the latest position (true) or at the first position (false);
+     *            default is <b>true</b>
+     */
+    public void asyncOpenCursor(String name, OpenCursorCallback callback, Object ctx, boolean initializeOnLatest);
+
     /**
      * Get a list of all the cursors reading from this ManagedLedger
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 68a764511..75a4e659a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -545,7 +545,12 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback
     }
 
     @Override
-    public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException {
+    public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException {
+        return openCursor(cursorName, true);
+    }
+
+    @Override
+    public ManagedCursor openCursor(String cursorName, boolean initializeOnLatest) throws InterruptedException, ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         class Result {
             ManagedCursor cursor = null;
@@ -566,7 +571,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                 counter.countDown();
             }
 
-        }, null);
+        }, null, initializeOnLatest);
 
         if (!counter.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
             throw new ManagedLedgerException("Timeout during open-cursor operation");
@@ -581,9 +586,13 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
     }
 
     @Override
-    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback,
-            final Object ctx) {
+    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx) {
+        this.asyncOpenCursor(cursorName, callback, ctx, true);
+    }
 
+    @Override
+    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, final Object ctx,
+            final boolean initializeOnLatest) {
         try {
             checkManagedLedgerIsOpen();
             checkFenced();
@@ -623,8 +632,8 @@ public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
                 cursor.setActive();
                 // Update the ack position (ignoring entries that were written while the cursor was being created)
-                cursor.initializeCursorPosition(getLastPositionAndCounter());
-
+                cursor.initializeCursorPosition(initializeOnLatest ? getLastPositionAndCounter() : getFirstPositionAndCounter());
+
                 synchronized (this) {
                     cursors.add(cursor);
                     uninitializedCursors.remove(cursorName).complete(cursor);
@@ -2026,6 +2035,23 @@ PositionImpl getMarkDeletePositionOfSlowestConsumer() {
         return Pair.create(pos, count);
     }
 
+    /**
+     * Get the first position written in the managed ledger, alongside with the associated counter
+     */
+    Pair<PositionImpl, Long> getFirstPositionAndCounter() {
+        PositionImpl pos;
+        long count;
+
+        do {
+            pos = getFirstPosition();
+            count = ENTRIES_ADDED_COUNTER_UPDATER.get(this);
+
+            // Ensure no entry was written while reading the two values
+        } while (pos.compareTo(getFirstPosition()) != 0);
+
+        return Pair.create(pos, count);
+    }
+
     public void activateCursor(ManagedCursor cursor) {
         if (activeCursors.get(cursor.getName()) == null) {
             activeCursors.add(cursor);
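`getFirstPositionAndCounter()` in this diff copies the optimistic-read loop of `getLastPositionAndCounter()`: read a position and the entries-added counter, then retry if the position changed in between. One hedged observation: the first position normally moves only when old ledgers are trimmed, so this re-check, unlike the one in the last-position variant, is not sensitive to entries appended between the two reads. For comparison, here is a minimal sketch of a consistent two-value read using the JDK's `java.util.concurrent.locks.StampedLock` optimistic-read API. This is a swapped-in technique that the diff does not use, and `PositionSnapshot` with its long-valued "positions" is hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.locks.StampedLock;

// Hypothetical holder for a (position, counter) pair; positions are plain longs here.
public class PositionSnapshot {
    private final StampedLock lock = new StampedLock();
    private long position = -1;    // id of the last entry written
    private long entriesAdded = 0; // total entries ever appended

    // Writer: both values change together under the write lock.
    public void append() {
        long stamp = lock.writeLock();
        try {
            position++;
            entriesAdded++;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Reader: lock-free optimistic attempt first; if any write intervened,
    // validate() fails and the values are re-read under a read lock.
    public long[] snapshot() {
        long stamp = lock.tryOptimisticRead();
        long pos = position;
        long count = entriesAdded;
        if (!lock.validate(stamp)) {
            stamp = lock.readLock();
            try {
                pos = position;
                count = entriesAdded;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return new long[] {pos, count};
    }

    public static void main(String[] args) throws InterruptedException {
        PositionSnapshot s = new PositionSnapshot();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                s.append();
            }
        });
        writer.start();
        // In this toy model every consistent snapshot satisfies entriesAdded == position + 1.
        for (int i = 0; i < 100_000; i++) {
            long[] snap = s.snapshot();
            if (snap[1] != snap[0] + 1) {
                throw new IllegalStateException("torn read: " + snap[0] + ", " + snap[1]);
            }
        }
        writer.join();
        System.out.println("final: " + Arrays.toString(s.snapshot()));
    }
}
```

The trade-off is that the writer must take a lock, whereas the loop in the diff keeps the write path untouched and accepts a weaker guarantee on the read side.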
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index a3b246e01..72d5f657b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -1035,7 +1035,8 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus
     public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
             @PathParam("subscriptionName") String subscriptionName,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) throws PulsarServerException {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId,
+            @QueryParam("initializeOnLatest") @DefaultValue("true") boolean initializeOnLatest) throws PulsarServerException {
         destination = decode(destination);
         DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
         if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
@@ -1069,7 +1070,7 @@ public void createSubscription(@PathParam("property") String property, @PathPara
                 }
 
                 PersistentSubscription subscription = (PersistentSubscription) topic
-                        .createSubscription(subscriptionName).get();
+                        .createSubscription(subscriptionName, initializeOnLatest).get();
                 subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
                 log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), dn,
                         subscriptionName, messageId);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 05fb749c3..e8f99cc42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -551,6 +551,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
         final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
         final boolean readCompacted = subscribe.getReadCompacted();
         final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
+        final boolean initializeOnLatest = subscribe.hasInitializeOnLatest()
+                    ? subscribe.getInitializeOnLatest()
+                    : true;
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
@@ -613,7 +616,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                         service.getTopic(topicName.toString())
                                 .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                                                       subType, priorityLevel, consumerName, isDurable,
-                                                                      startMessageId, metadata, readCompacted))
+                                                                      startMessageId, metadata, readCompacted, initializeOnLatest))
                                 .thenAccept(consumer -> {
                                     if (consumerFuture.complete(consumer)) {
                                         log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
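The expression `subscribe.hasInitializeOnLatest() ? subscribe.getInitializeOnLatest() : true` above keeps older clients, which never set the field, on the previous behavior of starting at the latest message. Below is a minimal sketch of that defaulting rule. It uses a hypothetical `SubscribeRequestSketch` in place of the generated `CommandSubscribe` class and assumes the getter returns `false` when the field is absent, which is what a proto2 `bool` field without an explicit default would do. The real field declaration is not part of this diff.

```java
// Hypothetical stand-in for a decoded subscribe command. A null flag models
// a client that did not set the optional field (for example, an older client).
final class SubscribeRequestSketch {
    private final Boolean initializeOnLatest;

    SubscribeRequestSketch(Boolean initializeOnLatest) {
        this.initializeOnLatest = initializeOnLatest;
    }

    boolean hasInitializeOnLatest() {
        return initializeOnLatest != null;
    }

    // Assumed proto2-style getter: returns false when the field is absent.
    boolean getInitializeOnLatest() {
        return initializeOnLatest != null && initializeOnLatest;
    }
}

final class InitialPositionDefaulting {
    // Same rule as the broker hunk: an absent field means "start on latest".
    static boolean resolve(SubscribeRequestSketch cmd) {
        return cmd.hasInitializeOnLatest() ? cmd.getInitializeOnLatest() : true;
    }

    public static void main(String[] args) {
        System.out.println(resolve(new SubscribeRequestSketch(null)));  // prints "true"
        System.out.println(resolve(new SubscribeRequestSketch(false))); // prints "false"
        // Without the has-check, an absent field would silently read as false.
        System.out.println(new SubscribeRequestSketch(null).getInitializeOnLatest()); // prints "false"
    }
}
```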
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 7a426b7e4..24e1cd680 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -79,9 +79,9 @@ default long getOriginalSequenceId() {
 
     CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
             int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted);
+            Map<String, String> metadata, boolean readCompacted, boolean initializeOnLatest);
 
-    CompletableFuture<Subscription> createSubscription(String subscriptionName);
+    CompletableFuture<Subscription> createSubscription(String subscriptionName, boolean initializeOnLatest);
 
     CompletableFuture<Void> unsubscribe(String subName);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5119edc4a..5bfd86b7a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -298,7 +298,7 @@ public void removeProducer(Producer producer) {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted) {
+            Map<String, String> metadata, boolean readCompacted, boolean initializeOnLatest) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
 
@@ -372,7 +372,7 @@ public void removeProducer(Producer producer) {
     }
 
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName) {
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, boolean initializeOnLatest) {
         return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 6333f3a89..01f1fb50d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -259,7 +259,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                             future.completeExceptionally(exception);
                         }
 
-                    }, null);
+                    }, null, true);
                     return future;
                 } else {
                     // Nothing to do, we are in the correct state
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 57d1d7583..a8b6ba39e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -413,7 +413,7 @@ public void removeProducer(Producer producer) {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted) {
+            Map<String, String> metadata, boolean readCompacted, boolean initializeOnLatest) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
 
@@ -461,7 +461,7 @@ public void removeProducer(Producer producer) {
         }
 
         CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
-                getDurableSubscription(subscriptionName) //
+                getDurableSubscription(subscriptionName, initializeOnLatest) //
                 : getNonDurableSubscription(subscriptionName, startMessageId);
 
         int maxUnackedMessages  = isDurable ? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() :0;
@@ -504,7 +504,7 @@ public void removeProducer(Producer producer) {
         return future;
     }
 
-    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName) {
+    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName, boolean initializeOnLatest) {
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
         ledger.asyncOpenCursor(Codec.encode(subscriptionName), new OpenCursorCallback() {
             @Override
@@ -512,7 +512,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}][{}] Opened cursor", topic, subscriptionName);
                 }
-
+                
                 subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName,
                         name -> createPersistentSubscription(subscriptionName, cursor)));
             }
@@ -523,7 +523,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                 USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
                 subscriptionFuture.completeExceptionally(new PersistenceException(exception));
             }
-        }, null);
+        }, null, initializeOnLatest);
         return subscriptionFuture;
     }
 
@@ -569,8 +569,8 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
 
     @SuppressWarnings("unchecked")
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName) {
-        return getDurableSubscription(subscriptionName);
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, boolean initializeOnLatest) {
+        return getDurableSubscription(subscriptionName, initializeOnLatest);
     }
 
     /**
@@ -892,7 +892,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                 future.completeExceptionally(new PersistenceException(exception));
             }
 
-        }, null);
+        }, null, true);
 
         return future;
     }
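In these hunks, `initializeOnLatest` is passed from `subscribe()` through `getDurableSubscription()` into `asyncOpenCursor()`. The sketch below models the resulting behavior with a small in-memory structure. It assumes the managed ledger consults the flag only when it creates a cursor, so an existing durable cursor keeps its stored position whatever flag a later subscribe passes. Everything in it is hypothetical and is not the `ManagedLedger` API. Index 0 stands in for the oldest entry still retained. The scenario mirrors `testConsumerSubscriptionInitialize` in this PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of cursor initialization; not the ManagedLedger API.
class CursorInitSketch {
    private final List<String> entries = new ArrayList<>();
    // Cursor name -> index of the next entry that cursor will read.
    private final Map<String, Integer> cursors = new HashMap<>();

    void add(String entry) {
        entries.add(entry);
    }

    // The flag matters only when the cursor does not exist yet:
    // true = start after the current last entry, false = start at the oldest entry.
    int openCursor(String name, boolean initializeOnLatest) {
        return cursors.computeIfAbsent(name, n -> initializeOnLatest ? entries.size() : 0);
    }

    // Returns the next entry for an already-opened cursor, or null if it is caught up.
    String readNext(String name) {
        int next = cursors.get(name);
        if (next >= entries.size()) {
            return null;
        }
        cursors.put(name, next + 1);
        return entries.get(next);
    }

    public static void main(String[] args) {
        CursorInitSketch ledger = new CursorInitSketch();
        for (int i = 0; i < 5; i++) ledger.add("my-message-" + i);
        ledger.openCursor("latest", true);
        ledger.openCursor("earliest", false);
        for (int i = 5; i < 10; i++) ledger.add("my-message-" + i);
        System.out.println(ledger.readNext("latest"));   // prints "my-message-5"
        System.out.println(ledger.readNext("earliest")); // prints "my-message-0"
    }
}
```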
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index a68165d5f..4e57e883c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -173,7 +173,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 ((OpenCursorCallback) invocationOnMock.getArguments()[1]).openCursorComplete(cursorMock, null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject(), any(Boolean.class));
 
         // call deleteLedgerComplete on ledger asyncDelete
         doAnswer(new Answer<Object>() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index a5a561ddd..2c6687ddd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -115,7 +115,7 @@ void testPersistentMessageFinder() throws Exception {
         config.setMaxEntriesPerLedger(entriesPerLedger);
         config.setRetentionTime(1, TimeUnit.HOURS);
         ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
-        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName, true);
 
         ledger.addEntry(createMessageWrittenToLedger("retained1"));
         // space apart message publish times
@@ -139,7 +139,7 @@ void testPersistentMessageFinder() throws Exception {
         Thread.sleep(1000);
 
         ledger = factory.open(ledgerAndCursorName, config);
-        c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+        c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName, true);
         long endTimestamp = System.currentTimeMillis();
 
         Result result = new Result();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index d69407268..be3c1d652 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.PersistentTopicTest;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -83,7 +84,7 @@ public void setup(Method m) throws Exception {
         mlFactoryMock = mock(ManagedLedgerFactory.class);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
-        final ManagedCursor cursor = ledger.openCursor("c1");
+        final ManagedCursor cursor = ledger.openCursor("c1", true);
         cursorMock = cursor;
         ledgerMock = ledger;
         mlFactoryMock = factory;
@@ -120,7 +121,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), true);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -178,7 +179,7 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), true);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -240,7 +241,7 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), true);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -298,7 +299,7 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), true);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f76bd9074..0bb22f0d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -371,7 +371,7 @@ public void testSubscribeFail() throws Exception {
                 .setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), true);
         try {
             f1.get();
             fail("should fail with exception");
@@ -390,12 +390,12 @@ public void testSubscribeUnsubscribe() throws Exception {
 
         // 1. simple subscribe
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), true);
         f1.get();
 
         // 2. duplicate subscribe
         Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), true);
 
         try {
             f2.get();
@@ -501,7 +501,7 @@ public void testDeleteTopic() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, true);
         f1.get();
 
         assertTrue(topic.delete().isCompletedExceptionally());
@@ -516,7 +516,7 @@ public void testDeleteAndUnsubscribeTopic() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitializeOnLatest());
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -570,7 +570,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), true);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -657,8 +657,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
-
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), true);
+     
         try {
             f.get();
             fail("should have failed");
@@ -741,6 +741,16 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
             }
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject());
 
+        // call openCursorComplete on cursor asyncOpen
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                ((OpenCursorCallback) invocationOnMock.getArguments()[1]).openCursorComplete(cursorMock, null);
+                return null;
+            }
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject(), any(Boolean.class));
+        
+
         // call deleteLedgerComplete on ledger asyncDelete
         doAnswer(new Answer<Object>() {
             @Override
@@ -775,7 +785,7 @@ public void testFailoverSubscription() throws Exception {
         // 1. Subscribe with non partition topic
         Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
                 cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(),
-                cmd1.getReadCompacted());
+                cmd1.getReadCompacted(), true);
         f1.get();
 
         // 2. Subscribe with partition topic
@@ -787,7 +797,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
                 cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(),
-                cmd2.getReadCompacted());
+                cmd2.getReadCompacted(), true);
         f2.get();
 
         // 3. Subscribe and create second consumer
@@ -797,7 +807,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
                 cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(),
-                cmd3.getReadCompacted());
+                cmd3.getReadCompacted(), true);
         f3.get();
 
         assertEquals(
@@ -818,7 +828,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
                 cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(),
-                cmd4.getReadCompacted());
+                cmd4.getReadCompacted(), true);
         f4.get();
 
         assertEquals(
@@ -844,7 +854,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
                 cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(),
-                cmd5.getReadCompacted());
+                cmd5.getReadCompacted(), true);
 
         try {
             f5.get();
@@ -861,7 +871,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
                 cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(),
-                cmd6.getReadCompacted());
+                cmd6.getReadCompacted(), true);
         f6.get();
 
         // 7. unsubscribe exclusive sub
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index bc0e56180..f3de34246 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1420,7 +1420,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 ((OpenCursorCallback) invocationOnMock.getArguments()[1]).openCursorComplete(cursorMock, null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject(), any(Boolean.class));
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -1430,7 +1430,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                         .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(OpenCursorCallback.class), anyObject(), any(Boolean.class));
 
         doAnswer(new Answer<Object>() {
             @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4200d925d..6848604c5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2447,4 +2447,40 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testConsumerSubscriptionInitialize() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topic = "persistent://my-property/use/my-ns/my-topic-test-subscription-initialize";
+        
+        Producer producer = pulsarClient.createProducer(topic);
+
+        // first produce 5 messages
+        for (int i = 0; i < 5; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // second create consumers
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        Consumer defaultConsumer = pulsarClient.subscribe(topic, "test-subscription-default");
+        conf.setInitializeSubscriptionOnLatest(true);
+        Consumer latestConsumer = pulsarClient.subscribe(topic, "test-subscription-latest", conf);
+        conf.setInitializeSubscriptionOnLatest(false);
+        Consumer earlistConsumer = pulsarClient.subscribe(topic, "test-subscription-earliest", conf);
+
+        // third produce 5 messages
+        for (int i = 5; i < 10; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        assertEquals(defaultConsumer.receive().getData(), "my-message-5".getBytes());
+        assertEquals(latestConsumer.receive().getData(), "my-message-5".getBytes());
+        assertEquals(earlistConsumer.receive().getData(), "my-message-0".getBytes());
+
+        defaultConsumer.close();
+        latestConsumer.close();
+        earlistConsumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 19cdc3155..ed4346f48 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -326,12 +326,12 @@ public void testAcknowledgeWithProperties() throws Exception {
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic);
         ManagedLedger ledger = topicRef.getManagedLedger();
         for (int i = 0; i < 30; i++) {
-            if (ledger.openCursor(subscription).getProperties().get("foobar") == Long.valueOf(0xdeadbeefdecaL)) {
+            if (ledger.openCursor(subscription, true).getProperties().get("foobar") == Long.valueOf(0xdeadbeefdecaL)) {
                 break;
             }
             Thread.sleep(100);
         }
-        Assert.assertEquals(ledger.openCursor(subscription).getProperties().get("foobar"),
+        Assert.assertEquals(ledger.openCursor(subscription, true).getProperties().get("foobar"),
                 Long.valueOf(0xdeadbeefdecaL));
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 76928bd1c..53acffa9d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -63,6 +63,7 @@
 
     private boolean readCompacted = false;
 
+    private boolean initializeSubscriptionOnLatest = true;
     /**
      * @return the configured timeout in milliseconds for unacked messages.
      */
@@ -321,4 +322,21 @@ public ConsumerConfiguration setProperties(Map<String, String> properties) {
     public Map<String, String> getProperties() {
         return properties;
     }
+    
+    /**
+     * Sets where the subscription cursor starts when the subscription is created on its first subscribe.
+     * <p>
+     * Default is {@code true}, i.e. {@link MessageId#latest}: only messages published afterwards are delivered.
+     * @param initializeSubscriptionOnLatest {@code false} to start from the earliest available message instead
+     */
+    public void setInitializeSubscriptionOnLatest(boolean initializeSubscriptionOnLatest) {
+        this.initializeSubscriptionOnLatest = initializeSubscriptionOnLatest;
+    }   
+
+    /** 
+     * @return whether a new subscription starts on the latest message (see {@link #setInitializeSubscriptionOnLatest})
+     */
+    public boolean getInitializeSubscriptionOnLatest(){
+        return this.initializeSubscriptionOnLatest;
+    }   
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 671929c5e..20a025076 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -107,6 +107,7 @@
     private final Map<String, String> metadata;
 
     private final boolean readCompacted;
+    private final boolean initializeSubscriptionOnLatest;
 
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will retain messages and persist the current
@@ -138,6 +139,7 @@
         this.priorityLevel = conf.getPriorityLevel();
         this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
         this.readCompacted = conf.getReadCompacted();
+        this.initializeSubscriptionOnLatest = conf.getInitializeSubscriptionOnLatest();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStats(client, conf, this);
@@ -554,7 +556,7 @@ void connectionOpened(final ClientCnx cnx) {
         }
 
         ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, readCompacted);
+                consumerName, isDurable, startMessageIdData, metadata, readCompacted, initializeSubscriptionOnLatest);
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 4376b013f..46f6cfc1d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -323,12 +323,12 @@ public static ByteBufPair newSend(long producerId, long sequenceId, int numMessa
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName) {
         return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
-                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false);
+                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, true /* initializeOnLatest */);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
-            Map<String, String> metadata, boolean readCompacted) {
+            Map<String, String> metadata, boolean readCompacted, boolean initializeOnLatest) {
         CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
         subscribeBuilder.setTopic(topic);
         subscribeBuilder.setSubscription(subscription);
@@ -339,6 +339,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
         subscribeBuilder.setPriorityLevel(priorityLevel);
         subscribeBuilder.setDurable(isDurable);
         subscribeBuilder.setReadCompacted(readCompacted);
+        subscribeBuilder.setInitializeOnLatest(initializeOnLatest);
         if (startMessageId != null) {
             subscribeBuilder.setStartMessageId(startMessageId);
         }
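
The Commands change keeps the existing short newSubscribe(...) overload source-compatible: it passes `true /* initializeOnLatest */`, so callers that never set the flag keep the previous "initialize on latest" behavior, while the full overload takes the new boolean as its last parameter. The sketch below illustrates only that delegation pattern. SubscribeOverloadSketch and its SubscribeRequest record are hypothetical, simplified stand-ins for Commands and CommandSubscribe, not the real Pulsar API.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified stand-in for Commands.newSubscribe(...).
// Not the real Pulsar API; it only shows the overload/default pattern.
public class SubscribeOverloadSketch {

    record SubscribeRequest(String topic, String subscription, boolean durable,
                            Map<String, String> metadata, boolean readCompacted,
                            boolean initializeOnLatest) {}

    // Short overload: callers that predate the new flag get the old default
    // by delegating with initializeOnLatest = true.
    static SubscribeRequest newSubscribe(String topic, String subscription) {
        return newSubscribe(topic, subscription, true, Collections.emptyMap(), false, true);
    }

    // Full overload: the new boolean is appended as the last parameter.
    static SubscribeRequest newSubscribe(String topic, String subscription, boolean durable,
                                         Map<String, String> metadata, boolean readCompacted,
                                         boolean initializeOnLatest) {
        return new SubscribeRequest(topic, subscription, durable, metadata, readCompacted,
                                    initializeOnLatest);
    }

    public static void main(String[] args) {
        SubscribeRequest legacy = newSubscribe("my-topic", "my-sub");
        SubscribeRequest earliest = newSubscribe("my-topic", "my-sub", true,
                Collections.emptyMap(), false, false);
        System.out.println(legacy.initializeOnLatest());   // true
        System.out.println(earliest.initializeOnLatest()); // false
    }
}
```

Appending the flag to the long overload and defaulting it in the short one means only call sites that opt in (such as ConsumerImpl) have to change.
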
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 8b1ee0e72..e6bd463ce 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -5833,6 +5833,10 @@ public Builder clearProtocolVersion() {
     // optional bool read_compacted = 11;
     boolean hasReadCompacted();
     boolean getReadCompacted();
+    
+    // optional bool initialize_on_latest = 12 [default = true];
+    boolean hasInitializeOnLatest();
+    boolean getInitializeOnLatest();
   }
   public static final class CommandSubscribe extends
       com.google.protobuf.GeneratedMessageLite
@@ -6102,6 +6106,16 @@ public boolean getReadCompacted() {
       return readCompacted_;
     }
     
+    // optional bool initialize_on_latest = 12 [default = true];
+    public static final int INITIALIZE_ON_LATEST_FIELD_NUMBER = 12;
+    private boolean initializeOnLatest_;
+    public boolean hasInitializeOnLatest() {
+      return ((bitField0_ & 0x00000400) == 0x00000400);
+    }
+    public boolean getInitializeOnLatest() {
+      return initializeOnLatest_;
+    }
+    
     private void initFields() {
       topic_ = "";
       subscription_ = "";
@@ -6114,6 +6128,7 @@ private void initFields() {
       startMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
       metadata_ = java.util.Collections.emptyList();
       readCompacted_ = false;
+      initializeOnLatest_ = true;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6197,6 +6212,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000200) == 0x00000200)) {
         output.writeBool(11, readCompacted_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        output.writeBool(12, initializeOnLatest_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -6249,6 +6267,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(11, readCompacted_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(12, initializeOnLatest_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -6384,6 +6406,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000200);
         readCompacted_ = false;
         bitField0_ = (bitField0_ & ~0x00000400);
+        initializeOnLatest_ = true;
+        bitField0_ = (bitField0_ & ~0x00000800);
         return this;
       }
       
@@ -6462,6 +6486,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000200;
         }
         result.readCompacted_ = readCompacted_;
+        if (((from_bitField0_ & 0x00000800) == 0x00000800)) {
+          to_bitField0_ |= 0x00000400;
+        }
+        result.initializeOnLatest_ = initializeOnLatest_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -6508,6 +6536,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandSub
         if (other.hasReadCompacted()) {
           setReadCompacted(other.getReadCompacted());
         }
+        if (other.hasInitializeOnLatest()) {
+          setInitializeOnLatest(other.getInitializeOnLatest());
+        }
         return this;
       }
       
@@ -6634,6 +6665,11 @@ public Builder mergeFrom(
               readCompacted_ = input.readBool();
               break;
             }
+            case 96: {
+              bitField0_ |= 0x00000800;
+              initializeOnLatest_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -7009,6 +7045,27 @@ public Builder clearReadCompacted() {
         return this;
       }
       
+      // optional bool initialize_on_latest = 12 [default = true];
+      private boolean initializeOnLatest_ = true;
+      public boolean hasInitializeOnLatest() {
+        return ((bitField0_ & 0x00000800) == 0x00000800);
+      }
+      public boolean getInitializeOnLatest() {
+        return initializeOnLatest_;
+      }
+      public Builder setInitializeOnLatest(boolean value) {
+        bitField0_ |= 0x00000800;
+        initializeOnLatest_ = value;
+        
+        return this;
+      }
+      public Builder clearInitializeOnLatest() {
+        bitField0_ = (bitField0_ & ~0x00000800);
+        initializeOnLatest_ = true;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
     }
     
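
In the generated CommandSubscribe code above, the message-side hasInitializeOnLatest() tests bit 0x00000400 while the Builder-side one tests 0x00000800. This does not look like a mismatch: the Builder appears to reserve an extra bit for the repeated `metadata` field (cleared in Builder.clear() as 0x00000200), so Builder bits after field 10 sit one position higher, and buildPartial() remaps them. The hypothetical HasBitRemapSketch below reproduces only that remapping with the bit values from the diff; it is not protoc output.

```java
// Hypothetical sketch: how builder has-bits map onto message has-bits in the
// generated CommandSubscribe code. Bit values are copied from the diff; the
// class itself is illustrative and is not protoc output.
public class HasBitRemapSketch {
    // Builder-side bits (see Builder.clear() and Builder.hasInitializeOnLatest()).
    // Assumption: 0x200 tracks whether the repeated `metadata` list is mutable.
    static final int BUILDER_METADATA_MUTABLE = 0x00000200;
    static final int BUILDER_READ_COMPACTED = 0x00000400;
    static final int BUILDER_INITIALIZE_ON_LATEST = 0x00000800;

    // Message-side bits (see CommandSubscribe.hasInitializeOnLatest()).
    // Repeated fields get no has-bit here, so each later field shifts down one.
    static final int MSG_READ_COMPACTED = 0x00000200;
    static final int MSG_INITIALIZE_ON_LATEST = 0x00000400;

    // Mirrors the buildPartial() hunk: copy each builder bit to its message bit.
    static int toMessageBits(int builderBits) {
        int to = 0;
        if ((builderBits & BUILDER_READ_COMPACTED) == BUILDER_READ_COMPACTED) {
            to |= MSG_READ_COMPACTED;
        }
        if ((builderBits & BUILDER_INITIALIZE_ON_LATEST) == BUILDER_INITIALIZE_ON_LATEST) {
            to |= MSG_INITIALIZE_ON_LATEST;
        }
        // BUILDER_METADATA_MUTABLE intentionally has no message-side counterpart.
        return to;
    }

    public static void main(String[] args) {
        // setInitializeOnLatest(false) would still set the has-bit: the field was set explicitly.
        int builderBits = BUILDER_METADATA_MUTABLE | BUILDER_INITIALIZE_ON_LATEST;
        int msgBits = toMessageBits(builderBits);
        System.out.println(Integer.toHexString(msgBits)); // 400
    }
}
```
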
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index db2eec4f5..845ad1bbf 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -193,6 +193,9 @@ message CommandSubscribe {
     repeated KeyValue metadata = 10;
 
     optional bool read_compacted = 11;
+    // Signal whether the subscription is initialized on the latest message
+    // (true, the default) or on the earliest one (false)
+    optional bool initialize_on_latest = 12 [default = true];
 }
 
 message CommandPartitionedTopicMetadata {
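
For the new proto field, the wire key is (field_number << 3) | wire_type. A bool uses the varint wire type 0, so field 12 encodes as key 96 (0x60), which is why the generated parser gains `case 96:`. Because the field is optional with [default = true], a CommandSubscribe from a client that never sets it decodes as true (see initFields() above), which keeps the old behavior. The sketch below hand-encodes only this one field to show both points; InitializeOnLatestWireSketch is a hypothetical name, and its decoder assumes the buffer contains nothing but this field.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Minimal hand-written sketch of protobuf wire encoding for
// `optional bool initialize_on_latest = 12 [default = true];`.
// Not the generated PulsarApi code; it only handles this one field.
public class InitializeOnLatestWireSketch {
    static final int FIELD_NUMBER = 12;
    static final int WIRETYPE_VARINT = 0;
    // Key = (field_number << 3) | wire_type = 96, the `case 96:` in mergeFrom().
    static final int TAG = (FIELD_NUMBER << 3) | WIRETYPE_VARINT;

    // Encode the field only when it was explicitly set (has-bit semantics);
    // null means "not set".
    static byte[] encode(Boolean initializeOnLatest) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (initializeOnLatest != null) {
            out.write(TAG);                        // 96 < 128, so one varint byte
            out.write(initializeOnLatest ? 1 : 0); // bool as a one-byte varint
        }
        return out.toByteArray();
    }

    // Decode: an absent field falls back to the declared default (true).
    // Simplification: assumes only (key, value) pairs for this field.
    static boolean decode(byte[] bytes) {
        boolean value = true; // [default = true]
        for (int i = 0; i + 1 < bytes.length; i += 2) {
            if ((bytes[i] & 0xFF) == TAG) {
                value = bytes[i + 1] != 0;
            }
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(TAG);                            // 96
        System.out.println(Arrays.toString(encode(false))); // [96, 0]
        System.out.println(decode(encode(null)));           // true (field absent)
        System.out.println(decode(encode(false)));          // false
    }
}
```
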


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.