You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/24 17:14:11 UTC

[GitHub] merlimat closed pull request #1276: Refactored ClientConfiguration to use ClientConfigurationData shared with ClientBuilderImpl

merlimat closed pull request #1276: Refactored ClientConfiguration to use ClientConfigurationData shared with ClientBuilderImpl
URL: https://github.com/apache/incubator-pulsar/pull/1276
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it once the pull request is merged):

diff --git a/pom.xml b/pom.xml
index eb97e0bae..ef35caf4c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,7 +149,7 @@ flexible messaging model and an intuitive client API.</description>
           </exclusion>
         </exclusions>
       </dependency>
-      
+
       <dependency>
         <groupId>org.testng</groupId>
         <artifactId>testng</artifactId>
@@ -374,7 +374,7 @@ flexible messaging model and an intuitive client API.</description>
         <artifactId>jersey-container-servlet</artifactId>
         <version>2.23.2</version>
       </dependency>
-      
+
       <dependency>
         <groupId>javax.ws.rs</groupId>
         <artifactId>javax.ws.rs-api</artifactId>
@@ -598,6 +598,13 @@ flexible messaging model and an intuitive client API.</description>
       <artifactId>powermock-module-testng</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>1.16.20</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
index d36520cfb..52632f4cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
@@ -22,7 +22,6 @@
 
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
-import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index f66685f6a..04aa61039 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -22,49 +22,39 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.codec.digest.DigestUtils;
+
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
-import io.netty.buffer.ByteBuf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+
 public class RawReaderImpl implements RawReader {
 
     final static int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;
-    private final PulsarClientImpl client;
-    private final String topic;
-    private final String subscription;
-    private final ConsumerConfiguration consumerConfiguration;
+    private final ConsumerConfigurationData consumerConfiguration;
     private RawConsumerImpl consumer;
 
     public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
                          CompletableFuture<Consumer> consumerFuture) {
-        this.client = client;
-        this.subscription = subscription;
-        this.topic = topic;
-
-        consumerConfiguration = new ConsumerConfiguration();
+        consumerConfiguration = new ConsumerConfigurationData();
+        consumerConfiguration.getTopicNames().add(topic);
+        consumerConfiguration.setSubscriptionName(subscription);
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
         consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
 
-        consumer = new RawConsumerImpl(client, topic, subscription, consumerConfiguration,
+        consumer = new RawConsumerImpl(client, consumerConfiguration,
                                        consumerFuture);
     }
 
@@ -92,11 +82,10 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
         final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
         final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
 
-        RawConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
-                        CompletableFuture<Consumer> consumerFuture) {
-            super(client, topic, subscription, conf,
-                  client.externalExecutorProvider().getExecutor(), -1, consumerFuture,
-                  SubscriptionMode.Durable, MessageId.earliest);
+        RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
+                CompletableFuture<Consumer> consumerFuture) {
+            super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
+                    consumerFuture, SubscriptionMode.Durable, MessageId.earliest);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 5a95800c1..b08858756 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -88,6 +88,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -1151,17 +1152,18 @@ public void testClosingReplicationProducerTwice() throws Exception {
         brokerService.getReplicationClients().put(remoteCluster, client);
         PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);
 
-        doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(matches(globalTopicName), any());
+        doReturn(new CompletableFuture<Producer>()).when(clientImpl)
+                .createProducerAsync(any(ProducerConfigurationData.class));
 
         replicator.startProducer();
-        verify(clientImpl).createProducerAsync(matches(globalTopicName), any());
+        verify(clientImpl).createProducerAsync(any(ProducerConfigurationData.class));
 
         replicator.disconnect(false);
         replicator.disconnect(false);
 
         replicator.startProducer();
 
-        verify(clientImpl, Mockito.times(2)).createProducerAsync(matches(globalTopicName), any());
+        verify(clientImpl, Mockito.times(2)).createProducerAsync(any(ProducerConfigurationData.class));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index cbef1d054..94e638a1f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -60,6 +60,7 @@
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -226,7 +227,7 @@ public void testConcurrentReplicator() throws Exception {
         }
         Thread.sleep(3000);
 
-        Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject());
+        Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class));
 
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 0447b780e..d8d6f83c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -23,7 +23,7 @@
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
@@ -53,10 +53,11 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testSingleIpAddress() throws Exception {
-        ClientConfiguration conf = new ClientConfiguration();
+        ClientConfigurationData conf = new ClientConfigurationData();
         EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
         ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
-        PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool);
+        conf.setServiceUrl(serviceUrl);
+        PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
 
         List<InetAddress> result = Lists.newArrayList();
         result.add(InetAddress.getByName("127.0.0.1"));
