You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/16 05:29:53 UTC

[GitHub] XiaoZYang closed pull request #1397: Issue 1069: Provide a setting in consumer configuration to specify where to start consuming messages

XiaoZYang closed pull request #1397: Issue 1069: Provide a setting in consumer configuration to specify where to start consuming messages
URL: https://github.com/apache/incubator-pulsar/pull/1397
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 9149bb9f9..8dbb846d1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -26,6 +26,7 @@
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 
 /**
  * A ManagedLedger it's a superset of a BookKeeper ledger concept.
@@ -132,11 +133,28 @@
      *
      * @param name
      *            the name associated with the ManagedCursor
+     * @param initializeOnLatest
+     *            the flag that tells the method whether it should initialize the cursor at the latest position or not.
      * @return the ManagedCursor
      * @throws ManagedLedgerException
      */
     ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException;
 
+    /**
+     * Open a ManagedCursor in this ManagedLedger.
+     * <p>
+     * If the cursor doesn't exist, a new one will be created and positioned according to {@code initialPosition}.
+     *
+     * @param name
+     *            the name associated with the ManagedCursor
+     * @param initialPosition
+     *            where the cursor is placed when it is first created: {@code Latest} or {@code Earliest};
+     *            {@link #openCursor(String)} defaults to {@code Latest}
+     * @return the ManagedCursor
+     * @throws ManagedLedgerException
+     */
+    public ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException;
+
     /**
      * Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
      * exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
@@ -194,7 +212,23 @@
     void asyncOpenCursor(String name, OpenCursorCallback callback, Object ctx);
 
     /**
-     * Get a list of all the cursors reading from this ManagedLedger.
+     * Open a ManagedCursor asynchronously.
+     *
+     * @see #openCursor(String)
+     * @param name
+     *            the name associated with the ManagedCursor
+     * @param initialPosition
+     *            where the cursor is placed when it is first created: {@code Latest} or {@code Earliest};
+     *            {@link #asyncOpenCursor(String, OpenCursorCallback, Object)} defaults to {@code Latest}
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     */
+    public void asyncOpenCursor(String name, InitialPosition initialPosition, OpenCursorCallback callback, Object ctx);
+
+    /**
+     * Get a list of all the cursors reading from this ManagedLedger
      *
      * @return a list of cursors
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ac030e52b..8738185d4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -78,6 +78,7 @@
 import org.apache.bookkeeper.mledger.util.Pair;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
@@ -546,7 +547,12 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback
     }
 
     @Override
-    public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException {
+    public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException{
+        return openCursor(cursorName, InitialPosition.Latest);
+    }
+
+    @Override
+    public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         class Result {
             ManagedCursor cursor = null;
@@ -554,7 +560,7 @@ public ManagedCursor openCursor(String cursorName) throws InterruptedException,
         }
         final Result result = new Result();
 
-        asyncOpenCursor(cursorName, new OpenCursorCallback() {
+        asyncOpenCursor(cursorName, initialPosition, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 result.cursor = cursor;
@@ -582,9 +588,12 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
     }
 
     @Override
-    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback,
-            final Object ctx) {
+    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx){
+        this.asyncOpenCursor(cursorName, InitialPosition.Latest, callback, ctx);
+    }
 
+    @Override
+    public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, final OpenCursorCallback callback, final Object ctx){
         try {
             checkManagedLedgerIsOpen();
             checkFenced();
@@ -624,7 +633,7 @@ public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
                 cursor.setActive();
                 // Update the ack position (ignoring entries that were written while the cursor was being created)
-                cursor.initializeCursorPosition(getLastPositionAndCounter());
+                cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter() : getFirstPositionAndCounter());
 
                 synchronized (this) {
                     cursors.add(cursor);
@@ -2030,6 +2039,22 @@ PositionImpl getMarkDeletePositionOfSlowestConsumer() {
         return Pair.create(pos, count);
     }
 
+    /**
+     * Get the first position in the managed ledger with its counter (last counter minus the entries in (first, last]).
+     */
+    Pair<PositionImpl, Long> getFirstPositionAndCounter() {
+        PositionImpl pos;
+        long count;
+        Pair<PositionImpl, Long> lastPositionAndCounter;
+        do {
+            pos = getFirstPosition();
+            lastPositionAndCounter = getLastPositionAndCounter();
+            count = lastPositionAndCounter.second - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.first));
+            // Retry when EITHER end moved while counting; with '&&' a snapshot with one stale end would be accepted.
+        } while (pos.compareTo(getFirstPosition()) != 0 || lastPositionAndCounter.first.compareTo(getLastPosition()) != 0);
+        return Pair.create(pos, count);
+    }
+
     public void activateCursor(ManagedCursor cursor) {
         if (activeCursors.get(cursor.getName()) == null) {
             activeCursors.add(cursor);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 460e5227d..e8c995c87 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -21,6 +21,7 @@
 import static org.testng.Assert.*;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -66,6 +67,7 @@
 import org.apache.bookkeeper.mledger.util.Pair;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.common.api.ByteBufPair;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.apache.zookeeper.CreateMode;
@@ -2152,4 +2154,34 @@ public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
         return ByteBufPair.coalesce(ByteBufPair.get(headers, payload));
     }
 
+    @Test
+    public void testOpenCursorOnLatestAndEarliest() throws Exception {
+        final int MAX_ENTRY_PER_LEDGER = 2;
+        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(MAX_ENTRY_PER_LEDGER);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("lastest_earliest_ledger", config);
+        // ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("lastest_earliest_ledger");
+        final int totalInsertedEntries = 20;
+        for (int i = 0; i < totalInsertedEntries; i++) {
+            String content = "entry" + i; // 5 bytes
+            ledger.addEntry(content.getBytes());
+        }
+        // Open Cursor also adds cursor into activeCursor-container
+        ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest);
+        ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest);
+
+        // getReadPosition returns the next position to read, so it is compared against getNext() of the expected position
+        PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition();
+        PositionImpl p2 = (PositionImpl) earliestCursor.getReadPosition();
+
+        Pair<PositionImpl, Long> latestPositionAndCounter = ledger.getLastPositionAndCounter();
+        Pair<PositionImpl, Long> earliestPositionAndCounter = ledger.getFirstPositionAndCounter();
+
+        assertEquals(latestPositionAndCounter.first.getNext(), p1);
+        assertEquals(earliestPositionAndCounter.first.getNext(), p2);
+
+        assertEquals(latestPositionAndCounter.second.longValue(), totalInsertedEntries);
+        assertEquals(earliestPositionAndCounter.second.longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog());
+
+        ledger.close();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e0ff275f6..353977c40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -76,6 +76,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
@@ -559,6 +560,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
         final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
         final boolean readCompacted = subscribe.getReadCompacted();
         final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
+        final InitialPosition initialPosition = subscribe.getInitialPosition();
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
@@ -622,7 +624,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                         service.getTopic(topicName.toString())
                                 .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                                                       subType, priorityLevel, consumerName, isDurable,
-                                                                      startMessageId, metadata, readCompacted))
+                                                                      startMessageId, metadata, readCompacted, initialPosition))
                                 .thenAccept(consumer -> {
                                     if (consumerFuture.complete(consumer)) {
                                         log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index be60116a4..24ceea548 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -25,7 +25,9 @@
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
@@ -80,9 +82,9 @@ default long getOriginalSequenceId() {
 
     CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
             int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted);
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition);
 
-    CompletableFuture<Subscription> createSubscription(String subscriptionName);
+    CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition);
 
     CompletableFuture<Void> unsubscribe(String subName);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 4a6de05e0..56b340787 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -64,6 +64,7 @@
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -322,7 +323,7 @@ public void removeProducer(Producer producer) {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
 
@@ -396,7 +397,7 @@ public void removeProducer(Producer producer) {
     }
 
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName) {
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
         return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cdca1cd8f..a6aefc299 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -80,6 +80,7 @@
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -442,7 +443,7 @@ public void removeProducer(Producer producer) {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
 
@@ -490,7 +491,7 @@ public void removeProducer(Producer producer) {
         }
 
         CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
-                getDurableSubscription(subscriptionName) //
+                getDurableSubscription(subscriptionName, initialPosition) //
                 : getNonDurableSubscription(subscriptionName, startMessageId);
 
         int maxUnackedMessages  = isDurable ? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() :0;
@@ -533,15 +534,15 @@ public void removeProducer(Producer producer) {
         return future;
     }
 
-    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName) {
+    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName, InitialPosition initialPosition) {
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
-        ledger.asyncOpenCursor(Codec.encode(subscriptionName), new OpenCursorCallback() {
+        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}][{}] Opened cursor", topic, subscriptionName);
                 }
-
+                
                 subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName,
                         name -> createPersistentSubscription(subscriptionName, cursor)));
             }