@@ -71,10 +72,11 @@ public void testSingleIpAddress() throws Exception {
     public void testDoubleIpAddress() throws Exception {
         String serviceUrl = "pulsar://non-existing-dns-name:" + BROKER_PORT;
 
-        ClientConfiguration conf = new ClientConfiguration();
+        ClientConfigurationData conf = new ClientConfigurationData();
         EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
         ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
-        PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool);
+        conf.setServiceUrl(serviceUrl);
+        PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
 
         List<InetAddress> result = Lists.newArrayList();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 900fadef6..30e54a51b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -51,9 +51,10 @@
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.Commands.ChecksumType;
-import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
 import org.slf4j.Logger;
@@ -295,7 +296,7 @@ public void testChecksumVersionComptability() throws Exception {
         ((PulsarClientImpl) pulsarClient).timer().stop();
 
         ClientCnx mockClientCnx = spy(
-                new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
+                new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
         doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
         prod.setClientCnx(mockClientCnx);
 
@@ -360,7 +361,7 @@ public void testChecksumReconnection() throws Exception {
 
         // set clientCnx mock to get non-checksum supported version
         ClientCnx mockClientCnx = spy(
-                new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
+                new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
         doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
         prod.setClientCnx(mockClientCnx);
 
@@ -489,7 +490,7 @@ public void testCorruptMessageRemove() throws Exception {
         MessageImpl msg1 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build();
         future = producer.sendAsync(msg1);
         ClientCnx cnx = spy(
-                new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
+                new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
         String exc = "broker is already stopped";
         // when client-try to recover checksum by resending to broker: throw exception as broker is stopped
         doThrow(new IllegalStateException(exc)).when(cnx).ctx();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index 7862c14a3..5860cd35e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -20,13 +20,12 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 
 /**
  * Class used to specify client side configuration like authentication, etc..
@@ -38,29 +37,13 @@
 
     private static final long serialVersionUID = 1L;
 
-    @JsonIgnore
-    private Authentication authentication = new AuthenticationDisabled();
-    private long operationTimeoutMs = 30000;
-    private long statsIntervalSeconds = 60;
-
-    private int numIoThreads = 1;
-    private int numListenerThreads = 1;
-    private int connectionsPerBroker = 1;
-
-    private boolean useTcpNoDelay = true;
-
-    private boolean useTls = false;
-    private String tlsTrustCertsFilePath = "";
-    private boolean tlsAllowInsecureConnection = false;
-    private boolean tlsHostnameVerificationEnable = false;
-    private int concurrentLookupRequest = 50000;
-    private int maxNumberOfRejectedRequestPerConnection = 50;
+    private final ClientConfigurationData confData = new ClientConfigurationData();
 
     /**
      * @return the authentication provider to be used
      */
     public Authentication getAuthentication() {
-        return authentication;
+        return confData.getAuthentication();
     }
 
     /**
@@ -71,12 +54,12 @@ public Authentication getAuthentication() {
      *
      * <pre>
      * <code>
-     * ClientConfiguration conf = new ClientConfiguration();
+     * ClientConfiguration confData = new ClientConfiguration();
      * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
      * String authParamsString = "key1:val1,key2:val2";
      * Authentication auth = AuthenticationFactory.create(authPluginClassName, authParamsString);
-     * conf.setAuthentication(auth);
-     * PulsarClient client = PulsarClient.create(serviceUrl, conf);
+     * confData.setAuthentication(auth);
+     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
      * ....
      * </code>
      * </pre>
@@ -84,7 +67,7 @@ public Authentication getAuthentication() {
      * @param authentication
      */
     public void setAuthentication(Authentication authentication) {
-        this.authentication = authentication;
+        confData.setAuthentication(authentication);
     }
 
     /**
@@ -95,11 +78,11 @@ public void setAuthentication(Authentication authentication) {
      *
      * <pre>
      * <code>
-     * ClientConfiguration conf = new ClientConfiguration();
+     * ClientConfiguration confData = new ClientConfiguration();
      * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
      * String authParamsString = "key1:val1,key2:val2";
-     * conf.setAuthentication(authPluginClassName, authParamsString);
-     * PulsarClient client = PulsarClient.create(serviceUrl, conf);
+     * confData.setAuthentication(authPluginClassName, authParamsString);
+     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
      * ....
      * </code>
      * </pre>
@@ -113,7 +96,7 @@ public void setAuthentication(Authentication authentication) {
      */
     public void setAuthentication(String authPluginClassName, String authParamsString)
             throws UnsupportedAuthenticationException {
-        this.authentication = AuthenticationFactory.create(authPluginClassName, authParamsString);
+        confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
     }
 
     /**
@@ -124,12 +107,12 @@ public void setAuthentication(String authPluginClassName, String authParamsStrin
      *
      * <pre>
      * <code>
-     * ClientConfiguration conf = new ClientConfiguration();
+     * ClientConfiguration confData = new ClientConfiguration();
      * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
      * Map<String, String> authParams = new HashMap<String, String>();
      * authParams.put("key1", "val1");
-     * conf.setAuthentication(authPluginClassName, authParams);
-     * PulsarClient client = PulsarClient.create(serviceUrl, conf);
+     * confData.setAuthentication(authPluginClassName, authParams);
+     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
      * ....
      * </code>
      * </pre>
@@ -143,15 +126,14 @@ public void setAuthentication(String authPluginClassName, String authParamsStrin
      */
     public void setAuthentication(String authPluginClassName, Map<String, String> authParams)
             throws UnsupportedAuthenticationException {
-        this.authentication = AuthenticationFactory.create(authPluginClassName, authParams);
+        confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
     }
 
-    
     /**
      * @return the operation timeout in ms
      */
     public long getOperationTimeoutMs() {
-        return operationTimeoutMs;
+        return confData.getOperationTimeoutMs();
     }
 
     /**
@@ -167,14 +149,14 @@ public long getOperationTimeoutMs() {
      */
     public void setOperationTimeout(int operationTimeout, TimeUnit unit) {
         checkArgument(operationTimeout >= 0);
-        this.operationTimeoutMs = unit.toMillis(operationTimeout);
+        confData.setOperationTimeoutMs(unit.toMillis(operationTimeout));
     }
 
     /**
      * @return the number of threads to use for handling connections
      */
     public int getIoThreads() {
-        return numIoThreads;
+        return confData.getNumIoThreads();
     }
 
     /**
@@ -184,14 +166,14 @@ public int getIoThreads() {
      */
     public void setIoThreads(int numIoThreads) {
         checkArgument(numIoThreads > 0);
-        this.numIoThreads = numIoThreads;
+        confData.setNumIoThreads(numIoThreads);
     }
 
     /**
      * @return the number of threads to use for message listeners
      */
     public int getListenerThreads() {
-        return numListenerThreads;
+        return confData.getNumListenerThreads();
     }
 
     /**
@@ -201,14 +183,14 @@ public int getListenerThreads() {
      */
     public void setListenerThreads(int numListenerThreads) {
         checkArgument(numListenerThreads > 0);
-        this.numListenerThreads = numListenerThreads;
+        confData.setNumListenerThreads(numListenerThreads);
     }
 
     /**
      * @return the max number of connections per single broker
      */
     public int getConnectionsPerBroker() {
-        return connectionsPerBroker;
+        return confData.getConnectionsPerBroker();
     }
 
     /**
@@ -223,14 +205,14 @@ public int getConnectionsPerBroker() {
      */
     public void setConnectionsPerBroker(int connectionsPerBroker) {
         checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0");
-        this.connectionsPerBroker = connectionsPerBroker;
+        confData.setConnectionsPerBroker(connectionsPerBroker);
     }
 
     /**
      * @return whether TCP no-delay should be set on the connections
      */
     public boolean isUseTcpNoDelay() {
-        return useTcpNoDelay;
+        return confData.isUseTcpNoDelay();
     }
 
     /**
@@ -245,14 +227,14 @@ public boolean isUseTcpNoDelay() {
      * @param useTcpNoDelay
      */
     public void setUseTcpNoDelay(boolean useTcpNoDelay) {
-        this.useTcpNoDelay = useTcpNoDelay;
+        confData.setUseTcpNoDelay(useTcpNoDelay);
     }
 
     /**
      * @return whether TLS encryption is used on the connection
      */
     public boolean isUseTls() {
-        return useTls;
+        return confData.isUseTls();
     }
 
     /**
@@ -261,14 +243,14 @@ public boolean isUseTls() {
      * @param useTls
      */
     public void setUseTls(boolean useTls) {
-        this.useTls = useTls;
+        confData.setUseTls(useTls);
     }
 
     /**
      * @return path to the trusted TLS certificate file
      */
     public String getTlsTrustCertsFilePath() {
-        return tlsTrustCertsFilePath;
+        return confData.getTlsTrustCertsFilePath();
     }
 
     /**
@@ -277,14 +259,14 @@ public String getTlsTrustCertsFilePath() {
      * @param tlsTrustCertsFilePath
      */
     public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
-        this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;
+        confData.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
     }
 
     /**
      * @return whether the Pulsar client accept untrusted TLS certificate from broker
      */
     public boolean isTlsAllowInsecureConnection() {
-        return tlsAllowInsecureConnection;
+        return confData.isTlsAllowInsecureConnection();
     }
 
     /**
@@ -293,7 +275,7 @@ public boolean isTlsAllowInsecureConnection() {
      * @param tlsAllowInsecureConnection
      */
     public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
-        this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
+        confData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
     }
 
     /**
@@ -302,7 +284,7 @@ public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
      * @return the interval between each stat info <i>(default: 60 seconds)</i>
      */
     public long getStatsIntervalSeconds() {
-        return statsIntervalSeconds;
+        return confData.getStatsIntervalSeconds();
     }
 
     /**
@@ -315,7 +297,7 @@ public long getStatsIntervalSeconds() {
      *            time unit for {@code statsInterval}
      */
     public void setStatsInterval(long statsInterval, TimeUnit unit) {
-        this.statsIntervalSeconds = unit.toSeconds(statsInterval);
+        confData.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
     }
 
     /**
@@ -324,18 +306,18 @@ public void setStatsInterval(long statsInterval, TimeUnit unit) {
      * @return
      */
     public int getConcurrentLookupRequest() {
-        return concurrentLookupRequest;
+        return confData.getConcurrentLookupRequest();
     }
 
     /**
      * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
-     * <i>(default: 50000)</i> It should be configured with higher value only in case of it requires to produce/subscribe on
-     * thousands of topic using created {@link PulsarClient}
+     * <i>(default: 50000)</i> It should be configured with higher value only in case of it requires to
+     * produce/subscribe on thousands of topic using created {@link PulsarClient}
      *
      * @param concurrentLookupRequest
      */
     public void setConcurrentLookupRequest(int concurrentLookupRequest) {
-        this.concurrentLookupRequest = concurrentLookupRequest;
+        confData.setConcurrentLookupRequest(concurrentLookupRequest);
     }
 
     /**
@@ -344,7 +326,7 @@ public void setConcurrentLookupRequest(int concurrentLookupRequest) {
      * @return
      */
     public int getMaxNumberOfRejectedRequestPerConnection() {
-        return maxNumberOfRejectedRequestPerConnection;
+        return confData.getMaxNumberOfRejectedRequestPerConnection();
     }
 
     /**
@@ -355,24 +337,33 @@ public int getMaxNumberOfRejectedRequestPerConnection() {
      * @param maxNumberOfRejectedRequestPerConnection
      */
     public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
-        this.maxNumberOfRejectedRequestPerConnection = maxNumberOfRejectedRequestPerConnection;
+        confData.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
     }
 
     public boolean isTlsHostnameVerificationEnable() {
-        return tlsHostnameVerificationEnable;
+        return confData.isTlsHostnameVerificationEnable();
     }
 
     /**
      * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509
-     * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1. Server
-     * Identity hostname verification.
-     * 
+     * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1.
+     * Server Identity hostname verification.
+     *
      * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
-     * 
+     *
      * @param tlsHostnameVerificationEnable
      */
     public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {
-        this.tlsHostnameVerificationEnable = tlsHostnameVerificationEnable;
+        confData.setTlsHostnameVerificationEnable(tlsHostnameVerificationEnable);
     }
-    
+
+    public ClientConfiguration setServiceUrl(String serviceUrl) {
+        confData.setServiceUrl(serviceUrl);
+        return this;
+    }
+
+    public ClientConfigurationData getConfigurationData() {
+        return confData;
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index bd6236a15..1c68c326d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -21,12 +21,12 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+
 /**
  * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
  * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
@@ -44,37 +44,13 @@
 
     private static final long serialVersionUID = 1L;
 
-    private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
-
-    @JsonIgnore
-    private MessageListener messageListener;
-
-    @JsonIgnore
-    private ConsumerEventListener consumerEventListener;
-
-    private int receiverQueueSize = 1000;
-
-    private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
-
-    private String consumerName = null;
-
-    private long ackTimeoutMillis = 0;
-
-    private int priorityLevel = 0;
-
-    @JsonIgnore
-    private CryptoKeyReader cryptoKeyReader = null;
-    private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
-
-    private final Map<String, String> properties = new HashMap<>();
-
-    private boolean readCompacted = false;
+    private final ConsumerConfigurationData conf = new ConsumerConfigurationData();
 
     /**
      * @return the configured timeout in milliseconds for unacked messages.
      */
     public long getAckTimeoutMillis() {
-        return ackTimeoutMillis;
+        return conf.getAckTimeoutMillis();
     }
 
     /**
@@ -91,7 +67,7 @@ public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) {
         long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
         checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
                 "Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms");
-        this.ackTimeoutMillis = ackTimeoutMillis;
+        conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
         return this;
     }
 
@@ -99,7 +75,7 @@ public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) {
      * @return the configured subscription type
      */
     public SubscriptionType getSubscriptionType() {
-        return this.subscriptionType;
+        return conf.getSubscriptionType();
     }
 
     /**
@@ -112,7 +88,7 @@ public SubscriptionType getSubscriptionType() {
      */
     public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType) {
         checkNotNull(subscriptionType);
-        this.subscriptionType = subscriptionType;
+        conf.setSubscriptionType(subscriptionType);
         return this;
     }
 
@@ -120,7 +96,7 @@ public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionTy
      * @return the configured {@link MessageListener} for the consumer
      */
     public MessageListener getMessageListener() {
-        return this.messageListener;
+        return conf.getMessageListener();
     }
 
     /**
@@ -134,7 +110,7 @@ public MessageListener getMessageListener() {
      */
     public ConsumerConfiguration setMessageListener(MessageListener messageListener) {
         checkNotNull(messageListener);
-        this.messageListener = messageListener;
+        conf.setMessageListener(messageListener);
         return this;
     }
 
@@ -144,24 +120,27 @@ public ConsumerConfiguration setMessageListener(MessageListener messageListener)
      * @since 2.0
      */
     public ConsumerEventListener getConsumerEventListener() {
-        return this.consumerEventListener;
+        return conf.getConsumerEventListener();
     }
 
     /**
      * Sets a {@link ConsumerEventListener} for the consumer.
      *
-     * <p>The consumer group listener is used for receiving consumer state change in a consumer group for failover
+     * <p>
+     * The consumer group listener is used for receiving consumer state change in a consumer group for failover
      * subscription. Application can then react to the consumer state changes.
      *
-     * <p>This change is experimental. It is subject to changes coming in release 2.0.
+     * <p>
+     * This change is experimental. It is subject to changes coming in release 2.0.
      *
-     * @param listener the consumer group listener object
+     * @param listener
+     *            the consumer group listener object
      * @return consumer configuration
      * @since 2.0
      */
     public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) {
         checkNotNull(listener);
-        this.consumerEventListener = listener;
+        conf.setConsumerEventListener(listener);
         return this;
     }
 
@@ -169,15 +148,14 @@ public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener list
      * @return the configure receiver queue size value
      */
     public int getReceiverQueueSize() {
-        return this.receiverQueueSize;
+        return conf.getReceiverQueueSize();
     }
 
-
     /**
      * @return the configured max total receiver queue size across partitions
      */
     public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
-        return maxTotalReceiverQueueSizeAcrossPartitions;
+        return conf.getMaxTotalReceiverQueueSizeAcrossPartitions();
     }
 
     /**
@@ -189,15 +167,15 @@ public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
      * @param maxTotalReceiverQueueSizeAcrossPartitions
      */
     public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
-        checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= receiverQueueSize);
-        this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions;
+        checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= conf.getReceiverQueueSize());
+        conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
     }
 
     /**
      * @return the CryptoKeyReader
      */
     public CryptoKeyReader getCryptoKeyReader() {
-        return this.cryptoKeyReader;
+        return conf.getCryptoKeyReader();
     }
 
     /**
@@ -208,24 +186,25 @@ public CryptoKeyReader getCryptoKeyReader() {
      */
     public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
         checkNotNull(cryptoKeyReader);
-        this.cryptoKeyReader = cryptoKeyReader;
+        conf.setCryptoKeyReader(cryptoKeyReader);
         return this;
     }
 
     /**
      * Sets the ConsumerCryptoFailureAction to the value specified
      *
-     * @param The consumer action
+     * @param action
+     *            the consumer action
      */
     public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
-        cryptoFailureAction = action;
+        conf.setCryptoFailureAction(action);
     }
 
     /**
      * @return The ConsumerCryptoFailureAction
      */
     public ConsumerCryptoFailureAction getCryptoFailureAction() {
-        return this.cryptoFailureAction;
+        return conf.getCryptoFailureAction();
     }
 
     /**
@@ -256,7 +235,7 @@ public ConsumerCryptoFailureAction getCryptoFailureAction() {
      */
     public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
         checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
-        this.receiverQueueSize = receiverQueueSize;
+        conf.setReceiverQueueSize(receiverQueueSize);
         return this;
     }
 
@@ -264,7 +243,7 @@ public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
      * @return the consumer name
      */
     public String getConsumerName() {
-        return consumerName;
+        return conf.getConsumerName();
     }
 
     /**
@@ -274,12 +253,12 @@ public String getConsumerName() {
      */
     public ConsumerConfiguration setConsumerName(String consumerName) {
         checkArgument(consumerName != null && !consumerName.equals(""));
-        this.consumerName = consumerName;
+        conf.setConsumerName(consumerName);
         return this;
     }
 
     public int getPriorityLevel() {
-        return priorityLevel;
+        return conf.getPriorityLevel();
     }
 
     /**
@@ -303,32 +282,34 @@ public int getPriorityLevel() {
      * @param priorityLevel
      */
     public void setPriorityLevel(int priorityLevel) {
-        this.priorityLevel = priorityLevel;
+        conf.setPriorityLevel(priorityLevel);
     }
 
     public boolean getReadCompacted() {
-        return readCompacted;
+        return conf.isReadCompacted();
     }
 
     /**
-     * If enabled, the consumer will read messages from the compacted topic rather than reading the full message
-     * backlog of the topic. This means that, if the topic has been compacted, the consumer will only see the latest
-     * value for each key in the topic, up until the point in the topic message backlog that has been compacted.
-     * Beyond that point, the messages will be sent as normal.
+     * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+     * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+     * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+     * point, the messages will be sent as normal.
      *
-     * readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer
-     * (i.e. failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent
-     * topics or on a shared subscription, will lead to the subscription call throwing a PulsarClientException.
+     * readCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer
+     * (i.e. failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics
+     * or on a shared subscription, will lead to the subscription call throwing a PulsarClientException.
      *
-     * @param readCompacted whether to read from the compacted topic
+     * @param readCompacted
+     *            whether to read from the compacted topic
      */
     public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
-        this.readCompacted = readCompacted;
+        conf.setReadCompacted(readCompacted);
         return this;
     }
 
     /**
      * Set a name/value property with this consumer.
+     *
      * @param key
      * @param value
      * @return
@@ -336,23 +317,26 @@ public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
     public ConsumerConfiguration setProperty(String key, String value) {
         checkArgument(key != null);
         checkArgument(value != null);
-        properties.put(key, value);
+        conf.getProperties().put(key, value);
         return this;
     }
 
     /**
      * Add all the properties in the provided map
+     *
      * @param properties
      * @return
      */
     public ConsumerConfiguration setProperties(Map<String, String> properties) {
-        if (properties != null) {
-            this.properties.putAll(properties);
-        }
+        conf.getProperties().putAll(properties);
         return this;
     }
 
     public Map<String, String> getProperties() {
-        return properties;
+        return conf.getProperties();
+    }
+
+    public ConsumerConfigurationData getConfigurationData() {
+        return conf;
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/HashingScheme.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/HashingScheme.java
new file mode 100644
index 000000000..a451c7e93
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/HashingScheme.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Standard hashing functions available when choosing the partition to use for a particular message.
+ */
+public enum HashingScheme {
+
+    /**
+     * Use regular <code>String.hashCode()</code>
+     */
+    JavaStringHash,
+
+    /**
+     * Use Murmur3 hashing function.
+     * <a href="https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash</a>
+     */
+    Murmur3_32Hash
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
index 1d45489b9..97231296e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
@@ -18,6 +18,25 @@
  */
 package org.apache.pulsar.client.api;
 
+/**
+ * Default routing mode for messages to partition.
+ *
+ * This logic is applied when the application is not setting a key {@link MessageBuilder#setKey(String)} on a particular
+ * message.
+ */
 public enum MessageRoutingMode {
-    SinglePartition, RoundRobinPartition, CustomPartition
+    /**
+     * The producer will choose one single partition and publish all the messages into that partition.
+     */
+    SinglePartition,
+
+    /**
+     * Publish messages across all partitions in round-robin.
+     */
+    RoundRobinPartition,
+
+    /**
+     * Use custom message router implementation that will be called to determine the partition for a particular message.
+     */
+    CustomPartition
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 5ab1215dc..3961f8c64 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -144,12 +144,34 @@
     ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull);
 
     /**
-     * Set the message routing mode for the partitioned producer
+     * Set the message routing mode for the partitioned producer.
      *
-     * @param mode
-     * @return
+     * Default routing mode for messages to partition.
+     *
+     * This logic is applied when the application is not setting a key {@link MessageBuilder#setKey(String)} on a
+     * particular message.
+     *
+     * @param messageRoutingMode
+     *            the message routing mode
+     */
+    ProducerBuilder messageRoutingMode(MessageRoutingMode messageRoutingMode);
+
+    /**
+     * Change the {@link HashingScheme} used to choose the partition on which to publish a particular message.
+     *
+     * Standard hashing functions available are:
+     * <ul>
+     * <li><code>JavaStringHash</code>: Java <code>String.hashCode()</code>
+     * <li><code>Murmur3_32Hash</code>: Use Murmur3 hashing function.
+     * <a href="https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash</a>
+     * </ul>
+     *
+     * Default is <code>JavaStringHash</code>.
+     *
+     * @param hashingScheme
+     *            the chosen {@link HashingScheme}
      */
-    ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode);
+    ProducerBuilder hashingScheme(HashingScheme hashingScheme);
 
     /**
      * Set the compression type for the producer.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index fb098cf92..edd312139 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -21,17 +21,16 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 
-import com.google.common.base.Objects;
+import lombok.EqualsAndHashCode;
 
 /**
  * Producer's configuration
@@ -39,51 +38,27 @@
  * @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance
  */
 @Deprecated
+@EqualsAndHashCode
 public class ProducerConfiguration implements Serializable {
 
     private static final long serialVersionUID = 1L;
-    private String producerName = null;
-    private long sendTimeoutMs = 30000;
-    private boolean blockIfQueueFull = false;
-    private int maxPendingMessages = 1000;
-    private int maxPendingMessagesAcrossPartitions = 50000;
-    private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition;
-    private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
-    @JsonIgnore
-    private MessageRouter customMessageRouter = null;
-    private long batchingMaxPublishDelayMs = 10;
-    private int batchingMaxMessages = 1000;
-    private boolean batchingEnabled = false; // disabled by default
-
-    @JsonIgnore
-    private CryptoKeyReader cryptoKeyReader;
-    @JsonIgnore
-    private ConcurrentOpenHashSet<String> encryptionKeys;
-
-    private CompressionType compressionType = CompressionType.NONE;
-
-    // Cannot use Optional<Long> since it's not serializable
-    private Long initialSequenceId = null;
-
-    private final Map<String, String> properties = new HashMap<>();
+
+    private final ProducerConfigurationData conf = new ProducerConfigurationData();
 
     public enum MessageRoutingMode {
         SinglePartition, RoundRobinPartition, CustomPartition
     }
 
     public enum HashingScheme {
-        JavaStringHash,
-        Murmur3_32Hash
+        JavaStringHash, Murmur3_32Hash
     }
 
-    private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
-
     /**
      * @return the configured custom producer name or null if no custom name was specified
      * @since 1.20.0
      */
     public String getProducerName() {
-        return producerName;
+        return conf.getProducerName();
     }
 
     /**
@@ -103,14 +78,14 @@ public String getProducerName() {
      * @since 1.20.0
      */
     public void setProducerName(String producerName) {
-        this.producerName = producerName;
+        conf.setProducerName(producerName);
     }
 
     /**
      * @return the message send timeout in ms
      */
     public long getSendTimeoutMs() {
-        return sendTimeoutMs;
+        return conf.getSendTimeoutMs();
     }
 
     /**
@@ -125,7 +100,7 @@ public long getSendTimeoutMs() {
      */
     public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
         checkArgument(sendTimeout >= 0);
-        this.sendTimeoutMs = unit.toMillis(sendTimeout);
+        conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
         return this;
     }
 
@@ -133,7 +108,7 @@ public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
      * @return the maximum number of messages allowed in the outstanding messages queue for the producer
      */
     public int getMaxPendingMessages() {
-        return maxPendingMessages;
+        return conf.getMaxPendingMessages();
     }
 
     /**
@@ -147,16 +122,16 @@ public int getMaxPendingMessages() {
      */
     public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
         checkArgument(maxPendingMessages > 0);
-        this.maxPendingMessages = maxPendingMessages;
+        conf.setMaxPendingMessages(maxPendingMessages);
         return this;
     }
 
     public HashingScheme getHashingScheme() {
-        return hashingScheme;
+        return HashingScheme.valueOf(conf.getHashingScheme().toString());
     }
 
     public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) {
-        this.hashingScheme = hashingScheme;
+        conf.setHashingScheme(org.apache.pulsar.client.api.HashingScheme.valueOf(hashingScheme.toString()));
         return this;
     }
 
@@ -165,7 +140,7 @@ public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) {
      * @return the maximum number of pending messages allowed across all the partitions
      */
     public int getMaxPendingMessagesAcrossPartitions() {
-        return maxPendingMessagesAcrossPartitions;
+        return conf.getMaxPendingMessagesAcrossPartitions();
     }
 
     /**
@@ -177,8 +152,8 @@ public int getMaxPendingMessagesAcrossPartitions() {
      * @param maxPendingMessagesAcrossPartitions
      */
     public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
-        checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages);
-        this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+        checkArgument(maxPendingMessagesAcrossPartitions >= conf.getMaxPendingMessages());
+        conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
     }
 
     /**
@@ -187,7 +162,7 @@ public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPa
      *         pending queue is full
      */
     public boolean getBlockIfQueueFull() {
-        return blockIfQueueFull;
+        return conf.isBlockIfQueueFull();
     }
 
     /**
@@ -202,7 +177,7 @@ public boolean getBlockIfQueueFull() {
      * @return
      */
     public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) {
-        this.blockIfQueueFull = blockIfQueueFull;
+        conf.setBlockIfQueueFull(blockIfQueueFull);
         return this;
     }
 
@@ -214,7 +189,8 @@ public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) {
      */
     public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) {
         checkNotNull(messageRouteMode);
-        this.messageRouteMode = messageRouteMode;
+        conf.setMessageRoutingMode(
+                org.apache.pulsar.client.api.MessageRoutingMode.valueOf(messageRouteMode.toString()));
         return this;
     }
 
@@ -224,7 +200,7 @@ public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRou
      * @return
      */
     public MessageRoutingMode getMessageRoutingMode() {
-        return messageRouteMode;
+        return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
     }
 
     /**
@@ -244,7 +220,7 @@ public MessageRoutingMode getMessageRoutingMode() {
      *        compress messages.
      */
     public ProducerConfiguration setCompressionType(CompressionType compressionType) {
-        this.compressionType = compressionType;
+        conf.setCompressionType(compressionType);
         return this;
     }
 
@@ -252,7 +228,7 @@ public ProducerConfiguration setCompressionType(CompressionType compressionType)
      * @return the configured compression type for this producer
      */
     public CompressionType getCompressionType() {
-        return compressionType;
+        return conf.getCompressionType();
     }
 
     /**
@@ -264,7 +240,7 @@ public CompressionType getCompressionType() {
     public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) {
         checkNotNull(messageRouter);
         setMessageRoutingMode(MessageRoutingMode.CustomPartition);
-        customMessageRouter = messageRouter;
+        conf.setCustomMessageRouter(messageRouter);
         return this;
     }
 
@@ -278,7 +254,7 @@ public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) {
      */
     @Deprecated
     public MessageRouter getMessageRouter(int numPartitions) {
-        return customMessageRouter;
+        return conf.getCustomMessageRouter();
     }
 
     /**
@@ -287,7 +263,7 @@ public MessageRouter getMessageRouter(int numPartitions) {
      * @return message router set by {@link #setMessageRouter(MessageRouter)}.
      */
     public MessageRouter getMessageRouter() {
-        return customMessageRouter;
+        return conf.getCustomMessageRouter();
     }
 
     /**
@@ -295,7 +271,7 @@ public MessageRouter getMessageRouter() {
      */
 
     public boolean getBatchingEnabled() {
-        return batchingEnabled;
+        return conf.isBatchingEnabled();
     }
 
     /**
@@ -315,7 +291,7 @@ public boolean getBatchingEnabled() {
      */
 
     public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) {
-        this.batchingEnabled = batchMessagesEnabled;
+        conf.setBatchingEnabled(batchMessagesEnabled);
         return this;
     }
 
@@ -323,7 +299,7 @@ public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) {
      * @return the CryptoKeyReader
      */
     public CryptoKeyReader getCryptoKeyReader() {
-        return this.cryptoKeyReader;
+        return conf.getCryptoKeyReader();
     }
 
     /**
@@ -334,7 +310,7 @@ public CryptoKeyReader getCryptoKeyReader() {
      */
     public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
         checkNotNull(cryptoKeyReader);
-        this.cryptoKeyReader = cryptoKeyReader;
+        conf.setCryptoKeyReader(cryptoKeyReader);
         return this;
     }
 
@@ -343,8 +319,8 @@ public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader)
      * @return encryptionKeys
      *
      */
-    public ConcurrentOpenHashSet<String> getEncryptionKeys() {
-        return this.encryptionKeys;
+    public Set<String> getEncryptionKeys() {
+        return conf.getEncryptionKeys();
     }
 
     /**
@@ -353,7 +329,7 @@ public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader)
      *
      */
     public boolean isEncryptionEnabled() {
-        return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty() && (this.cryptoKeyReader != null);
+        return conf.isEncryptionEnabled();
     }
 
     /**
@@ -366,16 +342,11 @@ public boolean isEncryptionEnabled() {
      *
      */
     public void addEncryptionKey(String key) {
-        if (this.encryptionKeys == null) {
-            this.encryptionKeys = new ConcurrentOpenHashSet<String>(16, 1);
-        }
-        this.encryptionKeys.add(key);
+        conf.getEncryptionKeys().add(key);
     }
 
     public void removeEncryptionKey(String key) {
-        if (this.encryptionKeys != null) {
-            this.encryptionKeys.remove(key);
-        }
+        conf.getEncryptionKeys().remove(key);
     }
 
     /**
@@ -385,14 +356,14 @@ public void removeEncryptionKey(String key) {
      *            The producer action
      */
     public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
-        cryptoFailureAction = action;
+        conf.setCryptoFailureAction(action);
     }
 
     /**
      * @return The ProducerCryptoFailureAction
      */
     public ProducerCryptoFailureAction getCryptoFailureAction() {
-        return this.cryptoFailureAction;
+        return conf.getCryptoFailureAction();
     }
 
     /**
@@ -401,7 +372,7 @@ public ProducerCryptoFailureAction getCryptoFailureAction() {
      * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
      */
     public long getBatchingMaxPublishDelayMs() {
-        return batchingMaxPublishDelayMs;
+        return TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros());
     }
 
     /**
@@ -423,7 +394,7 @@ public long getBatchingMaxPublishDelayMs() {
     public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
         long delayInMs = timeUnit.toMillis(batchDelay);
         checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
-        this.batchingMaxPublishDelayMs = delayInMs;
+        conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
         return this;
     }
 
@@ -432,7 +403,7 @@ public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUni
      * @return the maximum number of messages permitted in a batch.
      */
     public int getBatchingMaxMessages() {
-        return batchingMaxMessages;
+        return conf.getBatchingMaxMessages();
     }
 
     /**
@@ -448,12 +419,12 @@ public int getBatchingMaxMessages() {
      */
     public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
         checkArgument(batchMessagesMaxMessagesPerBatch > 0);
-        this.batchingMaxMessages = batchMessagesMaxMessagesPerBatch;
+        conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
         return this;
     }
 
     public Optional<Long> getInitialSequenceId() {
-        return initialSequenceId != null ? Optional.of(initialSequenceId) : Optional.empty();
+        return Optional.ofNullable(conf.getInitialSequenceId());
     }
 
     /**
@@ -466,7 +437,7 @@ public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessages
      * @return
      */
     public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