@@ -598,8 +599,8 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
 
     @SuppressWarnings("unchecked")
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName) {
-        return getDurableSubscription(subscriptionName);
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
+        return getDurableSubscription(subscriptionName, initialPosition);
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 52c976def..77660d39c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.PersistentTopicTest;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -65,9 +66,7 @@
     private BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
-    @SuppressWarnings("unused")
     private ManagedLedger ledgerMock;
-    @SuppressWarnings("unused")
     private ManagedCursor cursorMock;
 
     final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
@@ -122,7 +121,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -180,7 +179,7 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -242,7 +241,7 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -300,7 +299,7 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index ce053fe02..6392bb619 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -86,17 +86,16 @@
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.Compactor;
@@ -336,7 +335,7 @@ public void testAddRemoveProducer() throws Exception {
         String role = "appid1";
         // 1. simple add producer
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
-                role, false, null, SchemaVersion.Latest);
+                role, false, null);
         topic.addProducer(producer);
         assertEquals(topic.getProducers().size(), 1);
 
@@ -352,7 +351,7 @@ public void testAddRemoveProducer() throws Exception {
         // 3. add producer for a different topic
         PersistentTopic failTopic = new PersistentTopic(failTopicName, ledgerMock, brokerService);
         Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name",
-                role, false, null, SchemaVersion.Latest);
+                role, false, null);
         try {
             topic.addProducer(failProducer);
             fail("should have failed");
@@ -372,18 +371,18 @@ public void testMaxProducers() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         String role = "appid1";
         // 1. add producer1
-        Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null, SchemaVersion.Latest);
+        Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null);
         topic.addProducer(producer);
         assertEquals(topic.getProducers().size(), 1);
 
         // 2. add producer2
-        Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role, false, null, SchemaVersion.Latest);
+        Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role, false, null);
         topic.addProducer(producer2);
         assertEquals(topic.getProducers().size(), 2);
 
         // 3. add producer3 but reached maxProducersPerTopic
         try {
-            Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role, false, null, SchemaVersion.Latest);
+            Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role, false, null);
             topic.addProducer(producer3);
             fail("should have failed");
         } catch (BrokerServiceException e) {
@@ -422,7 +421,7 @@ public void testSubscribeFail() throws Exception {
                 .setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
         try {
             f1.get();
             fail("should fail with exception");
@@ -441,12 +440,12 @@ public void testSubscribeUnsubscribe() throws Exception {
 
         // 1. simple subscribe
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
         f1.get();
 
         // 2. duplicate subscribe
         Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
 
         try {
             f2.get();
@@ -702,7 +701,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 
         try {
             Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */
-            new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
+            Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
                     50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */);
         } catch (BrokerServiceException e) {
             assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException);
@@ -722,7 +721,7 @@ public void testDeleteTopic() throws Exception {
         // 2. delete topic with producer
         topic = (PersistentTopic) brokerService.getTopic(successTopicName).get();
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
-                role, false, null, SchemaVersion.Latest);
+                role, false, null);
         topic.addProducer(producer);
 
         assertTrue(topic.delete().isCompletedExceptionally());
@@ -733,7 +732,7 @@ public void testDeleteTopic() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, cmd.getInitialPosition());
         f1.get();
 
         assertTrue(topic.delete().isCompletedExceptionally());
@@ -748,7 +747,7 @@ public void testDeleteAndUnsubscribeTopic() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -802,7 +801,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -878,7 +877,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
             String role = "appid1";
             Thread.sleep(10); /* delay to ensure that the delete gets executed first */
             Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
-                    role, false, null, SchemaVersion.Latest);
+                    role, false, null);
             topic.addProducer(producer);
             fail("Should have failed");
         } catch (BrokerServiceException e) {
@@ -889,8 +888,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted());
-
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), cmd.getInitialPosition());
+     
         try {
             f.get();
             fail("should have failed");
@@ -908,7 +907,7 @@ void setupMLAsyncCallbackMocks() {
         doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
         doReturn("mockCursor").when(cursorMock).getName();
         // doNothing().when(cursorMock).asyncClose(new CloseCallback() {
-        doAnswer(new Answer<Object>() {
+        doAnswer(new Answer() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 // return closeFuture.get();
@@ -964,14 +963,16 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
             }
         }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), anyObject());
 
+
         // call openCursorComplete on cursor asyncOpen
         doAnswer(new Answer<Object>() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                ((OpenCursorCallback) invocationOnMock.getArguments()[1]).openCursorComplete(cursorMock, null);
+                ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+        
 
         // call deleteLedgerComplete on ledger asyncDelete
         doAnswer(new Answer<Object>() {
@@ -1007,7 +1008,7 @@ public void testFailoverSubscription() throws Exception {
         // 1. Subscribe with non partition topic
         Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
                 cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(),
-                cmd1.getReadCompacted());
+                cmd1.getReadCompacted(), cmd1.getInitialPosition());
         f1.get();
 
         // 2. Subscribe with partition topic
@@ -1019,7 +1020,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
                 cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(),
-                cmd2.getReadCompacted());
+                cmd2.getReadCompacted(), cmd2.getInitialPosition());
         f2.get();
 
         // 3. Subscribe and create second consumer
@@ -1029,7 +1030,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
                 cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(),
-                cmd3.getReadCompacted());
+                cmd3.getReadCompacted(), cmd3.getInitialPosition());
         f3.get();
 
         assertEquals(
@@ -1050,7 +1051,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
                 cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(),
-                cmd4.getReadCompacted());
+                cmd4.getReadCompacted(), cmd4.getInitialPosition());
         f4.get();
 
         assertEquals(
@@ -1076,7 +1077,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
                 cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(),
-                cmd5.getReadCompacted());
+                cmd5.getReadCompacted(), cmd5.getInitialPosition());
 
         try {
             f5.get();
@@ -1093,7 +1094,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
                 cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(),
-                cmd6.getReadCompacted());
+                cmd6.getReadCompacted(), cmd6.getInitialPosition());
         f6.get();
 
         // 7. unsubscribe exclusive sub