-        this.initialSequenceId = initialSequenceId;
+        conf.setInitialSequenceId(initialSequenceId);
         return this;
     }
 
@@ -480,7 +451,7 @@ public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
     public ProducerConfiguration setProperty(String key, String value) {
         checkArgument(key != null);
         checkArgument(value != null);
-        properties.put(key, value);
+        conf.getProperties().put(key, value);
         return this;
     }
 
@@ -491,26 +462,15 @@ public ProducerConfiguration setProperty(String key, String value) {
      * @return
      */
     public ProducerConfiguration setProperties(Map<String, String> properties) {
-        if (properties != null) {
-            this.properties.putAll(properties);
-        }
+        conf.getProperties().putAll(properties);
         return this;
     }
 
     public Map<String, String> getProperties() {
-        return properties;
+        return conf.getProperties();
     }
 
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof ProducerConfiguration) {
-            ProducerConfiguration other = (ProducerConfiguration) obj;
-            return Objects.equal(this.sendTimeoutMs, other.sendTimeoutMs)
-                    && Objects.equal(maxPendingMessages, other.maxPendingMessages)
-                    && Objects.equal(this.messageRouteMode, other.messageRouteMode)
-                    && Objects.equal(this.hashingScheme, other.hashingScheme);
-        }
-
-        return false;
+    public ProducerConfigurationData getProducerConfigurationData() {
+        return conf;
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
index 6f3816c45..cdd51a514 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
@@ -23,6 +23,8 @@
 
 import java.io.Serializable;
 
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+
 /**
  *
  * @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance
@@ -30,20 +32,13 @@
 @Deprecated
 public class ReaderConfiguration implements Serializable {
 
-    private int receiverQueueSize = 1000;
-
-    private ReaderListener readerListener;
-
-    private String readerName = null;
-
-    private CryptoKeyReader cryptoKeyReader = null;
-    private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
+    private final ReaderConfigurationData conf = new ReaderConfigurationData();
 
     /**
      * @return the configured {@link ReaderListener} for the reader
      */
     public ReaderListener getReaderListener() {
-        return this.readerListener;
+        return conf.getReaderListener();
     }
 
     /**
@@ -57,7 +52,7 @@ public ReaderListener getReaderListener() {
      */
     public ReaderConfiguration setReaderListener(ReaderListener readerListener) {
         checkNotNull(readerListener);
-        this.readerListener = readerListener;
+        conf.setReaderListener(readerListener);
         return this;
     }
 
@@ -65,14 +60,14 @@ public ReaderConfiguration setReaderListener(ReaderListener readerListener) {
      * @return the configure receiver queue size value
      */
     public int getReceiverQueueSize() {
-        return this.receiverQueueSize;
+        return conf.getReceiverQueueSize();
     }
 
     /**
      * @return the CryptoKeyReader
      */
     public CryptoKeyReader getCryptoKeyReader() {
-        return this.cryptoKeyReader;
+        return conf.getCryptoKeyReader();
     }
 
     /**
@@ -83,7 +78,7 @@ public CryptoKeyReader getCryptoKeyReader() {
      */
     public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
         checkNotNull(cryptoKeyReader);
-        this.cryptoKeyReader = cryptoKeyReader;
+        conf.setCryptoKeyReader(cryptoKeyReader);
         return this;
     }
 
@@ -94,14 +89,14 @@ public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
      *            The action to take when the decoding fails
      */
     public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
-        cryptoFailureAction = action;
+        conf.setCryptoFailureAction(action);
     }
 
     /**
      * @return The ConsumerCryptoFailureAction
      */
     public ConsumerCryptoFailureAction getCryptoFailureAction() {
-        return this.cryptoFailureAction;
+        return conf.getCryptoFailureAction();
     }
 
     /**
@@ -118,7 +113,7 @@ public ConsumerCryptoFailureAction getCryptoFailureAction() {
      */
     public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) {
         checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
-        this.receiverQueueSize = receiverQueueSize;
+        conf.setReceiverQueueSize(receiverQueueSize);
         return this;
     }
 