@@ -1160,7 +1161,7 @@ public void testAtomicReplicationRemoval() throws Exception {
 
         final URL brokerUrl = new URL(
                 "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort());
-        PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
+        PulsarClient client = PulsarClient.create(brokerUrl.toString());
         ManagedCursor cursor = mock(ManagedCursorImpl.class);
         doReturn(remoteCluster).when(cursor).getName();
         brokerService.getReplicationClients().put(remoteCluster, client);
@@ -1190,7 +1191,6 @@ public void testAtomicReplicationRemoval() throws Exception {
         callback.deleteCursorComplete(null);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testClosingReplicationProducerTwice() throws Exception {
         final String globalTopicName = "persistent://prop/global/ns/testClosingReplicationProducerTwice";
@@ -1204,33 +1204,26 @@ public void testClosingReplicationProducerTwice() throws Exception {
 
         final URL brokerUrl = new URL(
                 "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort());
-        PulsarClient client = spy(PulsarClient.builder().serviceUrl(brokerUrl.toString()).build());
+        PulsarClient client =  spy( PulsarClient.create(brokerUrl.toString()) );
         PulsarClientImpl clientImpl = (PulsarClientImpl) client;
-        doReturn(new CompletableFuture<Producer>()).when(clientImpl)
-            .createProducerAsync(any(ProducerConfigurationData.class), any(Schema.class));
 
         ManagedCursor cursor = mock(ManagedCursorImpl.class);
         doReturn(remoteCluster).when(cursor).getName();
         brokerService.getReplicationClients().put(remoteCluster, client);
         PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);
 
-        // PersistentReplicator constructor calls startProducer()
-        verify(clientImpl)
-            .createProducerAsync(
-                any(ProducerConfigurationData.class),
-                any(Schema.class)
-            );
+        doReturn(new CompletableFuture<Producer>()).when(clientImpl)
+                .createProducerAsync(any(ProducerConfigurationData.class));
+
+        replicator.startProducer();
+        verify(clientImpl).createProducerAsync(any(ProducerConfigurationData.class));
 
         replicator.disconnect(false);
         replicator.disconnect(false);
 
         replicator.startProducer();
 
-        verify(clientImpl, Mockito.times(2))
-            .createProducerAsync(
-                any(ProducerConfigurationData.class),
-                any(Schema.class)
-            );
+        verify(clientImpl, Mockito.times(2)).createProducerAsync(any(ProducerConfigurationData.class));
     }
 
     @Test
@@ -1259,7 +1252,9 @@ public void testCompactorSubscriptionUpdatedOnInit() throws Exception {
 
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         CompactedTopic compactedTopic = mock(CompactedTopic.class);
-        new CompactorSubscription(topic, compactedTopic, Compactor.COMPACTION_SUBSCRIPTION, cursorMock);
+        PersistentSubscription sub = new CompactorSubscription(topic, compactedTopic,
+                                                               Compactor.COMPACTION_SUBSCRIPTION,
+                                                               cursorMock);
         verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2cc4cf93c..a624ef0c7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -32,6 +32,8 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
@@ -52,7 +54,7 @@
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -94,25 +96,26 @@ protected void cleanup() throws Exception {
     public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
-                .subscriptionName("my-subscriber-name").subscribe();
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+                conf);
 
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic1");
+        ProducerConfiguration producerConf = new ProducerConfiguration();
 
         if (batchMessageDelayMs != 0) {
-            producerBuilder.enableBatching(true);
-            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
-            producerBuilder.batchingMaxMessages(5);
+            producerConf.setBatchingEnabled(true);
+            producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
+            producerConf.setBatchingMaxMessages(5);
         }
 
-        Producer<byte[]> producer = producerBuilder.create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Message<byte[]> msg = null;
+        Message msg = null;
         Set<String> messageSet = Sets.newHashSet();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -130,18 +133,17 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exceptio
     @Test(dataProvider = "batch")
     public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
-                .subscriptionName("my-subscriber-name").subscribe();
-
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic2");
-
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
         if (batchMessageDelayMs != 0) {
-            producerBuilder.enableBatching(true);
-            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
-            producerBuilder.batchingMaxMessages(5);
+            producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
+            producerConf.setBatchingMaxMessages(5);
+            producerConf.setBatchingEnabled(true);
         }
-        Producer<byte[]> producer = producerBuilder.create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
 
         // Asynchronously produce messages
@@ -156,7 +158,7 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws Excepti
             future.get();
         }
 
-        Message<byte[]> msg = null;
+        Message msg = null;
         Set<String> messageSet = Sets.newHashSet();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -177,28 +179,28 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws Excepti
     @Test(dataProvider = "batch", timeOut = 100000)
     public void testMessageListener(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
 
         int numMessages = 100;
         final CountDownLatch latch = new CountDownLatch(numMessages);
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic3")
-                .subscriptionName("my-subscriber-name").messageListener((c1, msg) -> {
-                    Assert.assertNotNull(msg, "Message cannot be null");
-                    String receivedMessage = new String(msg.getData());
-                    log.debug("Received message [{}] in the listener", receivedMessage);
-                    c1.acknowledgeAsync(msg);
-                    latch.countDown();
-                }).subscribe();
-
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic3");
-
+        conf.setMessageListener((consumer, msg) -> {
+            Assert.assertNotNull(msg, "Message cannot be null");
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message [{}] in the listener", receivedMessage);
+            consumer.acknowledgeAsync(msg);
+            latch.countDown();
+        });
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic3", "my-subscriber-name",
+                conf);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
         if (batchMessageDelayMs != 0) {
-            producerBuilder.enableBatching(true);
-            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
-            producerBuilder.batchingMaxMessages(5);
+            producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
+            producerConf.setBatchingMaxMessages(5);
+            producerConf.setBatchingEnabled(true);
         }
-        Producer<byte[]> producer = producerBuilder.create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic3", producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
 
         // Asynchronously produce messages
@@ -223,17 +225,18 @@ public void testMessageListener(int batchMessageDelayMs) throws Exception {
     public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
         // Create consumer and producer
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic4")
-                .subscriptionName("my-subscriber-name").subscribe();
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic4");
-
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic4", "my-subscriber-name",
+                conf);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
         if (batchMessageDelayMs != 0) {
-            producerBuilder.enableBatching(true);
-            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
-            producerBuilder.batchingMaxMessages(5);
+            producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
+            producerConf.setBatchingMaxMessages(5);
+            producerConf.setBatchingEnabled(true);
         }
-        Producer<byte[]> producer = producerBuilder.create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic4", producerConf);
+
         // Produce messages
         CompletableFuture<MessageId> lastFuture = null;
         for (int i = 0; i < 10; i++) {
@@ -245,7 +248,7 @@ public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {
 
         lastFuture.get();
 
-        Message<byte[]> msg = null;
+        Message msg = null;
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             log.info("Received: [{}]", new String(msg.getData()));
@@ -272,17 +275,19 @@ public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {
     public void testSendTimeout(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic5")
-                .subscriptionName("my-subscriber-name").subscribe();
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic5").sendTimeout(1, TimeUnit.SECONDS);
-
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+        consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic5", "my-subscriber-name",
+                consumerConf);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
         if (batchMessageDelayMs != 0) {
-            producerBuilder.enableBatching(true);
-            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
-            producerBuilder.batchingMaxMessages(5);
+            producerConf.setBatchingMaxPublishDelay(2 * batchMessageDelayMs, TimeUnit.MILLISECONDS);
+            producerConf.setBatchingMaxMessages(5);
+            producerConf.setBatchingEnabled(true);
         }
-        Producer<byte[]> producer = producerBuilder.create();
+        producerConf.setSendTimeout(1, TimeUnit.SECONDS);
+
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic5", producerConf);
         final String message = "my-message";
 
         // Trigger the send timeout
@@ -300,7 +305,7 @@ public void testSendTimeout(int batchMessageDelayMs) throws Exception {
         startBroker();
 
         // We should not have received any message
-        Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
+        Message msg = consumer.receive(3, TimeUnit.SECONDS);
         Assert.assertNull(msg);
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
@@ -310,29 +315,32 @@ public void testSendTimeout(int batchMessageDelayMs) throws Exception {
     public void testInvalidSequence() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        PulsarClient client1 = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT).build();
+        PulsarClient client1 = PulsarClient.create("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT);
         client1.close();
 
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+        consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
+
         try {
-            client1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic6")
-                    .subscriptionName("my-subscriber-name").subscribe();
+            Consumer consumer = client1.subscribe("persistent://my-property/use/my-ns/my-topic6", "my-subscriber-name",
+                    consumerConf);
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
             Assert.assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
         }
 
         try {
-            client1.newProducer().topic("persistent://my-property/use/my-ns/my-topic6").create();
+            Producer producer = client1.createProducer("persistent://my-property/use/my-ns/my-topic6");
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
             Assert.assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
         }
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic6")
-                .subscriptionName("my-subscriber-name").subscribe();
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic6", "my-subscriber-name",
+                consumerConf);
 
         try {
-            Message<byte[]> msg = MessageBuilder.create().setContent("InvalidMessage".getBytes()).build();
+            Message msg = MessageBuilder.create().setContent("InvalidMessage".getBytes()).build();
             consumer.acknowledge(msg);
         } catch (PulsarClientException.InvalidMessageException e) {
             // ok
@@ -354,8 +362,7 @@ public void testInvalidSequence() throws Exception {
             // ok
         }
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic6")
-                .create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic6");
         producer.close();
 
         try {
@@ -370,72 +377,91 @@ public void testInvalidSequence() throws Exception {
     @Test
     public void testSillyUser() {
         try {
-            PulsarClient.builder().serviceUrl("invalid://url").build();
+            PulsarClient client1 = PulsarClient.create("invalid://url");
             Assert.fail("should fail");
         } catch (PulsarClientException e) {
             Assert.assertTrue(e instanceof PulsarClientException.InvalidServiceURL);
         }
 
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+
         try {
-            pulsarClient.newProducer().sendTimeout(-1, TimeUnit.SECONDS);
+            producerConf.setSendTimeout(-1, TimeUnit.SECONDS);
             Assert.fail("should fail");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            pulsarClient.newProducer().maxPendingMessages(0);
+            producerConf.setMaxPendingMessages(0);
             Assert.fail("should fail");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            pulsarClient.newProducer().topic("invalid://topic").create();
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic7", null);
+            Assert.fail("should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
+        }
+
+        try {
+            Producer producer = pulsarClient.createProducer("invalid://topic", producerConf);
             Assert.fail("should fail");
         } catch (PulsarClientException e) {
             Assert.assertTrue(e instanceof PulsarClientException.InvalidTopicNameException);
         }
 
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+
         try {
-            pulsarClient.newConsumer().messageListener(null);
+            consumerConf.setMessageListener(null);
             Assert.fail("should fail");
         } catch (NullPointerException e) {
             // ok
         }
 
         try {
-            pulsarClient.newConsumer().subscriptionType(null);
+            consumerConf.setSubscriptionType(null);
             Assert.fail("should fail");
         } catch (NullPointerException e) {
             // ok
         }
 
         try {
-            pulsarClient.newConsumer().receiverQueueSize(-1);
+            consumerConf.setReceiverQueueSize(-1);
             Assert.fail("should fail");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic7").subscriptionName(null)
-                    .subscribe();
+            Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic7",
+                    "my-subscriber-name", null);
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
-            assertEquals(e.getClass(), InvalidConfigurationException.class);
+            Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
         }
 
         try {
-            pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic7").subscriptionName("")
-                    .subscribe();
+            Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic7", null,
+                    consumerConf);
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
             Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
         }
 
         try {
-            pulsarClient.newConsumer().topic("invalid://topic7").subscriptionName("my-subscriber-name").subscribe();
+            Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic7", "",
+                    consumerConf);
+            Assert.fail("Should fail");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
+        }
+
+        try {
+            Consumer consumer = pulsarClient.subscribe("invalid://topic7", "my-subscriber-name", consumerConf);
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
             Assert.assertTrue(e instanceof PulsarClientException.InvalidTopicNameException);
@@ -450,10 +476,10 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs)
         final int recvQueueSize = 100;
         final int numConsumersThreads = 10;
 
+        final ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(recvQueueSize);
         String subName = UUID.randomUUID().toString();
-        final Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic7").subscriptionName(subName)
-                .receiverQueueSize(recvQueueSize).subscribe();
+        final Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic7", subName, conf);
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1);
@@ -477,22 +503,20 @@ public Void call() throws Exception {
         Thread.sleep(2000);
 
         // publish 100 messages so that the consumers blocked on receive() will now get the messages
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic7");
-
+        ProducerConfiguration producerConf = new ProducerConfiguration();
         if (batchMessageDelayMs != 0) {
-            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
-            producerBuilder.batchingMaxMessages(5);
-            producerBuilder.enableBatching(true);
+            producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
+            producerConf.setBatchingMaxMessages(5);
+            producerConf.setBatchingEnabled(true);
         }
-        Producer<byte[]> producer = producerBuilder.create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic7", producerConf);
         for (int i = 0; i < recvQueueSize; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
         Thread.sleep(500);
 
-        ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
+        ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
         // The available permits should be 10 and num messages in the queue should be 90
         Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
         Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
@@ -517,7 +541,7 @@ public Void call() throws Exception {
 
         // clear the queue
         while (true) {
-            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+            Message msg = consumer.receive(1, TimeUnit.SECONDS);
             if (msg == null) {
                 break;
             }
@@ -560,9 +584,8 @@ public void testSendBigMessageSize() throws Exception {
 
         try {
             final String topic = "persistent://my-property/use/my-ns/bigMsg";
-            Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
-            Message<byte[]> message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1])
-                    .build();
+            Producer producer = pulsarClient.createProducer(topic);
+            Message message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build();
             producer.send(message);
             fail("Should have thrown exception");
         } catch (PulsarClientException.InvalidMessageException e) {
@@ -591,16 +614,18 @@ public void testSendBigMessageSizeButCompressed() throws Exception {
         final String topic = "persistent://my-property/use/my-ns/bigMsg";
 
         // (a) non-batch msg with compression
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4)
-                .create();
-        Message<byte[]> message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1])
-                .build();
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.setCompressionType(CompressionType.LZ4);
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+        Message message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build();
         producer.send(message);
         producer.close();
 
         // (b) batch-msg
-        producer = pulsarClient.newProducer().topic(topic).enableBatching(true).compressionType(CompressionType.LZ4)
-                .create();
+        producerConf = new ProducerConfiguration();
+        producerConf.setBatchingEnabled(true);
+        producerConf.setCompressionType(CompressionType.LZ4);
+        producer = pulsarClient.createProducer(topic, producerConf);
         message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build();
         try {
             producer.send(message);
@@ -611,7 +636,9 @@ public void testSendBigMessageSizeButCompressed() throws Exception {
         producer.close();
 
         // (c) non-batch msg without compression
-        producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.NONE).create();
+        producerConf = new ProducerConfiguration();
+        producerConf.setCompressionType(CompressionType.NONE);
+        producer = pulsarClient.createProducer(topic, producerConf);
         message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build();
         try {
             producer.send(message);
@@ -622,8 +649,10 @@ public void testSendBigMessageSizeButCompressed() throws Exception {
         producer.close();
 
         // (d) non-batch msg with compression and try to consume message
-        producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4).create();
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe();
+        producerConf = new ProducerConfiguration();
+        producerConf.setCompressionType(CompressionType.LZ4);
+        producer = pulsarClient.createProducer(topic, producerConf);
+        Consumer consumer = pulsarClient.subscribe(topic, "sub1");
         byte[] content = new byte[PulsarDecoder.MaxMessageSize + 10];
         message = MessageBuilder.create().setContent(content).build();
         producer.send(message);
@@ -653,21 +682,23 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
         final String topicName = "cache-topic";
         final String sub1 = "faster-sub1";
         final String sub2 = "slower-sub2";
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        conf.setReceiverQueueSize(receiverSize);
 
-        /************ usecase-1: *************/
-        // 1. Subscriber Faster subscriber
-        Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName).subscriptionName(sub1)
-                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
-        final String topic = "persistent://my-property/use/my-ns/" + topicName;
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
 
         if (batchMessageDelayMs != 0) {
-            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
-            producerBuilder.batchingMaxMessages(5);
-            producerBuilder.enableBatching(true);
+            producerConf.setBatchingEnabled(true);
+            producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
+            producerConf.setBatchingMaxMessages(5);
         }
-        Producer<byte[]> producer = producerBuilder.create();
+
+        /************ usecase-1: *************/
+        // 1. Subscriber Faster subscriber
+        Consumer subscriber1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/" + topicName, sub1, conf);
+        final String topic = "persistent://my-property/use/my-ns/" + topicName;
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
 
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
@@ -679,7 +710,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
         EntryCacheImpl entryCache = spy((EntryCacheImpl) cacheField.get(ledger));
         cacheField.set(ledger, entryCache);
 
-        Message<byte[]> msg = null;
+        Message msg = null;
         // 2. Produce messages
         for (int i = 0; i < 30; i++) {
             String message = "my-message-" + i;
@@ -707,8 +738,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
 
         /************ usecase-2: *************/
         // 1.b Subscriber slower-subscriber
-        Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName).subscriptionName(sub2).subscribe();
+        Consumer subscriber2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/" + topicName, sub2, conf);
         // Produce messages
         final int moreMessages = 10;
         for (int i = 0; i < receiverSize + moreMessages; i++) {
@@ -734,10 +764,10 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
 
         // 3.b Close subscriber2: which will trigger cache to clear the cache
         subscriber2.close();
-
+        
         // retry strategically until broker clean up closed subscribers and invalidate all cache entries
         retryStrategically((test) -> entryCache.getSize() == 0, 5, 100);
-
+        
         // Verify: EntryCache should be cleared
         assertTrue(entryCache.getSize() == 0);
         subscriber1.close();
@@ -754,22 +784,24 @@ public void testDeactivatingBacklogConsumer() throws Exception {
         final String topic = "persistent://my-property/use/my-ns/" + topicName;
         final String sub1 = "faster-sub1";
         final String sub2 = "slower-sub2";
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        conf.setReceiverQueueSize(receiverSize);
 
-        // 1. Subscriber Faster subscriber: let it consume all messages immediately
-        Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName).subscriptionName(sub1)
-                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
-        // 1.b. Subscriber Slow subscriber:
-        Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName).subscriptionName(sub2)
-                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
+        ProducerConfiguration producerConf = new ProducerConfiguration();
 
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic);
         if (batchMessageDelayMs != 0) {
-            producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
-                    .batchingMaxMessages(5);
+            producerConf.setBatchingEnabled(true);
+            producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
+            producerConf.setBatchingMaxMessages(5);
         }
-        Producer<byte[]> producer = producerBuilder.create();
+
+        // 1. Subscriber Faster subscriber: let it consume all messages immediately
+        Consumer subscriber1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/" + topicName, sub1, conf);
+        // 1.b. Subscriber Slow subscriber:
+        conf.setReceiverQueueSize(receiverSize);
+        Consumer subscriber2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/" + topicName, sub2, conf);
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
 
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
@@ -786,7 +818,7 @@ public void testDeactivatingBacklogConsumer() throws Exception {
         field.set(ledger, maxMessageCacheRetentionTimeMillis);
         final long maxActiveCursorBacklogEntries = (long) backlogThresholdField.get(ledger);
 
-        Message<byte[]> msg = null;
+        Message msg = null;
         final int totalMsgs = (int) maxActiveCursorBacklogEntries + receiverSize + 1;
         // 2. Produce messages
         for (int i = 0; i < totalMsgs; i++) {
@@ -834,13 +866,15 @@ public void testAsyncProducerAndConsumer() throws Exception {
         final int totalMsg = 100;
         final Set<String> produceMsgs = Sets.newHashSet();
         final Set<String> consumeMsgs = Sets.newHashSet();
+        final ProducerConfiguration producerConf = new ProducerConfiguration();
+        final ConsumerConfiguration conf = new ConsumerConfiguration();
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
-                .subscriptionName("my-subscriber-name").subscribe();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+                conf);
 
         // produce message
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
-                .create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -873,13 +907,15 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception {
         final int totalMsg = 100;
         final Set<String> produceMsgs = Sets.newHashSet();
         final Set<String> consumeMsgs = Sets.newHashSet();
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
-                .subscriptionName("my-subscriber-name").subscribe();
-        ;
+        final ProducerConfiguration producerConf = new ProducerConfiguration();
+        final ConsumerConfiguration conf = new ConsumerConfiguration();
+
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+                conf);
 
         // produce message
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
-                .create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -910,11 +946,11 @@ public void testSendCallBack() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         final int totalMsg = 100;
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
-                .create();
+        final ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
         for (int i = 0; i < totalMsg; i++) {
             final String message = "my-message-" + i;
-            Message<byte[]> msg = MessageBuilder.create().setContent(message.getBytes()).build();
+            Message msg = MessageBuilder.create().setContent(message.getBytes()).build();
             final AtomicInteger msgLength = new AtomicInteger();
             CompletableFuture<MessageId> future = producer.sendAsync(msg).handle((r, ex) -> {
                 if (ex != null) {
@@ -939,22 +975,25 @@ public void testSendCallBack() throws Exception {
     public void testSharedConsumerAckDifferentConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic1").subscriptionName("my-subscriber-name")
-                .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared);
-        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(1);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+                conf);
+        Consumer consumer2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+                conf);
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
-                .create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Message<byte[]> msg = null;
-        Set<Message<byte[]>> consumerMsgSet1 = Sets.newHashSet();
-        Set<Message<byte[]>> consumerMsgSet2 = Sets.newHashSet();
+        Message msg = null;
+        Set<Message> consumerMsgSet1 = Sets.newHashSet();
+        Set<Message> consumerMsgSet2 = Sets.newHashSet();
         for (int i = 0; i < 5; i++) {
             msg = consumer1.receive();
             consumerMsgSet1.add(msg);
@@ -994,10 +1033,10 @@ public void testSharedConsumerAckDifferentConsumer() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
-    private void receiveAsync(Consumer<byte[]> consumer, int totalMessage, int currentMessage, CountDownLatch latch,
+    private void receiveAsync(Consumer consumer, int totalMessage, int currentMessage, CountDownLatch latch,
             final Set<String> consumeMsg, ExecutorService executor) throws PulsarClientException {
         if (currentMessage < totalMessage) {
-            CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
+            CompletableFuture<Message> future = consumer.receiveAsync();
             future.handle((msg, exception) -> {
                 if (exception == null) {
                     // add message to consumer-queue to verify with produced messages
@@ -1023,11 +1062,13 @@ private void receiveAsync(Consumer<byte[]> consumer, int totalMessage, int curre
     }
 
     /**
-     * Verify: Consumer stops receiving msg when reach unack-msg limit and starts receiving once acks messages 1.
-     * Produce X (600) messages 2. Consumer has receive size (10) and receive message without acknowledging 3. Consumer
-     * will stop receiving message after unAckThreshold = 500 4. Consumer acks messages and starts consuming remanining
-     * messages This testcase enables checksum sending while producing message and broker verifies the checksum for the
-     * message.
+     * Verify: Consumer stops receiving msg when it reaches the unack-msg limit and
+     * starts receiving again once it acks messages
+     * 1. Produce X (600) messages
+     * 2. Consumer has receive size (10) and receives messages without acknowledging
+     * 3. Consumer will stop receiving message after unAckThreshold = 500
+     * 4. Consumer acks messages and starts consuming remaining messages
+     * This testcase enables checksum sending while producing message and broker verifies the checksum for the message.
      *
      * @throws Exception
      */
@@ -1042,12 +1083,16 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
             final int totalProducedMsgs = 600;
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
-            Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setReceiverQueueSize(receiverQueueSize);
+            conf.setSubscriptionType(SubscriptionType.Shared);
+            Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/unacked-topic",
+                    "subscriber-1", conf);
 
-            Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                    producerConf);
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1056,8 +1101,8 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
             }
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
-            Message<byte[]> msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            Message msg = null;
+            List<Message> messages = Lists.newArrayList();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1102,11 +1147,18 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
     }
 
     /**
-     * Verify: iteration of a. message receive w/o acking b. stop receiving msg c. ack msgs d. started receiving msgs
+     * Verify: iteration of
+     * a. message receive w/o acking
+     * b. stop receiving msg
+     * c. ack msgs
+     * d. started receiving msgs
      *
-     * 1. Produce total X (1500) messages 2. Consumer consumes messages without acking until stop receiving from broker
-     * due to reaching ack-threshold (500) 3. Consumer acks messages after stop getting messages 4. Consumer again tries
-     * to consume messages 5. Consumer should be able to complete consuming all 1500 messages in 3 iteration (1500/500)
+     * 1. Produce total X (1500) messages
+     * 2. Consumer consumes messages without acking until it stops receiving
+     * from broker due to reaching ack-threshold (500)
+     * 3. Consumer acks messages after it stops getting messages
+     * 4. Consumer again tries to consume messages
+     * 5. Consumer should be able to complete consuming all 1500 messages in 3 iterations (1500/500)
      *
      * @throws Exception
      */
@@ -1123,12 +1175,16 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
             // receiver consumes messages in iteration after acknowledging broker
             final int totalReceiveIteration = totalProducedMsgs / unAckedMessagesBufferSize;
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
-            Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setReceiverQueueSize(receiverQueueSize);
+            conf.setSubscriptionType(SubscriptionType.Shared);
+            Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/unacked-topic",
+                    "subscriber-1", conf);
+
+            ProducerConfiguration producerConf = new ProducerConfiguration();
 
-            Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                    producerConf);
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1140,8 +1196,8 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
             // (2) Receive Messages
             for (int j = 0; j < totalReceiveIteration; j++) {
 
-                Message<byte[]> msg = null;
-                List<Message<byte[]>> messages = Lists.newArrayList();
+                Message msg = null;
+                List<Message> messages = Lists.newArrayList();
                 for (int i = 0; i < totalProducedMsgs; i++) {
                     msg = consumer.receive(1, TimeUnit.SECONDS);
                     if (msg != null) {
@@ -1177,6 +1233,7 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
         }
     }
 
+
     /**
      * Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
      *
@@ -1196,13 +1253,18 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
             int totalReceiveMessages = 0;
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
-            ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared);
-            Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-            Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
-            Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setReceiverQueueSize(receiverQueueSize);
+            conf.setSubscriptionType(SubscriptionType.Shared);
+            Consumer consumer1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/unacked-topic",
+                    "subscriber-1", conf);
+            Consumer consumer2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/unacked-topic",
+                    "subscriber-1", conf);
+
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                    producerConf);
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1210,10 +1272,11 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
                 producer.send(message.getBytes());
             }
 
+
             // (2) Consumer1: consume without ack:
             // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
-            Message<byte[]> msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            Message msg = null;
+            List<Message> messages = Lists.newArrayList();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1286,13 +1349,17 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio
             final int receiverQueueSize = 20;
             final int totalProducedMsgs = 100;
 
-            ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).ackTimeout(1, TimeUnit.SECONDS)
-                    .subscriptionType(SubscriptionType.Shared).subscribe();
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setReceiverQueueSize(receiverQueueSize);
+            conf.setAckTimeout(1, TimeUnit.SECONDS);
+            conf.setSubscriptionType(SubscriptionType.Shared);
+            ConsumerImpl consumer = (ConsumerImpl) pulsarClient
+                    .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf);
 
-            Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                    producerConf);
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1309,7 +1376,7 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio
             assertEquals(consumer.numMessagesInQueue(), receiverQueueSize);
 
             for (int i = 0; i < totalProducedMsgs; i++) {
-                Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+                Message msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
                     consumer.acknowledge(msg);
                     totalReceiveMsg++;
@@ -1343,12 +1410,16 @@ public void testUnackBlockRedeliverMessages() throws Exception {
             final int totalProducedMsgs = 100;
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
-            ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setReceiverQueueSize(receiverQueueSize);
+            conf.setSubscriptionType(SubscriptionType.Shared);
+            ConsumerImpl consumer = (ConsumerImpl) pulsarClient
+                    .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf);
+
+            ProducerConfiguration producerConf = new ProducerConfiguration();
 
-            Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                    producerConf);
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1357,8 +1428,8 @@ public void testUnackBlockRedeliverMessages() throws Exception {
             }
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
-            Message<byte[]> msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            Message msg = null;
+            List<Message> messages = Lists.newArrayList();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1410,20 +1481,22 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
             int totalReceiveMessages = 0;
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
-            Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setReceiverQueueSize(receiverQueueSize);
+            conf.setSubscriptionType(SubscriptionType.Shared);
+            Consumer consumer1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/unacked-topic",
+                    "subscriber-1", conf);
 
-            ProducerBuilder<byte[]> producerBuidler = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic");
+            ProducerConfiguration producerConf = new ProducerConfiguration();
 
             if (batchMessageDelayMs != 0) {
-                producerBuidler.enableBatching(true);
-                producerBuidler.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
-                producerBuidler.batchingMaxMessages(5);
+                producerConf.setBatchingEnabled(true);
+                producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
+                producerConf.setBatchingMaxMessages(5);
             }
 
-            Producer<byte[]> producer = producerBuidler.create();
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                    producerConf);
 
             List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
             // (1) Produced Messages
@@ -1436,8 +1509,8 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
 
             // (2) Consumer1: consume without ack:
             // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
-            Message<byte[]> msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            Message msg = null;
+            List<Message> messages = Lists.newArrayList();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1503,14 +1576,18 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
             int totalReceiveMessages = 0;
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
-            ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared);
-            Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-            Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setReceiverQueueSize(receiverQueueSize);
+            conf.setSubscriptionType(SubscriptionType.Shared);
+            Consumer consumer1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/unacked-topic",
+                    "subscriber-1", conf);
+            Consumer consumer2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/unacked-topic",
+                    "subscriber-1", conf);
 
-            Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                    producerConf);
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1518,10 +1595,11 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
                 producer.send(message.getBytes());
             }
 
+
             // (2) Consumer1: consume without ack:
             // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
-            Message<byte[]> msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            Message msg = null;
+            List<Message> messages = Lists.newArrayList();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1533,7 +1611,7 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
                 }
             }
 
-            assertEquals(messages.size(), maxUnackedMessages); // consumer1
+            assertEquals(messages.size(), maxUnackedMessages); //consumer1
 
             // (3) ack for all UnackedMessages from consumer2
             messages.forEach(m -> {
@@ -1584,24 +1662,26 @@ public void testEnabledChecksumClient() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         final int totalMsg = 10;
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
-                .subscriptionName("my-subscriber-name").subscribe();
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+                conf);
 
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic1");
+        ProducerConfiguration producerConf = new ProducerConfiguration();
         final int batchMessageDelayMs = 300;
         if (batchMessageDelayMs != 0) {
-            producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
-                    .batchingMaxMessages(5);
+            producerConf.setBatchingEnabled(true);
+            producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
+            producerConf.setBatchingMaxMessages(5);
         }
 
-        Producer<byte[]> producer = producerBuilder.create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Message<byte[]> msg = null;
+        Message msg = null;
         Set<String> messageSet = Sets.newHashSet();
         for (int i = 0; i < totalMsg; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -1635,12 +1715,16 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause()
             final int totalProducedMsgs = 20;
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
-            ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setReceiverQueueSize(receiverQueueSize);
+            conf.setSubscriptionType(SubscriptionType.Shared);
+            ConsumerImpl consumer = (ConsumerImpl) pulsarClient
+                    .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf);
+
+            ProducerConfiguration producerConf = new ProducerConfiguration();
 
-            Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                    producerConf);
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1650,8 +1734,8 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause()
             }
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
-            Message<byte[]> msg = null;
-            List<Message<byte[]>> messages1 = Lists.newArrayList();
+            Message msg = null;
+            List<Message> messages1 = Lists.newArrayList();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1701,8 +1785,8 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause()
      * It verifies that redelivery-of-specific messages: that redelivers all those messages even when consumer gets
      * blocked due to unacked messsages
      *
-     * Usecase: Consumer starts consuming only after all messages have been produced. So, consumer consumes total
-     * receiver-queue-size number messages => ask for redelivery and receives all messages again.
+     * Usecase: Consumer starts consuming only after all messages have been produced.
+     * So, consumer consumes total receiver-queue-size number messages => ask for redelivery and receives all messages again.
      *
      * @throws Exception
      */
@@ -1717,14 +1801,18 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
             final int totalProducedMsgs = 50;
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setReceiverQueueSize(receiverQueueSize);
+            conf.setSubscriptionType(SubscriptionType.Shared);
             // Only subscribe consumer
-            ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
+            ConsumerImpl consumer = (ConsumerImpl) pulsarClient
+                    .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf);
             consumer.close();
 
-            Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                    producerConf);
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1734,13 +1822,12 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
             }
 
             // (1.a) start consumer again
-            consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                    .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
+            consumer = (ConsumerImpl) pulsarClient.subscribe("persistent://my-property/use/my-ns/unacked-topic",
+                    "subscriber-1", conf);
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
-            Message<byte[]> msg = null;
-            List<Message<byte[]>> messages1 = Lists.newArrayList();
+            Message msg = null;
+            List<Message> messages1 = Lists.newArrayList();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1789,16 +1876,24 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
     @Test
     public void testPriorityConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic2").subscriptionName("my-subscriber-name")
-                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1);
-
-        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer4 = consumerBuilder.clone().priorityLevel(2).subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2")
-                .create();
+        ConsumerConfiguration conf1 = new ConsumerConfiguration();
+        conf1.setSubscriptionType(SubscriptionType.Shared);
+        conf1.setPriorityLevel(1);
+        conf1.setReceiverQueueSize(5);
+        ConsumerConfiguration conf4 = new ConsumerConfiguration();
+        conf4.setSubscriptionType(SubscriptionType.Shared);
+        conf4.setPriorityLevel(2);
+        conf4.setReceiverQueueSize(5);
+        Consumer consumer1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf1);
+        Consumer consumer2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf1);
+        Consumer consumer3 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf1);
+        Consumer consumer4 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf4);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
 
         // Asynchronously produce messages
@@ -1819,8 +1914,9 @@ public void testPriorityConsumer() throws Exception {
         }
 
         /**
-         * a. consumer1 and consumer2 now has more permits (as received and sent more permits) b. try to produce more
-         * messages: which will again distribute among consumer1 and consumer2 and should not dispatch to consumer4
+         * a. consumer1 and consumer2 now has more permits (as received and sent more permits)
+         * b. try to produce more messages: which will again distribute among consumer1 and consumer2
+         * and should not dispatch to consumer4
          *
          */
         for (int i = 0; i < 5; i++) {
@@ -1852,20 +1948,21 @@ public void testPriorityConsumer() throws Exception {
      *
      * @throws Exception
      */
-    @Test(timeOut = 5000)
+    @Test(timeOut=5000)
     public void testSharedSamePriorityConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        ConsumerConfiguration conf1 = new ConsumerConfiguration();
+        conf1.setSubscriptionType(SubscriptionType.Shared);
         final int queueSize = 5;
+        conf1.setReceiverQueueSize(queueSize);
         int maxUnAckMsgs = pulsar.getConfiguration().getMaxConcurrentLookupRequest();
         pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(queueSize);
-
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic2").subscriptionName("my-subscriber-name")
-                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize);
-        Consumer<byte[]> c1 = consumerBuilder.subscribe();
-        Consumer<byte[]> c2 = consumerBuilder.subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2")
-                .create();
+        Consumer c1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf1);
+        Consumer c2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf1);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", producerConf);
         List<Future<MessageId>> futures = Lists.newArrayList();
 
         // Asynchronously produce messages
@@ -1881,11 +1978,11 @@ public void testSharedSamePriorityConsumer() throws Exception {
             future.get();
         }
 
-        List<Message<byte[]>> messages = Lists.newArrayList();
+        List<Message> messages = Lists.newArrayList();
 
         // let consumer1 and consumer2 cosume messages up to the queue will be full
         for (int i = 0; i < totalPublishMessages; i++) {
-            Message<byte[]> msg = c1.receive(500, TimeUnit.MILLISECONDS);
+            Message msg = c1.receive(500, TimeUnit.MILLISECONDS);
             if (msg != null) {
                 messages.add(msg);
             } else {
@@ -1893,7 +1990,7 @@ public void testSharedSamePriorityConsumer() throws Exception {
             }
         }
         for (int i = 0; i < totalPublishMessages; i++) {
-            Message<byte[]> msg = c2.receive(500, TimeUnit.MILLISECONDS);
+            Message msg = c2.receive(500, TimeUnit.MILLISECONDS);
             if (msg != null) {
                 messages.add(msg);
             } else {
@@ -1904,14 +2001,17 @@ public void testSharedSamePriorityConsumer() throws Exception {
         Assert.assertEquals(queueSize * 2, messages.size());
 
         // create new consumers with the same priority
-        Consumer<byte[]> c3 = consumerBuilder.subscribe();
-        Consumer<byte[]> c4 = consumerBuilder.subscribe();
-        Consumer<byte[]> c5 = consumerBuilder.subscribe();
+        Consumer c3 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf1);
+        Consumer c4 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf1);
+        Consumer c5 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
+                conf1);
 
         // c1 and c2 are blocked: so, let c3, c4 and c5 consume rest of the messages
 
         for (int i = 0; i < totalPublishMessages; i++) {
-            Message<byte[]> msg = c4.receive(500, TimeUnit.MILLISECONDS);
+            Message msg = c4.receive(500, TimeUnit.MILLISECONDS);
             if (msg != null) {
                 messages.add(msg);
             } else {
@@ -1920,7 +2020,7 @@ public void testSharedSamePriorityConsumer() throws Exception {
         }
 
         for (int i = 0; i < totalPublishMessages; i++) {
-            Message<byte[]> msg = c5.receive(500, TimeUnit.MILLISECONDS);
+            Message msg = c5.receive(500, TimeUnit.MILLISECONDS);
             if (msg != null) {
                 messages.add(msg);
             } else {
@@ -1929,7 +2029,7 @@ public void testSharedSamePriorityConsumer() throws Exception {
         }
 
         for (int i = 0; i < totalPublishMessages; i++) {
-            Message<byte[]> msg = c3.receive(500, TimeUnit.MILLISECONDS);
+            Message msg = c3.receive(500, TimeUnit.MILLISECONDS);
             if (msg != null) {
                 messages.add(msg);
                 c3.acknowledge(msg);
@@ -1958,13 +2058,17 @@ public void testRedeliveryFailOverConsumer() throws Exception {
 
         final int receiverQueueSize = 10;
 
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(receiverQueueSize);
+        conf.setSubscriptionType(SubscriptionType.Failover);
         // Only subscribe consumer
-        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
-                .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover).subscribe();
+        ConsumerImpl consumer = (ConsumerImpl) pulsarClient
+                .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf);
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic")
-                .create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
+                producerConf);
 
         // (1) First round to produce-consume messages
         int consumeMsgInParts = 4;
@@ -1974,8 +2078,8 @@ public void testRedeliveryFailOverConsumer() throws Exception {
             Thread.sleep(10);
         }
         // (1.a) consume first consumeMsgInParts msgs and trigger redeliver
-        Message<byte[]> msg = null;
-        List<Message<byte[]>> messages1 = Lists.newArrayList();
+        Message msg = null;
+        List<Message> messages1 = Lists.newArrayList();
         for (int i = 0; i < consumeMsgInParts; i++) {
             msg = consumer.receive(1, TimeUnit.SECONDS);
             if (msg != null) {
@@ -2036,9 +2140,8 @@ public void testFailReceiveAsyncOnConsumerClose() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         // (1) simple consumers
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/failAsyncReceive").subscriptionName("my-subscriber-name")
-                .subscribe();
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/failAsyncReceive",
+                "my-subscriber-name", new ConsumerConfiguration());
         consumer.close();
         // receive messages
         try {
@@ -2052,8 +2155,8 @@ public void testFailReceiveAsyncOnConsumerClose() throws Exception {
         int numPartitions = 4;
         TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/failAsyncReceive");
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
-        Consumer<byte[]> partitionedConsumer = pulsarClient.newConsumer().topic(topicName.toString())
-                .subscriptionName("my-partitioned-subscriber").subscribe();
+        Consumer partitionedConsumer = pulsarClient.subscribe(topicName.toString(), "my-partitioned-subscriber",
+                new ConsumerConfiguration());
         partitionedConsumer.close();
         // receive messages
         try {
@@ -2073,7 +2176,6 @@ public void testECDSAEncryption() throws Exception {
         class EncKeyReader implements CryptoKeyReader {
 
             EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
-
             @Override
             public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                 String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
@@ -2110,20 +2212,23 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         final int totalMsg = 10;
 
         Set<String> messageSet = Sets.newHashSet();
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        conf.setCryptoKeyReader(new EncKeyReader());
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myecdsa-topic1", "my-subscriber-name",
+                conf);
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/myecdsa-topic1").subscriptionName("my-subscriber-name")
-                .cryptoKeyReader(new EncKeyReader()).subscribe();
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.addEncryptionKey("client-ecdsa.pem");
+        producerConf.setCryptoKeyReader(new EncKeyReader());
 
-        Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/myecdsa-topic1").addEncryptionKey("client-ecdsa.pem")
-                .cryptoKeyReader(new EncKeyReader()).create();
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/myecdsa-topic1", producerConf);
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Message<byte[]> msg = null;
+        Message msg = null;
 
         for (int i = 0; i < totalMsg; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -2145,7 +2250,6 @@ public void testRSAEncryption() throws Exception {
         class EncKeyReader implements CryptoKeyReader {
 
             EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
-
             @Override
             public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                 String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
@@ -2182,26 +2286,30 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         final int totalMsg = 10;
 
         Set<String> messageSet = Sets.newHashSet();
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/myrsa-topic1")
-                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
-
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myrsa-topic1")
-                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
-        Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myrsa-topic1")
-                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
-
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        conf.setCryptoKeyReader(new EncKeyReader());
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myrsa-topic1", "my-subscriber-name",
+                conf);
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        producerConf.addEncryptionKey("client-rsa.pem");
+        producerConf.setCryptoKeyReader(new EncKeyReader());
+
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/myrsa-topic1", producerConf);
+        Producer producer2 = pulsarClient.createProducer("persistent://my-property/use/my-ns/myrsa-topic1", producerConf);
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
-        for (int i = totalMsg; i < totalMsg * 2; i++) {
+        for (int i = totalMsg; i < totalMsg*2; i++) {
             String message = "my-message-" + i;
             producer2.send(message.getBytes());
         }
 
-        Message<byte[]> msg = null;
+        Message msg = null;
 
-        for (int i = 0; i < totalMsg * 2; i++) {
+        for (int i = 0; i < totalMsg*2; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             log.debug("Received message: [{}]", receivedMessage);
@@ -2221,7 +2329,6 @@ public void testEncryptionFailure() throws Exception {
         class EncKeyReader implements CryptoKeyReader {
 
             EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
-
             @Override
             public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                 String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
@@ -2253,25 +2360,32 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
 
         final int totalMsg = 10;
 
-        Message<byte[]> msg = null;
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        
+        Message msg = null;
         Set<String> messageSet = Sets.newHashSet();
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
-                .subscriptionName("my-subscriber-name").subscribe();
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myenc-topic1", "my-subscriber-name",
+                conf);
 
         // 1. Invalid key name
+        producerConf.addEncryptionKey("client-non-existant-rsa.pem");
+        producerConf.setCryptoKeyReader(new EncKeyReader());
+
         try {
-            pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
-                    .addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/myenc-ns/myenc-topic1", producerConf);
             Assert.fail("Producer creation should not suceed if failing to read key");
         } catch (Exception e) {
             // ok
         }
 
         // 2. Producer with valid key name
-        Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/myenc-ns/myenc-topic1").addEncryptionKey("client-rsa.pem")
-                .cryptoKeyReader(new EncKeyReader()).create();
-
+        producerConf = new ProducerConfiguration();
+        producerConf.setCryptoKeyReader(new EncKeyReader());
+        producerConf.addEncryptionKey("client-rsa.pem");
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/myenc-topic1", producerConf);
+        
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -2283,33 +2397,33 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         Assert.assertNull(msg, "Receive should have failed with no keyreader");
 
         // 4. Set consumer config to consume even if decryption fails
+        conf.setCryptoFailureAction(ConsumerCryptoFailureAction.CONSUME);
         consumer.close();
-        consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
-                .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
-                .subscribe();
-
+        consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myenc-topic1", "my-subscriber-name",
+                conf);
+        
         int msgNum = 0;
         try {
             // Receive should proceed and deliver encrypted message
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             String expectedMessage = "my-message-" + msgNum++;
-            Assert.assertNotEquals(receivedMessage, expectedMessage, "Received encrypted message " + receivedMessage
-                    + " should not match the expected message " + expectedMessage);
+            Assert.assertNotEquals(receivedMessage, expectedMessage,
+                    "Received encrypted message " + receivedMessage + " should not match the expected message " + expectedMessage);
             consumer.acknowledgeCumulative(msg);
         } catch (Exception e) {
-            e.printStackTrace();
             Assert.fail("Failed to receive message even aftet ConsumerCryptoFailureAction.CONSUME is set.");
         }
 
         // 5. Set keyreader and failure action
+        conf.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
         consumer.close();
         // Set keyreader
-        consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
-                .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
-                .cryptoKeyReader(new EncKeyReader()).subscribe();
+        conf.setCryptoKeyReader(new EncKeyReader());
+        consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myenc-topic1", "my-subscriber-name",
+                conf);
 
-        for (int i = msgNum; i < totalMsg - 1; i++) {
+        for (int i = msgNum; i < totalMsg-1; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             log.debug("Received message: [{}]", receivedMessage);
@@ -2320,11 +2434,12 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         consumer.acknowledgeCumulative(msg);
         consumer.close();
 
-        // 6. Set consumer config to discard if decryption fails
+         //6. Set consumer config to discard if decryption fails
         consumer.close();
-        consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
-                .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
-                .subscribe();
+        ConsumerConfiguration conf2 = new ConsumerConfiguration();
+        conf2.setSubscriptionType(SubscriptionType.Exclusive);
+        conf2.setCryptoFailureAction(ConsumerCryptoFailureAction.DISCARD);
+        consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myenc-topic1", "my-subscriber-name", conf2);
 
         // Receive should proceed and discard encrypted messages
         msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -2333,4 +2448,40 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testConsumerSubscriptionInitialize() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topic = "persistent://my-property/use/my-ns/my-topic-test-subscription-initialize";
+        
+        Producer producer = pulsarClient.createProducer(topic);
+
+        // first produce 5 messages
+        for (int i = 0; i < 5; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // second create consumers
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        Consumer defaultConsumer = pulsarClient.subscribe(topic, "test-subscription-default");
+        conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Latest);
+        Consumer latestConsumer = pulsarClient.subscribe(topic, "test-subscription-latest", conf);
+        conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+        Consumer earlistConsumer = pulsarClient.subscribe(topic, "test-subscription-earliest", conf);
+
+        // third produce 5 messages
+        for (int i = 5; i < 10; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        assertEquals(defaultConsumer.receive().getData(), "my-message-5".getBytes());
+        assertEquals(latestConsumer.receive().getData(), "my-message-5".getBytes());
+        assertEquals(earlistConsumer.receive().getData(), "my-message-0".getBytes());
+
+        defaultConsumer.close();
+        latestConsumer.close();
+        earlistConsumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }    
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index c526af5b9..50df8a533 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -26,7 +26,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 /**
  * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
  * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
@@ -46,6 +46,7 @@
 
     private final ConsumerConfigurationData<byte[]> conf = new ConsumerConfigurationData<>();
 
+    private boolean initializeSubscriptionOnLatest = true;
     /**
      * @return the configured timeout in milliseconds for unacked messages.
      */
@@ -339,4 +340,21 @@ public ConsumerConfiguration setProperties(Map<String, String> properties) {
     public ConsumerConfigurationData<byte[]> getConfigurationData() {
         return conf;
     }
+    
+     /** 
+     * @param subscriptionInitialPosition the initial position at which to set
+     * the cursor when subscribing to the topic for the first time.
+     * Default is {@link SubscriptionInitialPosition#Latest}
+     */
+    public ConsumerConfiguration setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+        conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
+        return this;
+    }   
+
+    /** 
+     * @return the configured {@link SubscriptionInitialPosition} for the consumer
+     */
+    public SubscriptionInitialPosition getSubscriptionInitialPosition(){
+        return conf.getSubscriptionInitialPosition();
+    }   
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
new file mode 100644
index 000000000..05deee3b0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Initial position at which the cursor will be set when subscribing to a topic for the first time.
+ *
+ *
+ */
+public enum SubscriptionInitialPosition {
+    /**
+     * The latest position: consumption starts after the last message already in the topic, so only messages published after subscribing are received.
+     */
+    Latest,
+
+    /**
+     * The earliest position: consumption starts from the first message available in the topic.
+     */
+    Earliest,
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d631fd619..067a67770 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -56,6 +56,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -123,6 +124,7 @@
 
     private final boolean readCompacted;
 
+    private final SubscriptionInitialPosition subscriptionInitialPosition;
     private final ConnectionHandler connectionHandler;
 
     enum SubscriptionMode {
@@ -136,12 +138,12 @@
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema);
+        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema, SubscriptionInitialPosition.Latest);
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
-                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema) {
+                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, SubscriptionInitialPosition subscriptionInitialPosition) {
         super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
@@ -154,6 +156,7 @@
         this.priorityLevel = conf.getPriorityLevel();
         this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
         this.readCompacted = conf.isReadCompacted();
+        this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index ed374f663..881a69c26 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -82,7 +83,7 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
         }
 
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema);
+                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, SubscriptionInitialPosition.Latest);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 77f2e9736..3f2eb2b02 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 
 @Data
 public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
@@ -73,6 +74,8 @@
 
     private boolean readCompacted = false;
 
+    private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
     private int patternAutoDiscoveryPeriod = 1;
 
     @JsonIgnore
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 4776ee1f8..23587b564 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -6741,6 +6741,10 @@ public Builder clearProtocolVersion() {
     // optional .pulsar.proto.Schema schema = 12;
     boolean hasSchema();
     org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema();
+    
+    // optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest];
+    boolean hasInitialPosition();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition();
   }
   public static final class CommandSubscribe extends
       com.google.protobuf.GeneratedMessageLite
@@ -6822,6 +6826,47 @@ private SubType(int index, int value) {
       // @@protoc_insertion_point(enum_scope:pulsar.proto.CommandSubscribe.SubType)
     }
     
+    public enum InitialPosition
+        implements com.google.protobuf.Internal.EnumLite {
+      Latest(0, 0),
+      Earliest(1, 1),
+      ;
+      
+      public static final int Latest_VALUE = 0;
+      public static final int Earliest_VALUE = 1;
+      
+      
+      public final int getNumber() { return value; }
+      
+      public static InitialPosition valueOf(int value) {
+        switch (value) {
+          case 0: return Latest;
+          case 1: return Earliest;
+          default: return null;
+        }
+      }
+      
+      public static com.google.protobuf.Internal.EnumLiteMap<InitialPosition>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static com.google.protobuf.Internal.EnumLiteMap<InitialPosition>
+          internalValueMap =
+            new com.google.protobuf.Internal.EnumLiteMap<InitialPosition>() {
+              public InitialPosition findValueByNumber(int number) {
+                return InitialPosition.valueOf(number);
+              }
+            };
+      
+      private final int value;
+      
+      private InitialPosition(int index, int value) {
+        this.value = value;
+      }
+      
+      // @@protoc_insertion_point(enum_scope:pulsar.proto.CommandSubscribe.InitialPosition)
+    }
+    
     private int bitField0_;
     // required string topic = 1;
     public static final int TOPIC_FIELD_NUMBER = 1;
@@ -7020,6 +7065,16 @@ public boolean hasSchema() {
       return schema_;
     }
     
+    // optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest];
+    public static final int INITIALPOSITION_FIELD_NUMBER = 13;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition initialPosition_;
+    public boolean hasInitialPosition() {
+      return ((bitField0_ & 0x00000800) == 0x00000800);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition() {
+      return initialPosition_;
+    }
+    
     private void initFields() {
       topic_ = "";
       subscription_ = "";
@@ -7033,6 +7088,7 @@ private void initFields() {
       metadata_ = java.util.Collections.emptyList();
       readCompacted_ = false;
       schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -7125,6 +7181,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000400) == 0x00000400)) {
         output.writeMessage(12, schema_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        output.writeEnum(13, initialPosition_.getNumber());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -7181,6 +7240,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(12, schema_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeEnumSize(13, initialPosition_.getNumber());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -7318,6 +7381,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000400);
         schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x00000800);
+        initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
+        bitField0_ = (bitField0_ & ~0x00001000);
         return this;
       }
       
@@ -7400,6 +7465,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000400;
         }
         result.schema_ = schema_;
+        if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+          to_bitField0_ |= 0x00000800;
+        }
+        result.initialPosition_ = initialPosition_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -7449,6 +7518,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandSub
         if (other.hasSchema()) {
           mergeSchema(other.getSchema());
         }
+        if (other.hasInitialPosition()) {
+          setInitialPosition(other.getInitialPosition());
+        }
         return this;
       }
       
@@ -7591,6 +7663,15 @@ public Builder mergeFrom(
               subBuilder.recycle();
               break;
             }
+            case 104: {
+              int rawValue = input.readEnum();
+              org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition value = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.valueOf(rawValue);
+              if (value != null) {
+                bitField0_ |= 0x00001000;
+                initialPosition_ = value;
+              }
+              break;
+            }
           }
         }
       }
@@ -8009,6 +8090,30 @@ public Builder clearSchema() {
         return this;
       }
       
+      // optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest];
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
+      public boolean hasInitialPosition() {
+        return ((bitField0_ & 0x00001000) == 0x00001000);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition() {
+        return initialPosition_;
+      }
+      public Builder setInitialPosition(org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00001000;
+        initialPosition_ = value;
+        
+        return this;
+      }
+      public Builder clearInitialPosition() {
+        bitField0_ = (bitField0_ & ~0x00001000);
+        initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 0c0a50135..505d7eda5 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -216,6 +216,13 @@ message CommandSubscribe {
     optional bool read_compacted = 11;
 
 	optional Schema schema = 12;
+	enum InitialPosition {
+		Latest   = 0;
+		Earliest = 1;
+	}
+	// Signal whether the subscription will be initialized at the latest
+	// position (the default) or at the earliest position
+	optional InitialPosition initialPosition = 13 [default = Latest];
 }
 
 message CommandPartitionedTopicMetadata {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services