@@ -126,7 +121,7 @@ public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) {
      * @return the consumer name
      */
     public String getReaderName() {
-        return readerName;
+        return conf.getReaderName();
     }
 
     /**
@@ -136,9 +131,13 @@ public String getReaderName() {
      */
     public ReaderConfiguration setReaderName(String readerName) {
         checkArgument(readerName != null && !readerName.equals(""));
-        this.readerName = readerName;
+        conf.setReaderName(readerName);
         return this;
     }
 
+    public ReaderConfigurationData getReaderConfigurationData() {
+        return conf;
+    }
+
     private static final long serialVersionUID = 1L;
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 8ad14ccb4..6c87f5645 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -36,7 +36,6 @@
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.resolver.InetSocketAddressResolver;
 
 public class BinaryProtoLookupService implements LookupService {
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 2be5318e0..4cffb7fa8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -22,41 +22,44 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 
-@SuppressWarnings("deprecation")
 public class ClientBuilderImpl implements ClientBuilder {
 
     private static final long serialVersionUID = 1L;
 
-    String serviceUrl;
-    final ClientConfiguration conf = new ClientConfiguration();
+    final ClientConfigurationData conf;
+
+    public ClientBuilderImpl() {
+        this(new ClientConfigurationData());
+    }
+
+    private ClientBuilderImpl(ClientConfigurationData conf) {
+        this.conf = conf;
+    }
 
     @Override
     public PulsarClient build() throws PulsarClientException {
-        if (serviceUrl == null) {
+        if (conf.getServiceUrl() == null) {
             throw new IllegalArgumentException("service URL needs to be specified on the ClientBuilder object");
         }
 
-        return new PulsarClientImpl(serviceUrl, conf);
+        return new PulsarClientImpl(conf);
     }
 
     @Override
     public ClientBuilder clone() {
-        try {
-            return (ClientBuilder) super.clone();
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeException("Failed to clone ClientBuilderImpl");
-        }
+        return new ClientBuilderImpl(conf.clone());
     }
 
     @Override
     public ClientBuilder serviceUrl(String serviceUrl) {
-        this.serviceUrl = serviceUrl;
+        conf.setServiceUrl(serviceUrl);
         return this;
     }
 
@@ -69,32 +72,32 @@ public ClientBuilder authentication(Authentication authentication) {
     @Override
     public ClientBuilder authentication(String authPluginClassName, String authParamsString)
             throws UnsupportedAuthenticationException {
-        conf.setAuthentication(authPluginClassName, authParamsString);
+        conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
         return this;
     }
 
     @Override
     public ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams)
             throws UnsupportedAuthenticationException {
-        conf.setAuthentication(authPluginClassName, authParams);
+        conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
         return this;
     }
 
     @Override
     public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {
-        conf.setOperationTimeout(operationTimeout, unit);
+        conf.setOperationTimeoutMs(unit.toMillis(operationTimeout));
         return this;
     }
 
     @Override
     public ClientBuilder ioThreads(int numIoThreads) {
-        conf.setIoThreads(numIoThreads);
+        conf.setNumIoThreads(numIoThreads);
         return this;
     }
 
     @Override
     public ClientBuilder listenerThreads(int numListenerThreads) {
-        conf.setListenerThreads(numListenerThreads);
+        conf.setNumListenerThreads(numListenerThreads);
         return this;
     }
 
@@ -136,7 +139,7 @@ public ClientBuilder allowTlsInsecureConnection(boolean tlsAllowInsecureConnecti
 
     @Override
     public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) {
-        conf.setStatsInterval(statsInterval, unit);
+        conf.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
         return this;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 43c49b2a9..7505addb0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -33,11 +33,12 @@
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
@@ -57,7 +58,6 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,7 +106,7 @@
         None, SentConnectFrame, Ready, Failed
     }
 
-    public ClientCnx(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
+    public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         super(30, TimeUnit.SECONDS);
         this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
         this.authentication = conf.getAuthentication();
@@ -196,14 +196,14 @@ public static boolean isKnownException(Throwable t) {
 
     @Override
     protected void handleConnected(CommandConnected connected) {
-        
+
         if (isTlsHostnameVerificationEnable && remoteHostName != null && !verifyTlsHostName(remoteHostName, ctx)) {
             // close the connection if host-verification failed with the broker
             log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), remoteHostName);
             ctx.close();
             return;
         }
-        
+
         checkArgument(state == State.SentConnectFrame);
 
         if (log.isDebugEnabled()) {
@@ -593,9 +593,9 @@ private void checkServerError(ServerError error, String errMsg) {
 
     /**
      * verifies host name provided in x509 Certificate in tls session
-     * 
+     *
      * it matches hostname with below scenarios
-     * 
+     *
      * <pre>
      *  1. Supports IPV4 and IPV6 host matching
      *  2. Supports wild card matching for DNS-name
@@ -605,7 +605,7 @@ private void checkServerError(ServerError error, String errMsg) {
      * 2.  localhost                    local*       PASS
      * 3.  pulsar1-broker.com           pulsar*.com  PASS
      * </pre>
-     * 
+     *
      * @param ctx
      * @return true if hostname is verified else return false
      */
@@ -648,7 +648,7 @@ void setTargetBroker(InetSocketAddress targetBrokerAddress) {
      void setRemoteHostName(String remoteHostName) {
         this.remoteHostName = remoteHostName;
     }
-    
+
     private PulsarClientException getPulsarClientException(ServerError error, String errorMsg) {
         switch (error) {
         case AuthenticationError:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index c36fcd979..84edec123 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -31,8 +31,8 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -68,7 +68,7 @@
     private static final int MaxMessageSize = 5 * 1024 * 1024;
     public static final String TLS_HANDLER = "tls";
 
-    public ConnectionPool(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
+    public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         this.eventLoopGroup = eventLoopGroup;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index ef7ab8b82..842fbfb47 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.collect.Queues;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -28,20 +27,23 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 
+import com.google.common.collect.Queues;
+
 public abstract class ConsumerBase extends HandlerBase implements Consumer {
 
     enum ConsumerType {
@@ -49,7 +51,7 @@
     }
 
     protected final String subscription;
-    protected final ConsumerConfiguration conf;
+    protected final ConsumerConfigurationData conf;
     protected final String consumerName;
     protected final CompletableFuture<Consumer> subscribeFuture;
     protected final MessageListener listener;
@@ -59,11 +61,11 @@
     protected final ConcurrentLinkedQueue<CompletableFuture<Message>> pendingReceives;
     protected int maxReceiverQueueSize;
 
-    protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
-            int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
-        super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0 , TimeUnit.MILLISECONDS));
+    protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, int receiverQueueSize,
+            ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
+        super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS));
         this.maxReceiverQueueSize = receiverQueueSize;
-        this.subscription = subscription;
+        this.subscription = conf.getSubscriptionName();
         this.conf = conf;
         this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
         this.subscribeFuture = subscribeFuture;
@@ -93,9 +95,13 @@ public Message receive() throws PulsarClientException {
         case Closing:
         case Closed:
             throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
+        case Terminated:
+            throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
         case Failed:
         case Uninitialized:
             throw new PulsarClientException.NotConnectedException();
+        default:
+            break;
         }
 
         return internalReceive();
@@ -116,6 +122,8 @@ public Message receive() throws PulsarClientException {
         case Closing:
         case Closed:
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer already closed"));
+        case Terminated:
+            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Topic was terminated"));
         case Failed:
         case Uninitialized:
             return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
@@ -146,6 +154,8 @@ public Message receive(int timeout, TimeUnit unit) throws PulsarClientException
         case Closing:
         case Closed:
             throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
+        case Terminated:
+            throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
         case Failed:
         case Uninitialized:
             throw new PulsarClientException.NotConnectedException();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 66cd2912c..6cac9b177 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -20,48 +20,45 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.util.FutureUtil;
 
+import com.google.common.collect.Lists;
 
-@SuppressWarnings("deprecation")
 public class ConsumerBuilderImpl implements ConsumerBuilder {
 
     private static final long serialVersionUID = 1L;
 
     private final PulsarClientImpl client;
-    private String subscriptionName;
-    private final ConsumerConfiguration conf;
-    private Set<String> topicNames;
+    private final ConsumerConfigurationData conf;
+
+    private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
 
     ConsumerBuilderImpl(PulsarClientImpl client) {
+        this(client, new ConsumerConfigurationData());
+    }
+
+    private ConsumerBuilderImpl(PulsarClientImpl client, ConsumerConfigurationData conf) {
         this.client = client;
-        this.conf = new ConsumerConfiguration();
+        this.conf = conf;
     }
 
     @Override
     public ConsumerBuilder clone() {
-        try {
-            return (ConsumerBuilder) super.clone();
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeException("Failed to clone ConsumerBuilderImpl");
-        }
+        return new ConsumerBuilderImpl(client, conf.clone());
     }
 
     @Override
@@ -83,55 +80,45 @@ public Consumer subscribe() throws PulsarClientException {
 
     @Override
     public CompletableFuture<Consumer> subscribeAsync() {
-        if (topicNames == null || topicNames.isEmpty()) {
+        if (conf.getTopicNames().isEmpty()) {
             return FutureUtil
                     .failedFuture(new IllegalArgumentException("Topic name must be set on the consumer builder"));
         }
 
-        if (subscriptionName == null) {
+        if (conf.getSubscriptionName() == null) {
             return FutureUtil.failedFuture(
                     new IllegalArgumentException("Subscription name must be set on the consumer builder"));
         }
 
-        if (topicNames.size() == 1) {
-            return client.subscribeAsync(topicNames.stream().findFirst().orElse(""), subscriptionName, conf);
-        } else {
-            return client.subscribeAsync(topicNames, subscriptionName, conf);
-        }
+        return client.subscribeAsync(conf);
     }
 
     @Override
     public ConsumerBuilder topic(String... topicNames) {
         checkArgument(topicNames.length > 0, "Passed in topicNames should not be empty.");
-        if (this.topicNames == null) {
-            this.topicNames = Sets.newHashSet(topicNames);
-        } else {
-            this.topicNames.addAll(Lists.newArrayList(topicNames));
-        }
+        conf.getTopicNames().addAll(Lists.newArrayList(topicNames));
         return this;
     }
 
     @Override
     public ConsumerBuilder topics(List<String> topicNames) {
-        checkArgument(topicNames != null && !topicNames.isEmpty(),
-            "Passed in topicNames list should not be empty.");
-        if (this.topicNames == null) {
-            this.topicNames = Sets.newHashSet();
-        }
-        this.topicNames.addAll(topicNames);
+        checkArgument(topicNames != null && !topicNames.isEmpty(), "Passed in topicNames list should not be empty.");
+        conf.getTopicNames().addAll(topicNames);
 
         return this;
     }
 
     @Override
     public ConsumerBuilder subscriptionName(String subscriptionName) {
-        this.subscriptionName = subscriptionName;
+        conf.setSubscriptionName(subscriptionName);
         return this;
     }
 
     @Override
     public ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit) {
-        conf.setAckTimeout(ackTimeout, timeUnit);
+        checkArgument(timeUnit.toMillis(ackTimeout) >= MIN_ACK_TIMEOUT_MILLIS,
+                "Ack timeout should be should be greater than " + MIN_ACK_TIMEOUT_MILLIS + " ms");
+        conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
         return this;
     }
 
@@ -179,13 +166,13 @@ public ConsumerBuilder priorityLevel(int priorityLevel) {
 
     @Override
     public ConsumerBuilder property(String key, String value) {
-        conf.setProperty(key, value);
+        conf.getProperties().put(key, value);
         return this;
     }
 
     @Override
     public ConsumerBuilder properties(Map<String, String> properties) {
-        conf.setProperties(properties);
+        conf.getProperties().putAll(properties);
         return this;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 93d0d8715..e9c781764 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,11 +25,6 @@
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
-import com.google.common.collect.Iterables;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Timeout;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -52,13 +47,14 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -74,6 +70,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
 public class ConsumerImpl extends ConsumerBase {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
 
@@ -83,6 +86,7 @@
     // broker to notify that we are ready to get (and store in the incoming messages queue) more messages
     private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
             .newUpdater(ConsumerImpl.class, "availablePermits");
+    @SuppressWarnings("unused")
     private volatile int availablePermits = 0;
 
     private MessageId lastDequeuedMessage = MessageId.earliest;
@@ -125,16 +129,15 @@
         NonDurable
     }
 
-    ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
+    ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf,
             ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer> subscribeFuture) {
-        this(client, topic, subscription, conf, listenerExecutor, partitionIndex, subscribeFuture,
-                SubscriptionMode.Durable, null);
+        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null);
     }
 
-    ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
+    ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf,
             ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer> subscribeFuture,
             SubscriptionMode subscriptionMode, MessageId startMessageId) {
-        super(client, topic, subscription, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture);
+        super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
         this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
@@ -145,7 +148,7 @@
         this.codecProvider = new CompressionCodecProvider();
         this.priorityLevel = conf.getPriorityLevel();
         this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
-        this.readCompacted = conf.getReadCompacted();
+        this.readCompacted = conf.isReadCompacted();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStats(client, conf, this);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
index efa8dbb7c..3ef26c831 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
@@ -24,8 +24,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +74,7 @@ public ConsumerStats() {
         throughputFormat = null;
     }
 
-    public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfiguration conf, ConsumerImpl consumer) {
+    public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData conf, ConsumerImpl consumer) {
         this.pulsarClient = pulsarClient;
         this.consumer = consumer;
         this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -92,7 +92,7 @@ public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfiguration conf,
         init(conf);
     }
 
-    private void init(ConsumerConfiguration conf) {
+    private void init(ConsumerConfigurationData conf) {
         ObjectMapper m = new ObjectMapper();
         m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
         ObjectWriter w = m.writerWithDefaultPrettyPrinter();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 7ad6e1a9d..eb672624d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -28,25 +28,31 @@
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
-import io.netty.channel.EventLoopGroup;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.ssl.SslContext;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
-import org.asynchttpclient.*;
+import org.asynchttpclient.AsyncCompletionHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncHttpClientConfig;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.util.concurrent.MoreExecutors;
 
+import io.netty.channel.EventLoopGroup;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslContext;
+
 public class HttpClient implements Closeable {
 
     protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
@@ -131,7 +137,7 @@ public void close() throws IOException {
                     builder.setHeader(header.getKey(), header.getValue());
                 }
             }
-            
+
             final ListenableFuture<Response> responseFuture = builder.setHeader("Accept", "application/json")
                     .execute(new AsyncCompletionHandler<Response>() {
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 208a9b119..14723795f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -23,8 +23,8 @@
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -40,9 +40,9 @@
     private final boolean useTls;
     private static final String BasePath = "lookup/v2/destination/";
 
-    public HttpLookupService(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
+    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
-        this.httpClient = new HttpClient(serviceUrl, conf.getAuthentication(),
+        this.httpClient = new HttpClient(conf.getServiceUrl(), conf.getAuthentication(),
                 eventLoopGroup, conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
         this.useTls = conf.isUseTls();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
index 38d6a5850..e7115fc9a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
@@ -37,6 +37,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -57,7 +58,6 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.bouncycastle.asn1.ASN1ObjectIdentifier;
 import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
 import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
@@ -283,23 +283,21 @@ private PrivateKey loadPrivateKey(byte[] keyBytes) throws Exception {
      * Encrypt data key using the public key(s) in the argument. <p> If more than one key name is specified, data key is
      * encrypted using each of those keys. If the public key is expired or changed, application is responsible to remove
      * the old key and add the new key <p>
-     * 
+     *
      * @param keyNames List of public keys to encrypt data key
-     * 
+     *
      * @param keyReader Implementation to read the key values
-     * 
+     *
      */
-    public synchronized void addPublicKeyCipher(ConcurrentOpenHashSet<String> keyNames, CryptoKeyReader keyReader)
+    public synchronized void addPublicKeyCipher(Set<String> keyNames, CryptoKeyReader keyReader)
             throws CryptoException {
 
         // Generate data key
         dataKey = keyGenerator.generateKey();
 
-        List<String> keyNameList = keyNames.values();
-        for (int i = 0; i < keyNameList.size(); i++) {
-            addPublicKeyCipher(keyNameList.get(i), keyReader);
+        for (String key : keyNames) {
+            addPublicKeyCipher(key, keyReader);
         }
-
     }
 
     private void addPublicKeyCipher(String keyName, CryptoKeyReader keyReader) throws CryptoException {
@@ -350,9 +348,9 @@ private void addPublicKeyCipher(String keyName, CryptoKeyReader keyReader) throw
 
     /*
      * Remove a key <p> Remove the key identified by the keyName from the list of keys.<p>
-     * 
+     *
      * @param keyName Unique name to identify the key
-     * 
+     *
      * @return true if succeeded, false otherwise
      */
     /*
@@ -368,16 +366,16 @@ public boolean removeKeyCipher(String keyName) {
 
     /*
      * Encrypt the payload using the data key and update message metadata with the keyname & encrypted data key
-     * 
+     *
      * @param encKeys One or more public keys to encrypt data key
-     * 
+     *
      * @param msgMetadata Message Metadata
-     * 
+     *
      * @param payload Message which needs to be encrypted
-     * 
+     *
      * @return encryptedData if success
      */
-    public synchronized ByteBuf encrypt(ConcurrentOpenHashSet<String> encKeys, CryptoKeyReader keyReader,
+    public synchronized ByteBuf encrypt(Set<String> encKeys, CryptoKeyReader keyReader,
             MessageMetadata.Builder msgMetadata, ByteBuf payload) throws PulsarClientException {
 
         if (encKeys.isEmpty()) {
@@ -385,9 +383,7 @@ public synchronized ByteBuf encrypt(ConcurrentOpenHashSet<String> encKeys, Crypt
         }
 
         // Update message metadata with encrypted data key
-        List<String> keyNameList = encKeys.values();
-        for (int i = 0; i < keyNameList.size(); i++) {
-            String keyName = keyNameList.get(i);
+        for (String keyName : encKeys) {
             if (encryptedDataKeyMap.get(keyName) == null) {
                 // Attempt to load the key. This will allow us to load keys as soon as
                 // a new key is added to producer config
@@ -569,13 +565,13 @@ private ByteBuf getKeyAndDecryptData(MessageMetadata msgMetadata, ByteBuf payloa
 
     /*
      * Decrypt the payload using the data key. Keys used to encrypt data key can be retrieved from msgMetadata
-     * 
+     *
      * @param msgMetadata Message Metadata
-     * 
+     *
      * @param payload Message which needs to be decrypted
-     * 
+     *
      * @param keyReader KeyReader implementation to retrieve key value
-     * 
+     *
      * @return decryptedData if success, null otherwise
      */
     public ByteBuf decrypt(MessageMetadata msgMetadata, ByteBuf payload, CryptoKeyReader keyReader) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java
index 4825dabbb..1e4c036fd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java
@@ -18,20 +18,22 @@
  */
 package org.apache.pulsar.client.impl;
 
+import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.MessageRouter;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 
 public abstract class MessageRouterBase implements MessageRouter {
+    private static final long serialVersionUID = 1L;
+
     protected final Hash hash;
 
-    MessageRouterBase(ProducerConfiguration.HashingScheme hashingScheme) {
+    MessageRouterBase(HashingScheme hashingScheme) {
         switch (hashingScheme) {
-            case JavaStringHash:
-                this.hash = JavaStringHash.getInstance();
-                break;
-            case Murmur3_32Hash:
-            default:
-                this.hash = Murmur3_32Hash.getInstance();
+        case JavaStringHash:
+            this.hash = JavaStringHash.getInstance();
+            break;
+        case Murmur3_32Hash:
+        default:
+            this.hash = Murmur3_32Hash.getInstance();
         }
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 332dd5626..dff123387 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -35,11 +35,11 @@
 import java.util.stream.Collectors;
 
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -65,10 +65,10 @@
     private final ConsumerStats stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
 
-    PartitionedConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
-            int numPartitions, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
-        super(client, topic, subscription, conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()), listenerExecutor,
-                subscribeFuture);
+    PartitionedConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, int numPartitions,
+            ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
+        super(client, conf.getSingleTopic(), conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()),
+                listenerExecutor, subscribeFuture);
         this.consumers = Lists.newArrayListWithCapacity(numPartitions);
         this.pausedConsumers = new ConcurrentLinkedQueue<>();
         this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
@@ -89,10 +89,10 @@
     private void start() {
         AtomicReference<Throwable> subscribeFail = new AtomicReference<Throwable>();
         AtomicInteger completed = new AtomicInteger();
-        ConsumerConfiguration internalConfig = getInternalConsumerConfig();
+        ConsumerConfigurationData internalConfig = getInternalConsumerConfig();
         for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) {
             String partitionName = DestinationName.get(topic).getPartition(partitionIndex).toString();
-            ConsumerImpl consumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
+            ConsumerImpl consumer = new ConsumerImpl(client, partitionName, internalConfig,
                     client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture<Consumer>());
             consumers.add(consumer);
             consumer.subscribeFuture().handle((cons, subscribeException) -> {
@@ -434,9 +434,10 @@ String getHandlerName() {
         return subscription;
     }
 
-    private ConsumerConfiguration getInternalConsumerConfig() {
-        ConsumerConfiguration internalConsumerConfig = new ConsumerConfiguration();
+    private ConsumerConfigurationData getInternalConsumerConfig() {
+        ConsumerConfigurationData internalConsumerConfig = new ConsumerConfigurationData();
         internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
+        internalConsumerConfig.setSubscriptionName(conf.getSubscriptionName());
         internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
         internalConsumerConfig.setConsumerName(consumerName);
         if (null != conf.getConsumerEventListener()) {
@@ -451,7 +452,7 @@ private ConsumerConfiguration getInternalConsumerConfig() {
             internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
         }
         if (conf.getAckTimeoutMillis() != 0) {
-            internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
+            internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
         }
 
         return internalConsumerConfig;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 68fda5099..cf2aab16e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -30,11 +30,11 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
@@ -49,7 +49,7 @@
     private final ProducerStats stats;
     private final TopicMetadata topicMetadata;
 
-    public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf, int numPartitions,
+    public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
             CompletableFuture<Producer> producerCreatedFuture) {
         super(client, topic, conf, producerCreatedFuture);
         this.producers = Lists.newArrayListWithCapacity(numPartitions);
@@ -67,7 +67,7 @@ private MessageRouter getMessageRouter() {
         MessageRouter messageRouter;
 
         MessageRoutingMode messageRouteMode = conf.getMessageRoutingMode();
-        MessageRouter customMessageRouter = conf.getMessageRouter();
+        MessageRouter customMessageRouter = conf.getCustomMessageRouter();
 
         switch (messageRouteMode) {
         case CustomPartition:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 03680a1bc..0876db1ac 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -26,17 +26,18 @@
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 
 public abstract class ProducerBase extends HandlerBase implements Producer {
 
     protected final CompletableFuture<Producer> producerCreatedFuture;
-    protected final ProducerConfiguration conf;
+    protected final ProducerConfigurationData conf;
 
-    protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfiguration conf,
+    protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
             CompletableFuture<Producer> producerCreatedFuture) {
-        super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS));
+        super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS,
+                Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS));
         this.producerCreatedFuture = producerCreatedFuture;
         this.conf = conf;
     }
@@ -98,7 +99,7 @@ public String getTopic() {
         return topic;
     }
 
-    public ProducerConfiguration getConfiguration() {
+    public ProducerConfigurationData getConfiguration() {
         return conf;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 6bb9a9b59..230d9c944 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -25,36 +25,35 @@
 
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.util.FutureUtil;
 
-@SuppressWarnings("deprecation")
 public class ProducerBuilderImpl implements ProducerBuilder {
 
     private static final long serialVersionUID = 1L;
 
     private final PulsarClientImpl client;
-    private String topicName;
-    private final ProducerConfiguration conf;
+    private final ProducerConfigurationData conf;
 
     ProducerBuilderImpl(PulsarClientImpl client) {
+        this(client, new ProducerConfigurationData());
+    }
+
+    private ProducerBuilderImpl(PulsarClientImpl client, ProducerConfigurationData conf) {
         this.client = client;
-        this.conf = new ProducerConfiguration();
+        this.conf = conf;
     }
 
     @Override
     public ProducerBuilder clone() {
-        try {
-            return (ProducerBuilder) super.clone();
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeException("Failed to clone ProducerBuilderImpl");
-        }
+        return new ProducerBuilderImpl(client, conf.clone());
     }
 
     @Override
@@ -76,17 +75,17 @@ public Producer create() throws PulsarClientException {
 
     @Override
     public CompletableFuture<Producer> createAsync() {
-        if (topicName == null) {
+        if (conf.getTopicName() == null) {
             return FutureUtil
                     .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
         }
 
-        return client.createProducerAsync(topicName, conf);
+        return client.createProducerAsync(conf);
     }
 
     @Override
     public ProducerBuilder topic(String topicName) {
-        this.topicName = topicName;
+        conf.setTopicName(topicName);
         return this;
     }
 
@@ -98,7 +97,7 @@ public ProducerBuilder producerName(String producerName) {
 
     @Override
     public ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit) {
-        conf.setSendTimeout(sendTimeout, unit);
+        conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
         return this;
     }
 
@@ -122,7 +121,7 @@ public ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull) {
 
     @Override
     public ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode) {
-        conf.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.valueOf(messageRouteMode.toString()));
+        conf.setMessageRoutingMode(messageRouteMode);
         return this;
     }
 
@@ -132,9 +131,15 @@ public ProducerBuilder compressionType(CompressionType compressionType) {
         return this;
     }
 
+    @Override
+    public ProducerBuilder hashingScheme(HashingScheme hashingScheme) {
+        conf.setHashingScheme(hashingScheme);
+        return this;
+    }
+
     @Override
     public ProducerBuilder messageRouter(MessageRouter messageRouter) {
-        conf.setMessageRouter(messageRouter);
+        conf.setCustomMessageRouter(messageRouter);
         return this;
     }
 
@@ -152,7 +157,7 @@ public ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
 
     @Override
     public ProducerBuilder addEncryptionKey(String key) {
-        conf.addEncryptionKey(key);
+        conf.getEncryptionKeys().add(key);
         return this;
     }
 
@@ -164,7 +169,7 @@ public ProducerBuilder cryptoFailureAction(ProducerCryptoFailureAction action) {
 
     @Override
     public ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
-        conf.setBatchingMaxPublishDelay(batchDelay, timeUnit);
+        conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
         return this;
     }
 
@@ -182,13 +187,13 @@ public ProducerBuilder initialSequenceId(long initialSequenceId) {
 
     @Override
     public ProducerBuilder property(String key, String value) {
-        conf.setProperty(key, value);
+        conf.getProperties().put(key, value);
         return this;
     }
 
     @Override
     public ProducerBuilder properties(Map<String, String> properties) {
-        conf.setProperties(properties);
+        conf.getProperties().putAll(properties);
         return this;
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 32d61a532..923a7e87e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -20,7 +20,6 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.String.format;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
@@ -42,13 +41,13 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.Commands.ChecksumType;
-import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -107,7 +106,7 @@
     private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
 
-    public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf,
+    public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
             CompletableFuture<Producer> producerCreatedFuture, int partitionIndex) {
         super(client, topic, conf, producerCreatedFuture);
         this.producerId = client.newProducerId();
@@ -119,8 +118,8 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
         this.compressor = CompressionCodecProvider
                 .getCompressionCodec(convertCompressionType(conf.getCompressionType()));
 
-        if (conf.getInitialSequenceId().isPresent()) {
-            long initialSequenceId = conf.getInitialSequenceId().get();
+        if (conf.getInitialSequenceId() != null) {
+            long initialSequenceId = conf.getInitialSequenceId();
             this.lastSequenceIdPublished = initialSequenceId;
             this.msgIdGenerator = initialSequenceId + 1;
         } else {
@@ -151,7 +150,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
         }
 
         this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
-        if (conf.getBatchingEnabled()) {
+        if (conf.isBatchingEnabled()) {
             this.maxNumMessagesInBatch = conf.getBatchingMaxMessages();
             this.batchMessageContainer = new BatchMessageContainer(maxNumMessagesInBatch,
                     convertCompressionType(conf.getCompressionType()), topic, producerName);
@@ -175,7 +174,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
     }
 
     private boolean isBatchMessagingEnabled() {
-        return conf.getBatchingEnabled();
+        return conf.isBatchingEnabled();
     }
 
     @Override
@@ -421,7 +420,7 @@ private boolean isValidProducerState(SendCallback callback) {
 
     private boolean canEnqueueRequest(SendCallback callback) {
         try {
-            if (conf.getBlockIfQueueFull()) {
+            if (conf.isBlockIfQueueFull()) {
                 semaphore.acquire();
             } else {
                 if (!semaphore.tryAcquire()) {
@@ -843,15 +842,15 @@ void connectionOpened(final ClientCnx cnx) {
                             this.producerName = producerName;
                         }
 
-                        if (this.lastSequenceIdPublished == -1 && !conf.getInitialSequenceId().isPresent()) {
+                        if (this.lastSequenceIdPublished == -1 && conf.getInitialSequenceId() == null) {
                             this.lastSequenceIdPublished = lastSequenceId;
                             this.msgIdGenerator = lastSequenceId + 1;
                         }
 
                         if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
                             // schedule the first batch message task
-                            client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMs(),
-                                    TimeUnit.MILLISECONDS);
+                            client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMicros(),
+                                    TimeUnit.MICROSECONDS);
                         }
                         resendMessages(cnx);
                     }
@@ -1139,7 +1138,7 @@ public void run(Timeout timeout) throws Exception {
                 batchMessageAndSend();
             }
             // schedule the next batch message task
-            client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMs(), TimeUnit.MILLISECONDS);
+            client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
         }
     };
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
index 682a35728..6a5e321ff 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
@@ -24,7 +24,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +74,7 @@ public ProducerStats() {
         ds = null;
     }
 
-    public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfiguration conf, ProducerImpl producer) {
+    public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl producer) {
         this.pulsarClient = pulsarClient;
         this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
         this.producer = producer;
@@ -92,7 +92,7 @@ public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfiguration conf,
         init(conf);
     }
 
-    private void init(ProducerConfiguration conf) {
+    private void init(ProducerConfigurationData conf) {
         ObjectMapper m = new ObjectMapper();
         m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
         ObjectWriter w = m.writerWithDefaultPrettyPrinter();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c065532a1..d4c40d478 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -20,7 +20,6 @@
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
-import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +44,10 @@
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.ReaderConfiguration;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.DestinationDomain;
 import org.apache.pulsar.common.naming.DestinationName;
@@ -67,7 +70,7 @@
 
     private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
 
-    private final ClientConfiguration conf;
+    private final ClientConfigurationData conf;
     private final LookupService lookup;
     private final ConnectionPool cnxPool;
     private final Timer timer;
@@ -87,37 +90,53 @@
 
     private final EventLoopGroup eventLoopGroup;
 
+    @Deprecated
     public PulsarClientImpl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
-        this(serviceUrl, conf, getEventLoopGroup(conf));
+        this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone());
     }
 
+    @Deprecated
     public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
-        this(serviceUrl, conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
+        this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone(), eventLoopGroup);
     }
 
+    @Deprecated
     public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup,
             ConnectionPool cnxPool) throws PulsarClientException {
-        if (isBlank(serviceUrl) || conf == null || eventLoopGroup == null) {
+        this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone(), eventLoopGroup, cnxPool);
+    }
+
+    public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
+        this(conf, getEventLoopGroup(conf));
+    }
+
+    public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
+        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
+    }
+
+    public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
+            throws PulsarClientException {
+        if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
             throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
         }
         this.eventLoopGroup = eventLoopGroup;
         this.conf = conf;
         conf.getAuthentication().start();
         this.cnxPool = cnxPool;
-        if (serviceUrl.startsWith("http")) {
-            lookup = new HttpLookupService(serviceUrl, conf, eventLoopGroup);
+        if (conf.getServiceUrl().startsWith("http")) {
+            lookup = new HttpLookupService(conf, eventLoopGroup);
         } else {
-            lookup = new BinaryProtoLookupService(this, serviceUrl, conf.isUseTls());
+            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls());
         }
         timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
-        externalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-external-listener");
+        externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
         producers = Maps.newIdentityHashMap();
         consumers = Maps.newIdentityHashMap();
         state.set(State.Open);
     }
 
-    public ClientConfiguration getConfiguration() {
+    public ClientConfigurationData getConfiguration() {
         return conf;
     }
 
@@ -137,9 +156,11 @@ public ReaderBuilder newReader() {
     }
 
     @Override
-    public Producer createProducer(String destination) throws PulsarClientException {
+    public Producer createProducer(String topic) throws PulsarClientException {
         try {
-            return createProducerAsync(destination, new ProducerConfiguration()).get();
+            ProducerConfigurationData conf = new ProducerConfigurationData();
+            conf.setTopicName(topic);
+            return createProducerAsync(conf).get();
         } catch (ExecutionException e) {
             Throwable t = e.getCause();
             if (t instanceof PulsarClientException) {
@@ -154,10 +175,15 @@ public Producer createProducer(String destination) throws PulsarClientException
     }
 
     @Override
-    public Producer createProducer(final String destination, final ProducerConfiguration conf)
-            throws PulsarClientException {
+    public Producer createProducer(final String topic, final ProducerConfiguration conf) throws PulsarClientException {
+        if (conf == null) {
+            throw new PulsarClientException.InvalidConfigurationException("Invalid null configuration object");
+        }
+
         try {
-            return createProducerAsync(destination, conf).get();
+            ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
+            confData.setTopicName(topic);
+            return createProducerAsync(confData).get();
         } catch (ExecutionException e) {
             Throwable t = e.getCause();
             if (t instanceof PulsarClientException) {
@@ -173,22 +199,33 @@ public Producer createProducer(final String destination, final ProducerConfigura
 
     @Override
     public CompletableFuture<Producer> createProducerAsync(String topic) {
-        return createProducerAsync(topic, new ProducerConfiguration());
+        ProducerConfigurationData conf = new ProducerConfigurationData();
+        conf.setTopicName(topic);
+        return createProducerAsync(conf);
     }
 
     @Override
     public CompletableFuture<Producer> createProducerAsync(final String topic, final ProducerConfiguration conf) {
+        ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
+        confData.setTopicName(topic);
+        return createProducerAsync(confData);
+    }
+
+    public CompletableFuture<Producer> createProducerAsync(ProducerConfigurationData conf) {
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
+        }
+
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
         }
 
+        String topic = conf.getTopicName();
+
         if (!DestinationName.isValid(topic)) {
             return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
         }
-        if (conf == null) {
-            return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
-        }
 
         CompletableFuture<Producer> producerCreatedFuture = new CompletableFuture<>();
 
@@ -242,44 +279,70 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
 
     @Override
     public CompletableFuture<Consumer> subscribeAsync(String topic, String subscription) {
-        return subscribeAsync(topic, subscription, new ConsumerConfiguration());
+        ConsumerConfigurationData conf = new ConsumerConfigurationData();
+        conf.getTopicNames().add(topic);
+        conf.setSubscriptionName(subscription);
+        return subscribeAsync(conf);
     }
 
     @Override
     public CompletableFuture<Consumer> subscribeAsync(final String topic, final String subscription,
             final ConsumerConfiguration conf) {
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException("Invalid null configuration"));
+        }
+
+        ConsumerConfigurationData confData = conf.getConfigurationData().clone();
+        confData.getTopicNames().add(topic);
+        confData.setSubscriptionName(subscription);
+        return subscribeAsync(confData);
+    }
+
+    public CompletableFuture<Consumer> subscribeAsync(ConsumerConfigurationData conf) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
         }
-        if (!DestinationName.isValid(topic)) {
+
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
+        }
+
+        if (!conf.getTopicNames().stream().allMatch(topic -> DestinationName.isValid(topic))) {
             return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
         }
-        if (isBlank(subscription)) {
+
+        if (isBlank(conf.getSubscriptionName())) {
             return FutureUtil
                     .failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
         }
-        if (conf == null) {
-            return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
-        }
-        if (conf.getReadCompacted()
-            && (!DestinationName.get(topic).getDomain().equals(DestinationDomain.persistent)
-                    || (conf.getSubscriptionType() != SubscriptionType.Exclusive
+
+        if (conf.isReadCompacted() && (!conf.getTopicNames().stream()
+                .allMatch(topic -> DestinationName.get(topic).getDomain() == DestinationDomain.persistent)
+                || (conf.getSubscriptionType() != SubscriptionType.Exclusive
                         && conf.getSubscriptionType() != SubscriptionType.Failover))) {
-            return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidConfigurationException(
-                            "Read compacted can only be used with exclusive of failover persistent subscriptions"));
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
+                    "Read compacted can only be used with exclusive of failover persistent subscriptions"));
         }
 
-        if (conf.getConsumerEventListener() != null
-            && conf.getSubscriptionType() != SubscriptionType.Failover) {
-            return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidConfigurationException(
-                        "Active consumer listener is only supported for failover subscription"));
+        if (conf.getConsumerEventListener() != null && conf.getSubscriptionType() != SubscriptionType.Failover) {
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
+                    "Active consumer listener is only supported for failover subscription"));
         }
 
+        if (conf.getTopicNames().size() == 1) {
+            return singleTopicSubscribeAsysnc(conf);
+        } else {
+            return multiTopicSubscribeAsync(conf);
+        }
+    }
+
+    private CompletableFuture<Consumer> singleTopicSubscribeAsysnc(ConsumerConfigurationData conf) {
         CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
 
+        String topic = conf.getSingleTopic();
+
         getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
@@ -289,10 +352,10 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
             // gets the next single threaded executor from the list of executors
             ExecutorService listenerThread = externalExecutorProvider.getExecutor();
             if (metadata.partitions > 1) {
-                consumer = new PartitionedConsumerImpl(PulsarClientImpl.this, topic, subscription, conf,
-                        metadata.partitions, listenerThread, consumerSubscribedFuture);
+                consumer = new PartitionedConsumerImpl(PulsarClientImpl.this, conf, metadata.partitions, listenerThread,
+                        consumerSubscribedFuture);
             } else {
-                consumer = new ConsumerImpl(PulsarClientImpl.this, topic, subscription, conf, listenerThread, -1,
+                consumer = new ConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
                         consumerSubscribedFuture);
             }
 
@@ -308,31 +371,12 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
         return consumerSubscribedFuture;
     }
 
-    public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
-                                                      String subscription,
-                                                      ConsumerConfiguration conf) {
-        if (topics == null || topics.isEmpty()) {
-            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Empty topics name"));
-        }
-
-        if (state.get() != State.Open) {
-            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
-        }
-
-        if (isBlank(subscription)) {
-            return FutureUtil
-                .failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
-        }
-        if (conf == null) {
-            return FutureUtil.failedFuture(
-                new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
-        }
-
+    private CompletableFuture<Consumer> multiTopicSubscribeAsync(ConsumerConfigurationData conf) {
         CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
 
-        ConsumerBase consumer = new TopicsConsumerImpl(PulsarClientImpl.this, topics, subscription,
-            conf, externalExecutorProvider.getExecutor(),
-            consumerSubscribedFuture);
+        ConsumerBase consumer = new TopicsConsumerImpl(PulsarClientImpl.this, conf,
+                externalExecutorProvider.getExecutor(), consumerSubscribedFuture);
+
         synchronized (consumers) {
             consumers.put(consumer, Boolean.TRUE);
         }
@@ -361,20 +405,32 @@ public Reader createReader(String topic, MessageId startMessageId, ReaderConfigu
     @Override
     public CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId,
             ReaderConfiguration conf) {
+        ReaderConfigurationData confData = conf.getReaderConfigurationData().clone();
+        confData.setTopicName(topic);
+        confData.setStartMessageId(startMessageId);
+        return createReaderAsync(confData);
+    }
+
+    public CompletableFuture<Reader> createReaderAsync(ReaderConfigurationData conf) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
         }
+
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
+        }
+
+        String topic = conf.getTopicName();
+
         if (!DestinationName.isValid(topic)) {
             return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
         }
-        if (startMessageId == null) {
+
+        if (conf.getStartMessageId() == null) {
             return FutureUtil
                     .failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
         }
-        if (conf == null) {
-            return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
-        }
 
         CompletableFuture<Reader> readerFuture = new CompletableFuture<>();
 
@@ -392,8 +448,7 @@ public Reader createReader(String topic, MessageId startMessageId, ReaderConfigu
             CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
             // gets the next single threaded executor from the list of executors
             ExecutorService listenerThread = externalExecutorProvider.getExecutor();
-            ReaderImpl reader = new ReaderImpl(PulsarClientImpl.this, topic, startMessageId, conf, listenerThread,
-                    consumerSubscribedFuture);
+            ReaderImpl reader = new ReaderImpl(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture);
 
             synchronized (consumers) {
                 consumers.put(reader.getConsumer(), Boolean.TRUE);
@@ -535,10 +590,9 @@ public EventLoopGroup eventLoopGroup() {
         return metadataFuture;
     }
 
-    private static EventLoopGroup getEventLoopGroup(ClientConfiguration conf) {
-        int numThreads = conf.getIoThreads();
+    private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
         ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io");
-        return EventLoopUtil.newEventLoopGroup(numThreads, threadFactory);
+        return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
     }
 
     void cleanupProducer(ProducerBase producer) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index f3751340f..eb1aeb8c9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -27,24 +27,25 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.client.api.ReaderConfiguration;
 import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.common.util.FutureUtil;
 
-@SuppressWarnings("deprecation")
 public class ReaderBuilderImpl implements ReaderBuilder {
 
     private static final long serialVersionUID = 1L;
 
     private final PulsarClientImpl client;
 
-    private final ReaderConfiguration conf;
-    private String topicName;
-    private MessageId startMessageId;
+    private final ReaderConfigurationData conf;
 
     ReaderBuilderImpl(PulsarClientImpl client) {
+        this(client, new ReaderConfigurationData());
+    }
+
+    private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData conf) {
         this.client = client;
-        this.conf = new ReaderConfiguration();
+        this.conf = conf;
     }
 
     @Override
@@ -75,28 +76,28 @@ public Reader create() throws PulsarClientException {
 
     @Override
     public CompletableFuture<Reader> createAsync() {
-        if (topicName == null) {
+        if (conf.getTopicName() == null) {
             return FutureUtil
                     .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
         }
 
-        if (startMessageId == null) {
+        if (conf.getStartMessageId() == null) {
             return FutureUtil
                     .failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder"));
         }
 
-        return client.createReaderAsync(topicName, startMessageId, conf);
+        return client.createReaderAsync(conf);
     }
 
     @Override
     public ReaderBuilder topic(String topicName) {
-        this.topicName = topicName;
+        conf.setTopicName(topicName);
         return this;
     }
 
     @Override
     public ReaderBuilder startMessageId(MessageId startMessageId) {
-        this.startMessageId = startMessageId;
+        conf.setStartMessageId(startMessageId);
         return this;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index bf0dc2fdc..aaa3902b8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -23,30 +23,31 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderConfiguration;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 
 public class ReaderImpl implements Reader {
 
     private final ConsumerImpl consumer;
 
-    public ReaderImpl(PulsarClientImpl client, String topic, MessageId startMessageId,
-            ReaderConfiguration readerConfiguration, ExecutorService listenerExecutor,
-            CompletableFuture<Consumer> consumerFuture) {
+    public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData readerConfiguration,
+            ExecutorService listenerExecutor, CompletableFuture<Consumer> consumerFuture) {
 
         String subscription = "reader-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
 
-        ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
+        ConsumerConfigurationData consumerConfiguration = new ConsumerConfigurationData();
+        consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
+        consumerConfiguration.setSubscriptionName(subscription);
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
         consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
         if (readerConfiguration.getReaderName() != null) {
@@ -76,8 +77,8 @@ public void reachedEndOfTopic(Consumer consumer) {
             consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
         }
 
-        consumer = new ConsumerImpl(client, topic, subscription, consumerConfiguration, listenerExecutor, -1,
-                consumerFuture, SubscriptionMode.NonDurable, startMessageId);
+        consumer = new ConsumerImpl(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
+                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId());
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index a7c25c163..6b3f93794 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -20,17 +20,20 @@
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.TopicMetadata;
 
 public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
 
+    private static final long serialVersionUID = 1L;
+
     private static final AtomicIntegerFieldUpdater<RoundRobinPartitionMessageRouterImpl> PARTITION_INDEX_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class, "partitionIndex");
+    @SuppressWarnings("unused")
     private volatile int partitionIndex = 0;
 
-    public RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme hashingScheme) {
+    public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme) {
         super(hashingScheme);
         PARTITION_INDEX_UPDATER.set(this, 0);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
index 3a0cd544b..f9b94bc0e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
@@ -18,15 +18,17 @@
  */
 package org.apache.pulsar.client.impl;
 
+import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.TopicMetadata;
 
 public class SinglePartitionMessageRouterImpl extends MessageRouterBase {
 
+    private static final long serialVersionUID = 1L;
+
     private final int partitionIndex;
 
-    public SinglePartitionMessageRouterImpl(int partitionIndex, ProducerConfiguration.HashingScheme hashingScheme) {
+    public SinglePartitionMessageRouterImpl(int partitionIndex, HashingScheme hashingScheme) {
         super(hashingScheme);
         this.partitionIndex = partitionIndex;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index 852c5d25e..3abc9a10d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -21,7 +21,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.collect.Lists;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -37,15 +36,15 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.naming.DestinationName;
@@ -54,6 +53,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 public class TopicsConsumerImpl extends ConsumerBase {
 
     // All topics should be in same namespace
@@ -79,14 +80,12 @@
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final ConsumerStats stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
-    private final ConsumerConfiguration internalConfig;
+    private final ConsumerConfigurationData internalConfig;
 
-    TopicsConsumerImpl(PulsarClientImpl client, Collection<String> topics, String subscription,
-                       ConsumerConfiguration conf, ExecutorService listenerExecutor,
-                       CompletableFuture<Consumer> subscribeFuture) {
-        super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription,
-            conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
-            subscribeFuture);
+    TopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, ExecutorService listenerExecutor,
+            CompletableFuture<Consumer> subscribeFuture) {
+        super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf,
+                Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture);
 
         checkArgument(conf.getReceiverQueueSize() > 0,
             "Receiver queue size needs to be greater than 0 for Topics Consumer");
@@ -106,23 +105,19 @@
         this.internalConfig = getInternalConsumerConfig();
         this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
 
-        if (topics.isEmpty()) {
+        if (conf.getTopicNames().isEmpty()) {
             this.namespaceName = null;
             setState(State.Ready);
             subscribeFuture().complete(TopicsConsumerImpl.this);
             return;
         }
 
-        checkArgument(topics.isEmpty() || topicNamesValid(topics), "Topics should have same namespace.");
-        this.namespaceName = topics.stream().findFirst().flatMap(
-            new Function<String, Optional<NamespaceName>>() {
-                @Override
-                public Optional<NamespaceName> apply(String s) {
-                    return Optional.of(DestinationName.get(s).getNamespaceObject());
-                }
-            }).get();
+        checkArgument(conf.getTopicNames().isEmpty() || topicNamesValid(conf.getTopicNames()), "Topics should have same namespace.");
+        this.namespaceName = conf.getTopicNames().stream().findFirst()
+                .flatMap(s -> Optional.of(DestinationName.get(s).getNamespaceObject())).get();
 
-        List<CompletableFuture<Void>> futures = topics.stream().map(t -> subscribeAsync(t)).collect(Collectors.toList());
+        List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t))
+                .collect(Collectors.toList());
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
                 try {
@@ -490,8 +485,9 @@ String getHandlerName() {
         return subscription;
     }
 
-    private ConsumerConfiguration getInternalConsumerConfig() {
-        ConsumerConfiguration internalConsumerConfig = new ConsumerConfiguration();
+    private ConsumerConfigurationData getInternalConsumerConfig() {
+        ConsumerConfigurationData internalConsumerConfig = new ConsumerConfigurationData();
+        internalConsumerConfig.setSubscriptionName(subscription);
         internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
         internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
         internalConsumerConfig.setConsumerName(consumerName);
@@ -500,7 +496,7 @@ private ConsumerConfiguration getInternalConsumerConfig() {
             internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
         }
         if (conf.getAckTimeoutMillis() != 0) {
-            internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
+            internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
         }
 
         return internalConsumerConfig;
@@ -658,9 +654,8 @@ private boolean topicNameValid(String topicName) {
                         partitionIndex -> {
                             String partitionName = DestinationName.get(topicName).getPartition(partitionIndex).toString();
                             CompletableFuture<Consumer> subFuture = new CompletableFuture<Consumer>();
-                            ConsumerImpl newConsumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
-                                client.externalExecutorProvider().getExecutor(), partitionIndex,
-                                subFuture);
+                            ConsumerImpl newConsumer = new ConsumerImpl(client, partitionName, internalConfig,
+                                    client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture);
                             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                             return subFuture;
                         })
@@ -671,9 +666,8 @@ private boolean topicNameValid(String topicName) {
                 partitionNumber.incrementAndGet();
 
                 CompletableFuture<Consumer> subFuture = new CompletableFuture<Consumer>();
-                ConsumerImpl newConsumer = new ConsumerImpl(client, topicName, subscription, internalConfig,
-                    client.externalExecutorProvider().getExecutor(), 0,
-                    subFuture);
+                ConsumerImpl newConsumer = new ConsumerImpl(client, topicName, internalConfig,
+                        client.externalExecutorProvider().getExecutor(), 0, subFuture);
                 consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
                 futureList = Lists.newArrayList(subFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
new file mode 100644
index 000000000..4fe7569a3
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import lombok.Data;
+
+/**
+ * This is a simple holder of the client configuration values.
+ */
+@Data
+public class ClientConfigurationData implements Serializable, Cloneable {
+    private static final long serialVersionUID = 1L;
+
+    private String serviceUrl;
+
+    @JsonIgnore
+    private Authentication authentication = new AuthenticationDisabled();
+    private long operationTimeoutMs = 30000;
+    private long statsIntervalSeconds = 60;
+
+    private int numIoThreads = 1;
+    private int numListenerThreads = 1;
+    private int connectionsPerBroker = 1;
+
+    private boolean useTcpNoDelay = true;
+
+    private boolean useTls = false;
+    private String tlsTrustCertsFilePath = "";
+    private boolean tlsAllowInsecureConnection = false;
+    private boolean tlsHostnameVerificationEnable = false;
+    private int concurrentLookupRequest = 50000;
+    private int maxNumberOfRejectedRequestPerConnection = 50;
+
+    public ClientConfigurationData clone() {
+        try {
+            return (ClientConfigurationData) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Failed to clone ClientConfigurationData");
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
new file mode 100644
index 000000000..89ab5d71a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.Data;
+
+@Data
+public class ConsumerConfigurationData implements Serializable, Cloneable {
+    private static final long serialVersionUID = 1L;
+
+    private Set<String> topicNames = Sets.newTreeSet();
+
+    private String subscriptionName;
+
+    private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+    @JsonIgnore
+    private MessageListener messageListener;
+
+    @JsonIgnore
+    private ConsumerEventListener consumerEventListener;
+
+    private int receiverQueueSize = 1000;
+
+    private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
+
+    private String consumerName = null;
+
+    private long ackTimeoutMillis = 0;
+
+    private int priorityLevel = 0;
+
+    @JsonIgnore
+    private CryptoKeyReader cryptoKeyReader = null;
+
+    private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
+
+    private SortedMap<String, String> properties = new TreeMap<>();
+
+    private boolean readCompacted = false;
+
+    @JsonIgnore
+    public String getSingleTopic() {
+        checkArgument(topicNames.size() == 1);
+        return topicNames.iterator().next();
+    }
+
+    public ConsumerConfigurationData clone() {
+        try {
+            ConsumerConfigurationData c = (ConsumerConfigurationData) super.clone();
+            c.topicNames = Sets.newTreeSet(this.topicNames);
+            c.properties = Maps.newTreeMap(this.properties);
+            return c;
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Failed to clone ConsumerConfigurationData");
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
new file mode 100644
index 000000000..1449a454f
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.Data;
+
+@Data
+public class ProducerConfigurationData implements Serializable, Cloneable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String topicName = null;
+
+    private String producerName = null;
+    private long sendTimeoutMs = 30000;
+    private boolean blockIfQueueFull = false;
+    private int maxPendingMessages = 1000;
+    private int maxPendingMessagesAcrossPartitions = 50000;
+    private MessageRoutingMode messageRoutingMode = MessageRoutingMode.SinglePartition;
+    private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
+
+    private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
+
+    @JsonIgnore
+    private MessageRouter customMessageRouter = null;
+
+    private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(10);
+    private int batchingMaxMessages = 1000;
+    private boolean batchingEnabled = false; // disabled by default
+
+    @JsonIgnore
+    private CryptoKeyReader cryptoKeyReader;
+
+    @JsonIgnore
+    private Set<String> encryptionKeys = new TreeSet<>();
+
+    private CompressionType compressionType = CompressionType.NONE;
+
+    // Cannot use Optional<Long> since it's not serializable
+    private Long initialSequenceId = null;
+
+    private SortedMap<String, String> properties = new TreeMap<>();
+
+    /**
+     *
+     * Returns true if encryption keys are added and a crypto key reader is set
+     *
+     */
+    public boolean isEncryptionEnabled() {
+        return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty() && (this.cryptoKeyReader != null);
+    }
+
+    public ProducerConfigurationData clone() {
+        try {
+            ProducerConfigurationData c = (ProducerConfigurationData) super.clone();
+            c.encryptionKeys = Sets.newTreeSet(this.encryptionKeys);
+            c.properties = Maps.newTreeMap(this.properties);
+            return c;
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Failed to clone ProducerConfigurationData", e);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
new file mode 100644
index 000000000..d1323ee9e
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ReaderListener;
+
+import lombok.Data;
+
+@Data
+public class ReaderConfigurationData implements Serializable, Cloneable {
+
+    private String topicName;
+    private MessageId startMessageId;
+
+    private int receiverQueueSize = 1000;
+
+    private ReaderListener readerListener;
+
+    private String readerName = null;
+
+    private CryptoKeyReader cryptoKeyReader = null;
+    private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
+
+    public ReaderConfigurationData clone() {
+        try {
+            return (ReaderConfigurationData) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Failed to clone ReaderConfigurationData");
+        }
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/ConsumerConfigurationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ConsumerConfigurationTest.java
index 2e7fd8006..8622542d0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/api/ConsumerConfigurationTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ConsumerConfigurationTest.java
@@ -21,13 +21,15 @@
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertFalse;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
 /**
  * Unit test of {@link ConsumerConfiguration}.
  */
@@ -38,20 +40,22 @@
     @Test
     public void testJsonIgnore() throws Exception {
 
-        ConsumerConfiguration conf = new ConsumerConfiguration()
-            .setConsumerEventListener(new ConsumerEventListener() {
+        ConsumerConfigurationData conf = new ConsumerConfigurationData();
+        conf.setConsumerEventListener(new ConsumerEventListener() {
+
+            @Override
+            public void becameActive(Consumer consumer, int partitionId) {
+            }
+
+            @Override
+            public void becameInactive(Consumer consumer, int partitionId) {
+            }
+        });
 
-                @Override
-                public void becameActive(Consumer consumer, int partitionId) {
-                }
+        conf.setMessageListener((MessageListener) (consumer, msg) -> {
+        });
 
-                @Override
-                public void becameInactive(Consumer consumer, int partitionId) {
-                }
-            })
-            .setMessageListener((MessageListener) (consumer, msg) -> {
-            })
-            .setCryptoKeyReader(mock(CryptoKeyReader.class));
+        conf.setCryptoKeyReader(mock(CryptoKeyReader.class));
 
         ObjectMapper m = new ObjectMapper();
         m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
index f4b89d5d4..a9ab705a5 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -24,7 +24,6 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.testng.annotations.Test;
 
-@SuppressWarnings("deprecation")
 public class BuildersTest {
 
     @Test
@@ -33,15 +32,15 @@ public void clientBuilderTest() {
                 .maxNumberOfRejectedRequestPerConnection(200).serviceUrl("pulsar://service:6650");
 
         assertEquals(clientBuilder.conf.isUseTls(), true);
-        assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
+        assertEquals(clientBuilder.conf.getServiceUrl(), "pulsar://service:6650");
 
         ClientBuilderImpl b2 = (ClientBuilderImpl) clientBuilder.clone();
         assertTrue(b2 != clientBuilder);
 
         b2.serviceUrl("pulsar://other-broker:6650");
 
-        assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
-        assertEquals(b2.serviceUrl, "pulsar://other-broker:6650");
+        assertEquals(clientBuilder.conf.getServiceUrl(), "pulsar://service:6650");
+        assertEquals(b2.conf.getServiceUrl(), "pulsar://other-broker:6650");
     }
 
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index a73342b4f..0584b7887 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -24,10 +24,9 @@
 import static org.testng.Assert.assertTrue;
 
 import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.annotations.Test;
@@ -42,8 +41,8 @@
     @Test
     public void testClientCnxTimeout() throws Exception {
         EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
-        ClientConfiguration conf = new ClientConfiguration();
-        conf.setOperationTimeout(10, TimeUnit.MILLISECONDS);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setOperationTimeoutMs(10);
         ClientCnx cnx = new ClientCnx(conf, eventLoop);
 
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
index d0e4d510b..bf41ca9f3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
@@ -22,8 +22,8 @@
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
 
+import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.testng.annotations.Test;
 
 /**
@@ -36,7 +36,7 @@ public void testChoosePartitionWithoutKey() {
         Message msg = mock(Message.class);
         when(msg.getKey()).thenReturn(null);
 
-        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash);
+        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash);
         for (int i = 0; i < 10; i++) {
             assertEquals(i % 5, router.choosePartition(msg, new TopicMetadataImpl(5)));
         }
@@ -53,7 +53,7 @@ public void testChoosePartitionWithKey() {
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
 
-        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash);
+        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash);
         TopicMetadataImpl metadata = new TopicMetadataImpl(100);
 
         assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata));
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
index c8ea0b011..3cdf392a3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
@@ -22,8 +22,8 @@
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
+import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.testng.annotations.Test;
 
 /**
@@ -36,7 +36,7 @@ public void testChoosePartitionWithoutKey() {
         Message msg = mock(Message.class);
         when(msg.getKey()).thenReturn(null);
 
-        SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash);
+        SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, HashingScheme.JavaStringHash);
         assertEquals(1234, router.choosePartition(msg, new TopicMetadataImpl(2468)));
     }
 
@@ -51,7 +51,7 @@ public void testChoosePartitionWithKey() {
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
 
-        SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash);
+        SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, HashingScheme.JavaStringHash);
         TopicMetadataImpl metadata = new TopicMetadataImpl(100);
 
         assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services