You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/02 08:13:54 UTC

[incubator-pulsar] branch master updated: Converted main part of code to use builder APIs with typed interface (#1311)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a64b383  Converted main part of code to use builder APIs with typed interface (#1311)
a64b383 is described below

commit a64b383de7fee866756782c6743823bf90d69395
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Mar 2 00:13:51 2018 -0800

    Converted main part of code to use builder APIs with typed interface (#1311)
---
 .../pulsar/broker/service/BrokerService.java       | 37 +++++-----
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  8 +--
 .../client/impl/ConsumeBaseExceptionTest.java      |  2 +-
 .../clients/consumer/PulsarKafkaConsumer.java      | 51 +++++++-------
 .../clients/producer/PulsarKafkaProducer.java      | 42 ++++++------
 .../kafka/compat/PulsarClientKafkaConfig.java      | 37 ++++++----
 .../kafka/compat/PulsarConsumerKafkaConfig.java    | 15 ++--
 .../kafka/compat/PulsarProducerKafkaConfig.java    | 21 +++---
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  6 +-
 .../pulsar/client/api/ConsumerConfiguration.java   |  4 +-
 .../pulsar/client/api/ConsumerEventListener.java   |  4 +-
 .../apache/pulsar/client/api/MessageRouter.java    |  4 +-
 .../apache/pulsar/client/api/ProducerBuilder.java  |  2 +-
 .../apache/pulsar/client/api/ReaderBuilder.java    |  2 +-
 .../pulsar/client/impl/BatchMessageContainer.java  |  6 +-
 .../pulsar/client/impl/ClientBuilderImpl.java      |  4 ++
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 18 ++---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 10 +--
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  3 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 11 +--
 .../apache/pulsar/client/impl/ConsumerStats.java   |  8 +--
 .../pulsar/client/impl/ConsumerStatsDisabled.java  |  2 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java | 21 +++---
 .../client/impl/PartitionedConsumerImpl.java       | 24 +++----
 .../client/impl/PartitionedProducerImpl.java       | 10 +--
 .../apache/pulsar/client/impl/ProducerImpl.java    | 21 +++---
 .../apache/pulsar/client/impl/ProducerStats.java   |  4 +-
 .../pulsar/client/impl/PulsarClientImpl.java       | 13 ++--
 .../pulsar/client/impl/ReaderBuilderImpl.java      |  7 +-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  2 +-
 .../impl/RoundRobinPartitionMessageRouterImpl.java |  2 +-
 .../apache/pulsar/proxy/server/ProxyService.java   | 20 +++---
 .../proxy/server/ProxyForwardAuthDataTest.java     |  7 +-
 .../apache/pulsar/storm/MessageToValuesMapper.java |  2 +-
 .../java/org/apache/pulsar/storm/PulsarBolt.java   | 55 +++++++++++----
 .../java/org/apache/pulsar/storm/PulsarSpout.java  | 79 +++++++++++++++-------
 .../apache/pulsar/storm/SharedPulsarClient.java    | 55 ++++++++-------
 .../pulsar/testclient/PerformanceConsumer.java     | 62 ++++++++---------
 .../pulsar/testclient/PerformanceProducer.java     | 51 +++++++-------
 .../pulsar/testclient/PerformanceReader.java       | 36 +++++-----
 .../apache/pulsar/websocket/WebSocketService.java  | 29 ++++----
 41 files changed, 436 insertions(+), 361 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1910680..bbdc696 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -78,11 +78,12 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
 import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
 import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -499,27 +500,29 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 String path = PulsarWebResource.path("clusters", cluster);
                 ClusterData data = this.pulsar.getConfigurationCache().clustersCache().get(path)
                         .orElseThrow(() -> new KeeperException.NoNodeException(path));
-                ClientConfiguration configuration = new ClientConfiguration();
-                configuration.setUseTcpNoDelay(false);
-                configuration.setConnectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker());
-                configuration.setStatsInterval(0, TimeUnit.SECONDS);
+                ClientBuilder clientBuilder = PulsarClient.builder()
+                        .enableTcpNoDelay(false)
+                        .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
+                        .statsInterval(0, TimeUnit.SECONDS);
                 if (pulsar.getConfiguration().isAuthenticationEnabled()) {
-                    configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
+                    clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                 }
-                String clusterUrl = null;
                 if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
-                    clusterUrl = isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
-                            : data.getServiceUrlTls();
-                    configuration.setUseTls(true);
-                    configuration.setTlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath());
-                    configuration
-                            .setTlsAllowInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
+                    clientBuilder
+                            .serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
+                                    : data.getServiceUrlTls())
+                            .enableTls(true)
+                            .tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
+                            .allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
                 } else {
-                    clusterUrl = isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl()
-                            : data.getServiceUrl();
+                    clientBuilder.serviceUrl(
+                            isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
                 }
-                return new PulsarClientImpl(clusterUrl, configuration, this.workerGroup);
+
+                // Share all the IO threads across broker and client connections
+                ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
+                return new PulsarClientImpl(conf, workerGroup);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c0e3839..02055ac 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -104,24 +104,20 @@ public abstract class MockedPulsarServiceBaseTest {
 
     protected final void internalSetup() throws Exception {
         init();
-        org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
-        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
         lookupUrl = new URI(brokerUrl.toString());
         if (isTcpLookup) {
             lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT);
         }
-        pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf);
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();
     }
 
     protected final void internalSetupForStatsTest() throws Exception {
         init();
-        org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
-        clientConf.setStatsInterval(1, TimeUnit.SECONDS);
         String lookupUrl = brokerUrl.toString();
         if (isTcpLookup) {
             lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
         }
-        pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(1, TimeUnit.SECONDS).build();
     }
 
     protected final void init() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
index fafc541..c3096ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
@@ -64,7 +64,7 @@ public class ConsumeBaseExceptionTest extends ProducerConsumerBase {
     public void testListener() throws PulsarClientException {
         Consumer consumer = null;
         ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setMessageListener((Consumer c, Message msg) -> {
+        conf.setMessageListener((Consumer<byte[]> c, Message<byte[]> msg) -> {
         });
         consumer = pulsarClient.subscribe("persistent://prop/cluster/ns/topicName", "my-subscription", conf);
         Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally());
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 31c9921..886da5f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -45,8 +45,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
@@ -62,7 +62,7 @@ import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
+public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
 
     private static final long serialVersionUID = 1L;
 
@@ -74,7 +74,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     private final String groupId;
     private final boolean isAutoCommit;
 
-    private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer> consumers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<>();
 
     private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
     private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<>();
@@ -84,10 +84,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     private final Properties properties;
 
     private static class QueueItem {
-        final org.apache.pulsar.client.api.Consumer consumer;
-        final Message message;
+        final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
+        final Message<byte[]> message;
 
-        QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message message) {
+        QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
             this.consumer = consumer;
             this.message = message;
         }
@@ -146,19 +146,19 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
         this.properties = new Properties();
         config.originals().forEach((k, v) -> properties.put(k, v));
-        ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties);
+        ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
         // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
         // all the acknowledgments sent to broker within a short time frame
-        clientConf.setUseTcpNoDelay(false);
+        clientBuilder.enableTcpNoDelay(false);
         try {
-            client = PulsarClient.create(serviceUrl, clientConf);
+            client = clientBuilder.serviceUrl(serviceUrl).build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
     }
 
     @Override
-    public void received(org.apache.pulsar.client.api.Consumer consumer, Message msg) {
+    public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
         // Block listener thread if the application is slowing down
         try {
             receivedMessages.put(new QueueItem(consumer, msg));
@@ -204,16 +204,17 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 // acknowledgeCumulative()
                 int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
 
-                ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties);
-                conf.setSubscriptionType(SubscriptionType.Failover);
-                conf.setMessageListener(this);
+                ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
+                consumerBuilder.subscriptionType(SubscriptionType.Failover);
+                consumerBuilder.messageListener(this);
+                consumerBuilder.subscriptionName(groupId);
                 if (numberOfPartitions > 1) {
                     // Subscribe to each partition
-                    conf.setConsumerName(ConsumerName.generateRandomName());
+                    consumerBuilder.consumerName(ConsumerName.generateRandomName());
                     for (int i = 0; i < numberOfPartitions; i++) {
                         String partitionName = TopicName.get(topic).getPartition(i).toString();
-                        CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client
-                                .subscribeAsync(partitionName, groupId, conf);
+                        CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
+                                .topic(partitionName).subscribeAsync();
                         int partitionIndex = i;
                         TopicPartition tp = new TopicPartition(topic, partitionIndex);
                         future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
@@ -222,8 +223,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                     }
                 } else {
                     // Topic has a single partition
-                    CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client.subscribeAsync(topic,
-                            groupId, conf);
+                    CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
+                            .subscribeAsync();
                     TopicPartition tp = new TopicPartition(topic, 0);
                     future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
                     futures.add(future);
@@ -293,7 +294,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 TopicName topicName = TopicName.get(item.consumer.getTopic());
                 String topic = topicName.getPartitionedTopicName();
                 int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
-                Message msg = item.message;
+                Message<byte[]> msg = item.message;
                 MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
                 long offset = MessageIdUtils.getOffset(msgId);
 
@@ -335,7 +336,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @SuppressWarnings("unchecked")
-    private K getKey(String topic, Message msg) {
+    private K getKey(String topic, Message<byte[]> msg) {
         if (!msg.hasKey()) {
             return null;
         }
@@ -393,7 +394,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
         offsets.forEach((topicPartition, offsetAndMetadata) -> {
-            org.apache.pulsar.client.api.Consumer consumer = consumers.get(topicPartition);
+            org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
 
             lastCommittedOffset.put(topicPartition, offsetAndMetadata);
             futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
@@ -415,7 +416,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     @Override
     public void seek(TopicPartition partition, long offset) {
         MessageId msgId = MessageIdUtils.getMessageId(offset);
-        org.apache.pulsar.client.api.Consumer c = consumers.get(partition);
+        org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
         if (c == null) {
             throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
         }
@@ -436,7 +437,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         }
 
         for (TopicPartition tp : partitions) {
-            org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
             if (c == null) {
                 futures.add(FutureUtil.failedFuture(
                         new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
@@ -457,7 +458,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         }
 
         for (TopicPartition tp : partitions) {
-            org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
             if (c == null) {
                 futures.add(FutureUtil.failedFuture(
                         new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 7b8bf9a..ae69c85 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -38,12 +38,11 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -54,9 +53,9 @@ import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
 public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
     private final PulsarClient client;
-    private final ProducerConfiguration pulsarProducerConf;
+    private final ProducerBuilder<byte[]> pulsarProducerBuilder;
 
-    private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer> producers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
 
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
@@ -107,30 +106,29 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         }
 
         String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
-        ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties);
         try {
-            client = PulsarClient.create(serviceUrl, clientConf);
+            client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
 
-        pulsarProducerConf = PulsarProducerKafkaConfig.getProducerConfiguration(properties);
+        pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(client, properties);
 
         // To mimic the same batching mode as Kafka, we need to wait a very little amount of
         // time to batch if the client is trying to send messages fast enough
         long lingerMs = Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1"));
-        pulsarProducerConf.setBatchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
+        pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
 
         String compressionType = properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG);
         if ("gzip".equals(compressionType)) {
-            pulsarProducerConf.setCompressionType(CompressionType.ZLIB);
+            pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
         } else if ("lz4".equals(compressionType)) {
-            pulsarProducerConf.setCompressionType(CompressionType.LZ4);
+            pulsarProducerBuilder.compressionType(CompressionType.LZ4);
         }
 
-        pulsarProducerConf.setSendTimeout(
-                Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")),
-                TimeUnit.MILLISECONDS);
+
+        int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"));
+        pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
 
         boolean blockOnBufferFull = Boolean
                 .parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"));
@@ -138,8 +136,8 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         // Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
         // Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
         // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
-        boolean shouldBlockPulsarProducer = pulsarProducerConf.getSendTimeoutMs() > 0 || blockOnBufferFull;
-        pulsarProducerConf.setBlockIfQueueFull(shouldBlockPulsarProducer);
+        boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull;
+        pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
     }
 
     @Override
@@ -149,7 +147,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
-        org.apache.pulsar.client.api.Producer producer;
+        org.apache.pulsar.client.api.Producer<byte[]> producer;
 
         try {
             producer = producers.computeIfAbsent(record.topic(), topic -> createNewProducer(topic));
@@ -162,7 +160,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
             return future;
         }
 
-        Message msg = getMessage(record);
+        Message<byte[]> msg = getMessage(record);
         int messageSize = msg.getData().length;
 
         CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
@@ -225,20 +223,20 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private org.apache.pulsar.client.api.Producer createNewProducer(String topic) {
+    private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
         try {
-            return client.createProducer(topic, pulsarProducerConf);
+            return pulsarProducerBuilder.clone().topic(topic).create();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
     }
 
-    private Message getMessage(ProducerRecord<K, V> record) {
+    private Message<byte[]> getMessage(ProducerRecord<K, V> record) {
         if (record.partition() != null) {
             throw new UnsupportedOperationException("");
         }
 
-        MessageBuilder builder = MessageBuilder.create();
+        MessageBuilder<byte[]> builder = MessageBuilder.create();
         if (record.key() != null) {
             builder.setKey(getKey(record.topic(), record.key()));
         }
@@ -259,7 +257,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private RecordMetadata getRecordMetadata(String topic, Message msg, MessageId messageId, int size) {
+    private RecordMetadata getRecordMetadata(String topic, Message<byte[]> msg, MessageId messageId, int size) {
         MessageIdImpl msgId = (MessageIdImpl) messageId;
 
         // Combine ledger id and entry id to form offset
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
index d9ce75e..ca57f5b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -22,7 +22,8 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 
 public class PulsarClientKafkaConfig {
 
@@ -31,6 +32,7 @@ public class PulsarClientKafkaConfig {
     public static final String USE_TLS = "pulsar.use.tls";
     public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
     public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";
+    public static final String TLS_HOSTNAME_VERIFICATION = "pulsar.tls.hostname.verification";
 
     public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms";
     public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds";
@@ -43,8 +45,8 @@ public class PulsarClientKafkaConfig {
     public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
     public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
 
-    public static ClientConfiguration getClientConfiguration(Properties properties) {
-        ClientConfiguration conf = new ClientConfiguration();
+    public static ClientBuilder getClientBuilder(Properties properties) {
+        ClientBuilder clientBuilder = PulsarClient.builder();
 
         if (properties.containsKey(AUTHENTICATION_CLASS)) {
             String className = properties.getProperty(AUTHENTICATION_CLASS);
@@ -52,48 +54,53 @@ public class PulsarClientKafkaConfig {
                 @SuppressWarnings("unchecked")
                 Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
                 Authentication auth = clazz.newInstance();
-                conf.setAuthentication(auth);
+                clientBuilder.authentication(auth);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
         }
 
-        conf.setUseTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
-        conf.setUseTls(Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+        clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
+        clientBuilder.allowTlsInsecureConnection(
+                Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+        clientBuilder.enableTlsHostnameVerification(
+                Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
         if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
-            conf.setTlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+            clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
         }
 
         if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
-            conf.setOperationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+            clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
                     TimeUnit.MILLISECONDS);
         }
 
         if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
-            conf.setStatsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)), TimeUnit.SECONDS);
+            clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
+                    TimeUnit.SECONDS);
         }
 
         if (properties.containsKey(NUM_IO_THREADS)) {
-            conf.setIoThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+            clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
         }
 
         if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
-            conf.setConnectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+            clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
         }
 
         if (properties.containsKey(USE_TCP_NODELAY)) {
-            conf.setUseTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+            clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
         }
 
         if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
-            conf.setConcurrentLookupRequest(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+            clientBuilder.maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
         }
 
         if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
-            conf.setMaxNumberOfRejectedRequestPerConnection(
+            clientBuilder.maxNumberOfRejectedRequestPerConnection(
                     Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
         }
 
-        return conf;
+        return clientBuilder;
     }
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index f91c484..4addfb7 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -20,7 +20,8 @@ package org.apache.pulsar.client.kafka.compat;
 
 import java.util.Properties;
 
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 
 public class PulsarConsumerKafkaConfig {
 
@@ -29,22 +30,22 @@ public class PulsarConsumerKafkaConfig {
     public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size";
     public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
 
-    public static ConsumerConfiguration getConsumerConfiguration(Properties properties) {
-        ConsumerConfiguration conf = new ConsumerConfiguration();
+    public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
+        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
 
         if (properties.containsKey(CONSUMER_NAME)) {
-            conf.setConsumerName(properties.getProperty(CONSUMER_NAME));
+            consumerBuilder.consumerName(properties.getProperty(CONSUMER_NAME));
         }
 
         if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
-            conf.setReceiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
+            consumerBuilder.receiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
         }
 
         if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
-            conf.setMaxTotalReceiverQueueSizeAcrossPartitions(
+            consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(
                     Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
         }
 
-        return conf;
+        return consumerBuilder;
     }
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index c2e4886..5a9a651 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -20,7 +20,8 @@ package org.apache.pulsar.client.kafka.compat;
 
 import java.util.Properties;
 
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 
 public class PulsarProducerKafkaConfig {
 
@@ -33,32 +34,32 @@ public class PulsarProducerKafkaConfig {
     public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
     public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
 
-    public static ProducerConfiguration getProducerConfiguration(Properties properties) {
-        ProducerConfiguration conf = new ProducerConfiguration();
+    public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer();
 
         if (properties.containsKey(PRODUCER_NAME)) {
-            conf.setProducerName(properties.getProperty(PRODUCER_NAME));
+            producerBuilder.producerName(properties.getProperty(PRODUCER_NAME));
         }
 
         if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
-            conf.setInitialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
+            producerBuilder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
         }
 
         if (properties.containsKey(MAX_PENDING_MESSAGES)) {
-            conf.setMaxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
+            producerBuilder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
         }
 
         if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
-            conf.setMaxPendingMessagesAcrossPartitions(
+            producerBuilder.maxPendingMessagesAcrossPartitions(
                     Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
         }
 
-        conf.setBatchingEnabled(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));
+        producerBuilder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));
 
         if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
-            conf.setBatchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
+            producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
         }
 
-        return conf;
+        return producerBuilder;
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index d9dcb1c..e4457df 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -100,7 +100,7 @@ public interface ConsumerBuilder<T> extends Serializable, Cloneable {
      *
      * @param topicsPattern
      */
-    ConsumerBuilder topicsPattern(Pattern topicsPattern);
+    ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
 
     /**
      * Specify a pattern for topics that this consumer will subscribe on.
@@ -111,7 +111,7 @@ public interface ConsumerBuilder<T> extends Serializable, Cloneable {
      * @param topicsPattern
      *            given regular expression for topics pattern
      */
-    ConsumerBuilder topicsPattern(String topicsPattern);
+    ConsumerBuilder<T> topicsPattern(String topicsPattern);
 
     /**
      * Specify the subscription name for this consumer.
@@ -238,7 +238,7 @@ public interface ConsumerBuilder<T> extends Serializable, Cloneable {
      * @param periodInMinutes
      *            whether to read from the compacted topic
      */
-    ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes);
+    ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes);
 
     /**
      * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 7761b9d..c526af5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -95,7 +95,7 @@ public class ConsumerConfiguration implements Serializable {
     /**
      * @return the configured {@link MessageListener} for the consumer
      */
-    public MessageListener getMessageListener() {
+    public MessageListener<byte[]> getMessageListener() {
         return conf.getMessageListener();
     }
 
@@ -108,7 +108,7 @@ public class ConsumerConfiguration implements Serializable {
      * @param messageListener
      *            the listener object
      */
-    public ConsumerConfiguration setMessageListener(MessageListener messageListener) {
+    public ConsumerConfiguration setMessageListener(MessageListener<byte[]> messageListener) {
         checkNotNull(messageListener);
         conf.setMessageListener(messageListener);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
index aeb8bbb..e2e6274 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
@@ -26,11 +26,11 @@ public interface ConsumerEventListener {
     /**
      * Notified when the consumer group is changed, and the consumer becomes the active consumer.
      */
-    void becameActive(Consumer consumer, int partitionId);
+    void becameActive(Consumer<?> consumer, int partitionId);
 
     /**
      * Notified when the consumer group is changed, and the consumer is still inactive or becomes inactive.
      */
-    void becameInactive(Consumer consumer, int partitionId);
+    void becameInactive(Consumer<?> consumer, int partitionId);
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
index a9cef6f..bc2b915 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
@@ -30,7 +30,7 @@ public interface MessageRouter extends Serializable {
      * @deprecated since 1.22.0. Please use {@link #choosePartition(Message, TopicMetadata)} instead.
      */
     @Deprecated
-    default int choosePartition(Message msg) {
+    default int choosePartition(Message<?> msg) {
         throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");
     }
 
@@ -42,7 +42,7 @@ public interface MessageRouter extends Serializable {
      * @return the partition to route the message.
      * @since 1.22.0
      */
-    default int choosePartition(Message msg, TopicMetadata metadata) {
+    default int choosePartition(Message<?> msg, TopicMetadata metadata) {
         return choosePartition(msg);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index a037cfe..16dcd00 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -262,7 +262,7 @@ public interface ProducerBuilder<T> extends Serializable, Cloneable {
      *            maximum number of messages in a batch
      * @return
      */
-    ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);
+    ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);
 
     /**
      * Set the baseline for the sequence ids for messages published by the producer.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index c81b2eb..f40a8a4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -99,7 +99,7 @@ public interface ReaderBuilder<T> extends Serializable, Cloneable {
      * @param readerListener
      *            the listener object
      */
-    ReaderBuilder<T> readerListener(ReaderListener readerListener);
+    ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);
 
     /**
      * Sets a {@link CryptoKeyReader}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
index d329d09..a97e524 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
@@ -53,7 +53,7 @@ class BatchMessageContainer {
     // sequence id for this batch which will be persisted as a single entry by broker
     long sequenceId = -1;
     ByteBuf batchedMessageMetadataAndPayload;
-    List<MessageImpl> messages = Lists.newArrayList();
+    List<MessageImpl<?>> messages = Lists.newArrayList();
     // keep track of callbacks for individual messages being published in a batch
     SendCallback firstCallback;
 
@@ -73,13 +73,13 @@ class BatchMessageContainer {
         this.producerName = producerName;
     }
 
-    boolean hasSpaceInBatch(MessageImpl msg) {
+    boolean hasSpaceInBatch(MessageImpl<?> msg) {
         int messageSize = msg.getDataBuffer().readableBytes();
         return ((messageSize + currentBatchSizeBytes) <= MAX_MESSAGE_BATCH_SIZE_BYTES
                 && numMessagesInBatch < maxNumMessagesInBatch);
     }
 
-    void add(MessageImpl msg, SendCallback callback) {
+    void add(MessageImpl<?> msg, SendCallback callback) {
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 4cffb7f..3effc7f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -154,4 +154,8 @@ public class ClientBuilderImpl implements ClientBuilder {
         conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
         return this;
     }
+
+    public ClientConfigurationData getClientConfigurationData() {
+        return conf;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6c2d758..37904fd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -86,8 +86,8 @@ public class ClientCnx extends PulsarHandler {
     private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests = new ConcurrentLongHashMap<>(
         16, 1);
 
-    private final ConcurrentLongHashMap<ProducerImpl> producers = new ConcurrentLongHashMap<>(16, 1);
-    private final ConcurrentLongHashMap<ConsumerImpl> consumers = new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
 
     private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
     private final Semaphore pendingLookupRequestSemaphore;
@@ -253,7 +253,7 @@ public class ClientCnx extends PulsarHandler {
         if (log.isDebugEnabled()) {
             log.debug("{} Received a message from the server: {}", ctx.channel(), cmdMessage);
         }
-        ConsumerImpl consumer = consumers.get(cmdMessage.getConsumerId());
+        ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
         if (consumer != null) {
             consumer.messageReceived(cmdMessage.getMessageId(), headersAndPayload, this);
         }
@@ -266,7 +266,7 @@ public class ClientCnx extends PulsarHandler {
         if (log.isDebugEnabled()) {
             log.debug("{} Received a consumer group change message from the server : {}", ctx.channel(), change);
         }
-        ConsumerImpl consumer = consumers.get(change.getConsumerId());
+        ConsumerImpl<?> consumer = consumers.get(change.getConsumerId());
         if (consumer != null) {
             consumer.activeConsumerChanged(change.getIsActive());
         }
@@ -398,7 +398,7 @@ public class ClientCnx extends PulsarHandler {
 
         log.info("[{}] Broker notification reached the end of topic: {}", remoteAddress, consumerId);
 
-        ConsumerImpl consumer = consumers.get(consumerId);
+        ConsumerImpl<?> consumer = consumers.get(consumerId);
         if (consumer != null) {
             consumer.setTerminated();
         }
@@ -472,7 +472,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
         log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
         final long producerId = closeProducer.getProducerId();
-        ProducerImpl producer = producers.get(producerId);
+        ProducerImpl<?> producer = producers.get(producerId);
         if (producer != null) {
             producer.connectionClosed(this);
         } else {
@@ -484,7 +484,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
         final long consumerId = closeConsumer.getConsumerId();
-        ConsumerImpl consumer = consumers.get(consumerId);
+        ConsumerImpl<?> consumer = consumers.get(consumerId);
         if (consumer != null) {
             consumer.connectionClosed(this);
         } else {
@@ -666,11 +666,11 @@ public class ClientCnx extends PulsarHandler {
         return false;
     }
 
-    void registerConsumer(final long consumerId, final ConsumerImpl consumer) {
+    void registerConsumer(final long consumerId, final ConsumerImpl<?> consumer) {
         consumers.put(consumerId, consumer);
     }
 
-    void registerProducer(final long producerId, final ProducerImpl producer) {
+    void registerProducer(final long producerId, final ProducerImpl<?> producer) {
         producers.put(producerId, producer);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index af6b732..f51b1b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -50,7 +50,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
     }
 
     protected final String subscription;
-    protected final ConsumerConfigurationData conf;
+    protected final ConsumerConfigurationData<T> conf;
     protected final String consumerName;
     protected final CompletableFuture<Consumer<T>> subscribeFuture;
     protected final MessageListener<T> listener;
@@ -168,7 +168,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
     abstract protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException;
 
     @Override
-    public void acknowledge(Message message) throws PulsarClientException {
+    public void acknowledge(Message<?> message) throws PulsarClientException {
         try {
             acknowledge(message.getMessageId());
         } catch (NullPointerException npe) {
@@ -194,7 +194,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
     }
 
     @Override
-    public void acknowledgeCumulative(Message message) throws PulsarClientException {
+    public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
         try {
             acknowledgeCumulative(message.getMessageId());
         } catch (NullPointerException npe) {
@@ -220,7 +220,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
     }
 
     @Override
-    public CompletableFuture<Void> acknowledgeAsync(Message message) {
+    public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
         try {
             return acknowledgeAsync(message.getMessageId());
         } catch (NullPointerException npe) {
@@ -229,7 +229,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
     }
 
     @Override
-    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message message) {
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
         try {
             return acknowledgeCumulativeAsync(message.getMessageId());
         } catch (NullPointerException npe) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index e7163c9..d8507bb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -206,9 +206,8 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     }
 
     @Override
-    public ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes) {
+    public ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes) {
         conf.setPatternAutoDiscoveryPeriod(periodInMinutes);
         return this;
     }
-
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8499565..183651a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -82,6 +82,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
 
     // Number of messages that have delivered to the application. Every once in a while, this number will be sent to the
     // broker to notify that we are ready to get (and store in the incoming messages queue) more messages
+    @SuppressWarnings("rawtypes")
     private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
             .newUpdater(ConsumerImpl.class, "availablePermits");
     @SuppressWarnings("unused")
@@ -288,7 +289,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
             do {
                 message = incomingMessages.take();
                 lastDequeuedMessage = message.getMessageId();
-                ClientCnx msgCnx = ((MessageImpl) message).getCnx();
+                ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx();
                 // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()"
                 synchronized (ConsumerImpl.this) {
                     // if message received due to an old flow - discard it and wait for the message from the
@@ -631,7 +632,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
      * not seen by the application
      */
     private BatchMessageIdImpl clearReceiverQueue() {
-        List<Message> currentMessageQueue = new ArrayList<>(incomingMessages.size());
+        List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
         incomingMessages.drainTo(currentMessageQueue);
         if (!currentMessageQueue.isEmpty()) {
             MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId();
@@ -984,9 +985,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
      *
      * Periodically, it sends a Flow command to notify the broker that it can push more messages
      */
-    protected synchronized void messageProcessed(Message msg) {
+    protected synchronized void messageProcessed(Message<?> msg) {
         ClientCnx currentCnx = cnx();
-        ClientCnx msgCnx = ((MessageImpl) msg).getCnx();
+        ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
         lastDequeuedMessage = msg.getMessageId();
 
         if (msgCnx != currentCnx) {
@@ -1371,7 +1372,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
         }
     }
 
-    private MessageIdImpl getMessageIdImpl(Message msg) {
+    private MessageIdImpl getMessageIdImpl(Message<?> msg) {
         MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId();
         if (messageId instanceof BatchMessageIdImpl) {
             // messageIds contain MessageIdImpl, not BatchMessageIdImpl
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
index 3ef26c8..21ff3c4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
@@ -41,7 +41,7 @@ public class ConsumerStats implements Serializable {
     private static final long serialVersionUID = 1L;
     private TimerTask stat;
     private Timeout statTimeout;
-    private ConsumerImpl consumer;
+    private ConsumerImpl<?> consumer;
     private PulsarClientImpl pulsarClient;
     private long oldTime;
     private long statsIntervalSeconds;
@@ -74,7 +74,7 @@ public class ConsumerStats implements Serializable {
         throughputFormat = null;
     }
 
-    public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData conf, ConsumerImpl consumer) {
+    public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf, ConsumerImpl<?> consumer) {
         this.pulsarClient = pulsarClient;
         this.consumer = consumer;
         this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -92,7 +92,7 @@ public class ConsumerStats implements Serializable {
         init(conf);
     }
 
-    private void init(ConsumerConfigurationData conf) {
+    private void init(ConsumerConfigurationData<?> conf) {
         ObjectMapper m = new ObjectMapper();
         m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
         ObjectWriter w = m.writerWithDefaultPrettyPrinter();
@@ -149,7 +149,7 @@ public class ConsumerStats implements Serializable {
         statTimeout = pulsarClient.timer().newTimeout(stat, statsIntervalSeconds, TimeUnit.SECONDS);
     }
 
-    void updateNumMsgsReceived(Message message) {
+    void updateNumMsgsReceived(Message<?> message) {
         if (message != null) {
             numMsgsReceived.increment();
             numBytesReceived.add(message.getData().length);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index fdcaf63..08b189c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -24,7 +24,7 @@ public class ConsumerStatsDisabled extends ConsumerStats {
     private static final long serialVersionUID = 1L;
 
     @Override
-    void updateNumMsgsReceived(Message message) {
+    void updateNumMsgsReceived(Message<?> message) {
         // Do nothing
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index f4585ce..8fcc40a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -56,7 +56,8 @@ public class MessageImpl<T> implements Message<T> {
 
     // Constructor for out-going message
     static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload, Schema<T> schema) {
-        MessageImpl<T> msg = RECYCLER.get();
+        @SuppressWarnings("unchecked")
+        MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
         msg.msgMetadataBuilder = msgMetadataBuilder;
         msg.messageId = null;
         msg.cnx = null;
@@ -67,7 +68,8 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     static MessageImpl<byte[]> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload) {
-        MessageImpl<byte[]> msg = RECYCLER.get();
+        @SuppressWarnings("unchecked")
+        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
         msg.msgMetadataBuilder = msgMetadataBuilder;
         msg.messageId = null;
         msg.cnx = null;
@@ -141,8 +143,9 @@ public class MessageImpl<T> implements Message<T> {
         this.properties = Collections.unmodifiableMap(properties);
     }
 
-    public static MessageImpl deserialize(ByteBuf headersAndPayload) throws IOException {
-        MessageImpl msg = RECYCLER.get();
+    public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) throws IOException {
+        @SuppressWarnings("unchecked")
+        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
         MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
 
         msg.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
@@ -287,16 +290,16 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
-    private MessageImpl(Handle<MessageImpl> recyclerHandle) {
+    private MessageImpl(Handle<MessageImpl<?>> recyclerHandle) {
         this.recyclerHandle = recyclerHandle;
     }
 
-    private Handle<MessageImpl> recyclerHandle;
+    private Handle<MessageImpl<?>> recyclerHandle;
 
-    private final static Recycler<MessageImpl> RECYCLER = new Recycler<MessageImpl>() {
+    private final static Recycler<MessageImpl<?>> RECYCLER = new Recycler<MessageImpl<?>>() {
         @Override
-        protected MessageImpl newObject(Handle<MessageImpl> handle) {
-            return new MessageImpl(handle);
+        protected MessageImpl<?> newObject(Handle<MessageImpl<?>> handle) {
+            return new MessageImpl<>(handle);
         }
     };
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index f662ccf..df6894c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -47,13 +47,15 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
 
     private final List<ConsumerImpl<T>> consumers;
 
     // Queue of partition consumers on which we have stopped calling receiveAsync() because the
     // shared incoming queue was full
-    private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
+    private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
 
     // Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
     // resume receiving from the paused consumer partitions
@@ -165,7 +167,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
         try {
             if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
                 while (true) {
-                    ConsumerImpl consumer = pausedConsumers.poll();
+                    ConsumerImpl<T> consumer = pausedConsumers.poll();
                     if (consumer == null) {
                         break;
                     }
@@ -360,12 +362,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public boolean isConnected() {
-        for (ConsumerImpl consumer : consumers) {
-            if (!consumer.isConnected()) {
-                return false;
-            }
-        }
-        return true;
+        return consumers.stream().allMatch(ConsumerImpl::isConnected);
     }
 
     @Override
@@ -442,6 +439,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
         if (null != conf.getConsumerEventListener()) {
             internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
         }
+
         int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
                 conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
         internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
@@ -460,7 +458,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
     @Override
     public void redeliverUnacknowledgedMessages() {
         synchronized (this) {
-            for (ConsumerImpl c : consumers) {
+            for (ConsumerImpl<T> c : consumers) {
                 c.redeliverUnacknowledgedMessages();
             }
             incomingMessages.clear();
@@ -509,11 +507,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
      * @return true if all batch messages have been acknowledged
      */
     public boolean isBatchingAckTrackerEmpty() {
-        boolean state = true;
-        for (Consumer consumer : consumers) {
-            state &= ((ConsumerImpl) consumer).isBatchingAckTrackerEmpty();
-        }
-        return state;
+        return consumers.stream().allMatch(ConsumerImpl::isBatchingAckTrackerEmpty);
     }
 
     List<ConsumerImpl<T>> getConsumers() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index e70ce4b..5281d51 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -161,14 +161,8 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
 
     @Override
     public boolean isConnected() {
-        for (ProducerImpl producer : producers) {
-            // returns false if any of the partition is not connected
-            if (!producer.isConnected()) {
-                return false;
-            }
-        }
-
-        return true;
+        // returns false if any of the partition is not connected
+        return producers.stream().allMatch(ProducerImpl::isConnected);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b55f279..15779cd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -104,6 +104,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
 
     private final Map<String, String> metadata;
 
+    @SuppressWarnings("rawtypes")
     private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
 
@@ -184,7 +185,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
     }
 
     @Override
-    public CompletableFuture<MessageId> sendAsync(Message message) {
+    public CompletableFuture<MessageId> sendAsync(Message<T> message) {
         CompletableFuture<MessageId> future = new CompletableFuture<>();
 
         sendAsync(message, new SendCallback() {
@@ -232,7 +233,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
         return future;
     }
 
-    public void sendAsync(Message message, SendCallback callback) {
+    public void sendAsync(Message<T> message, SendCallback callback) {
         checkArgument(message instanceof MessageImpl);
 
         if (!isValidProducerState(callback)) {
@@ -243,7 +244,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
             return;
         }
 
-        MessageImpl msg = (MessageImpl) message;
+        MessageImpl<T> msg = (MessageImpl<T>) message;
         MessageMetadata.Builder msgMetadata = msg.getMessageBuilder();
         ByteBuf payload = msg.getDataBuffer();
 
@@ -387,7 +388,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
         return Commands.newSend(producerId, sequenceId, numMessages, checksumType, msgMetadata, compressedPayload);
     }
 
-    private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf payload) {
+    private void doBatchSendAndAdd(MessageImpl<T> msg, SendCallback callback, ByteBuf payload) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Closing out batch to accomodate large message with size {}", topic, producerName,
                     msg.getDataBuffer().readableBytes());
@@ -440,12 +441,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
     }
 
     private static final class WriteInEventLoopCallback implements Runnable {
-        private ProducerImpl producer;
+        private ProducerImpl<?> producer;
         private ByteBufPair cmd;
         private long sequenceId;
         private ClientCnx cnx;
 
-        static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) {
+        static WriteInEventLoopCallback create(ProducerImpl<?> producer, ClientCnx cnx, OpSendMsg op) {
             WriteInEventLoopCallback c = RECYCLER.get();
             c.producer = producer;
             c.cnx = cnx;
@@ -735,8 +736,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
     }
 
     protected static final class OpSendMsg {
-        MessageImpl msg;
-        List<MessageImpl> msgs;
+        MessageImpl<?> msg;
+        List<MessageImpl<?>> msgs;
         ByteBufPair cmd;
         SendCallback callback;
         long sequenceId;
@@ -744,7 +745,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
         long batchSizeByte = 0;
         int numMessagesInBatch = 1;
 
-        static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long sequenceId, SendCallback callback) {
+        static OpSendMsg create(MessageImpl<?> msg, ByteBufPair cmd, long sequenceId, SendCallback callback) {
             OpSendMsg op = RECYCLER.get();
             op.msg = msg;
             op.cmd = cmd;
@@ -754,7 +755,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
             return op;
         }
 
-        static OpSendMsg create(List<MessageImpl> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback) {
+        static OpSendMsg create(List<MessageImpl<?>> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback) {
             OpSendMsg op = RECYCLER.get();
             op.msgs = msgs;
             op.cmd = cmd;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
index 6a5e321..2628003 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
@@ -41,7 +41,7 @@ public class ProducerStats implements Serializable {
     private static final long serialVersionUID = 1L;
     private TimerTask stat;
     private Timeout statTimeout;
-    private ProducerImpl producer;
+    private ProducerImpl<?> producer;
     private PulsarClientImpl pulsarClient;
     private long oldTime;
     private long statsIntervalSeconds;
@@ -74,7 +74,7 @@ public class ProducerStats implements Serializable {
         ds = null;
     }
 
-    public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl producer) {
+    public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl<?> producer) {
         this.pulsarClient = pulsarClient;
         this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
         this.producer = producer;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1741eeb..47e953b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import com.google.common.collect.Lists;
@@ -256,7 +255,7 @@ public class PulsarClientImpl implements PulsarClient {
                 log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
             }
 
-            ProducerBase producer;
+            ProducerBase<T> producer;
             if (metadata.partitions > 1) {
                 producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions,
                         producerCreatedFuture, schema);
@@ -437,7 +436,7 @@ public class PulsarClientImpl implements PulsarClient {
 
                 List<String> topicsList = topicsPatternFilter(topics, conf.getTopicsPattern());
                 conf.getTopicNames().addAll(topicsList);
-                ConsumerBase consumer = new PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
+                ConsumerBase<T> consumer = new PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
                     PulsarClientImpl.this,
                     conf,
                     externalExecutorProvider.getExecutor(),
@@ -587,12 +586,12 @@ public class PulsarClientImpl implements PulsarClient {
         synchronized (producers) {
             // Copy to a new list, because the closing will trigger a removal from the map
             // and invalidate the iterator
-            List<ProducerBase> producersToClose = Lists.newArrayList(producers.keySet());
+            List<ProducerBase<?>> producersToClose = Lists.newArrayList(producers.keySet());
             producersToClose.forEach(p -> futures.add(p.closeAsync()));
         }
 
         synchronized (consumers) {
-            List<ConsumerBase> consumersToClose = Lists.newArrayList(consumers.keySet());
+            List<ConsumerBase<?>> consumersToClose = Lists.newArrayList(consumers.keySet());
             consumersToClose.forEach(c -> futures.add(c.closeAsync()));
         }
 
@@ -688,13 +687,13 @@ public class PulsarClientImpl implements PulsarClient {
         return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
     }
 
-    void cleanupProducer(ProducerBase producer) {
+    void cleanupProducer(ProducerBase<?> producer) {
         synchronized (producers) {
             producers.remove(producer);
         }
     }
 
-    void cleanupConsumer(ConsumerBase consumer) {
+    void cleanupConsumer(ConsumerBase<?> consumer) {
         synchronized (consumers) {
             consumers.remove(consumer);
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index bef909d..4ab5f42 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -43,16 +43,17 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
     private final Schema<T> schema;
 
     ReaderBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
-        this(client, new ReaderConfigurationData(), schema);
+        this(client, new ReaderConfigurationData<T>(), schema);
     }
 
-    private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData conf, Schema<T> schema) {
+    private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData<T> conf, Schema<T> schema) {
         this.client = client;
         this.conf = conf;
         this.schema = schema;
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public ReaderBuilder<T> clone() {
         try {
             return (ReaderBuilder<T>) super.clone();
@@ -106,7 +107,7 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
     }
 
     @Override
-    public ReaderBuilder<T> readerListener(ReaderListener readerListener) {
+    public ReaderBuilder<T> readerListener(ReaderListener<T> readerListener) {
         conf.setReaderListener(readerListener);
         return this;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index daf4799..ed374f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -90,7 +90,7 @@ public class ReaderImpl<T> implements Reader<T> {
         return consumer.getTopic();
     }
 
-    public ConsumerImpl getConsumer() {
+    public ConsumerImpl<T> getConsumer() {
         return consumer;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index 6b3f937..ce67f5b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -39,7 +39,7 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
     }
 
     @Override
-    public int choosePartition(Message msg, TopicMetadata topicMetadata) {
+    public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
         // If the message has a key, it supersedes the round robin routing policy
         if (msg.hasKey()) {
             return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index d95d83c..4337442 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -31,9 +31,10 @@ import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
@@ -95,19 +96,20 @@ public class ProxyService implements Closeable {
         this.acceptorGroup  = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
         this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
 
-        ClientConfiguration clientConfiguration = new ClientConfiguration();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(serviceUrl);
         if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
-            clientConfiguration.setAuthentication(proxyConfig.getBrokerClientAuthenticationPlugin(),
-                    proxyConfig.getBrokerClientAuthenticationParameters());
+            clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                    proxyConfig.getBrokerClientAuthenticationParameters()));
         }
         if (proxyConfig.isTlsEnabledWithBroker()) {
-            clientConfiguration.setUseTls(true);
-            clientConfiguration.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
-            clientConfiguration.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
+            clientConf.setUseTls(true);
+            clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
+            clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
         }
 
-        this.client = new PulsarClientImpl(serviceUrl, clientConfiguration, workerGroup);
-        this.clientAuthentication = clientConfiguration.getAuthentication();
+        this.client = new PulsarClientImpl(clientConf, workerGroup);
+        this.clientAuthentication = clientConf.getAuthentication();
     }
 
     public void start() throws Exception {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index d4b8e4d..f98c48d 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -25,11 +25,9 @@ import java.util.Set;
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.proxy.server.ProxyRolesEnforcementTest.BasicAuthentication;
@@ -142,8 +140,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
     }
 
     private PulsarClient createPulsarClient(String proxyServiceUrl, String authParams) throws PulsarClientException {
-        org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
-        clientConf.setAuthentication(BasicAuthentication.class.getName(), authParams);
-        return PulsarClient.create(proxyServiceUrl, clientConf);
+        return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+                .authentication(BasicAuthentication.class.getName(), authParams).build();
     }
 }
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
index 3c43611..4291fa8 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
@@ -33,7 +33,7 @@ public interface MessageToValuesMapper extends Serializable {
      * @param msg
      * @return
      */
-    public Values toValues(Message msg);
+    public Values toValues(Message<byte[]> msg);
 
     /**
      * Declare the output schema for the spout.
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index a293fd6..0aa1ee3 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -18,29 +18,34 @@
  */
 package org.apache.pulsar.storm;
 
+import static java.lang.String.format;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
-
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
-import static java.lang.String.format;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("deprecation")
 public class PulsarBolt extends BaseRichBolt implements IMetric {
     /**
      *
@@ -52,8 +57,8 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
     public static final String PRODUCER_RATE = "producerRate";
     public static final String PRODUCER_THROUGHPUT_BYTES = "producerThroughput";
 
-    private final ClientConfiguration clientConf;
-    private final ProducerConfiguration producerConf;
+    private final ClientConfigurationData clientConf;
+    private final ProducerConfigurationData producerConf;
     private final PulsarBoltConfiguration pulsarBoltConf;
     private final ConcurrentMap<String, Object> metricsMap = Maps.newConcurrentMap();
 
@@ -65,17 +70,39 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
     private volatile long messagesSent = 0;
     private volatile long messageSizeSent = 0;
 
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) {
+        this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
+        this.producerConf = new ProducerConfigurationData();
+        Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
+        Preconditions.checkNotNull(pulsarBoltConf.getTopic());
+        Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+
+        this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
+        this.producerConf.setTopicName(pulsarBoltConf.getTopic());
+        this.pulsarBoltConf = pulsarBoltConf;
+    }
+
+    /**
+     * @deprecated Use {@link #PulsarBolt(PulsarBoltConfiguration, ClientBuilder)}
+     */
+    @Deprecated
     public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfiguration clientConf) {
         this(pulsarBoltConf, clientConf, new ProducerConfiguration());
     }
 
+    /**
+     * @deprecated Use {@link #PulsarBolt(PulsarBoltConfiguration, ClientBuilder)}
+     */
+    @Deprecated
     public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfiguration clientConf,
             ProducerConfiguration producerConf) {
-        this.clientConf = clientConf;
-        this.producerConf = producerConf;
+        this.clientConf = clientConf.getConfigurationData().clone();
+        this.producerConf = producerConf.getProducerConfigurationData().clone();
         Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
         Preconditions.checkNotNull(pulsarBoltConf.getTopic());
         Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+        this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
+        this.producerConf.setTopicName(pulsarBoltConf.getTopic());
         this.pulsarBoltConf = pulsarBoltConf;
     }
 
@@ -86,8 +113,8 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
         this.boltId = String.format("%s-%s", componentId, context.getThisTaskId());
         this.collector = collector;
         try {
-            sharedPulsarClient = SharedPulsarClient.get(componentId, pulsarBoltConf.getServiceUrl(), clientConf);
-            producer = sharedPulsarClient.getSharedProducer(pulsarBoltConf.getTopic(), producerConf);
+            sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
+            producer = sharedPulsarClient.getSharedProducer(producerConf);
             LOG.info("[{}] Created a pulsar producer on topic {} to send messages", boltId, pulsarBoltConf.getTopic());
         } catch (PulsarClientException e) {
             LOG.error("[{}] Error initializing pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e);
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 639adb9..af26035 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -22,15 +22,11 @@ import static java.lang.String.format;
 
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
@@ -38,15 +34,25 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.Backoff;
-
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.shade.com.google.common.collect.Sets;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+@SuppressWarnings("deprecation")
 public class PulsarSpout extends BaseRichSpout implements IMetric {
 
     private static final long serialVersionUID = 1L;
@@ -59,37 +65,61 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
     public static final String CONSUMER_RATE = "consumerRate";
     public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
 
-    private final ClientConfiguration clientConf;
-    private final ConsumerConfiguration consumerConf;
+    private final ClientConfigurationData clientConf;
+    private final ConsumerConfigurationData<byte[]> consumerConf;
     private final PulsarSpoutConfiguration pulsarSpoutConf;
     private final long failedRetriesTimeoutNano;
     private final int maxFailedRetries;
     private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = Maps.newConcurrentMap();
-    private final Queue<Message> failedMessages = Queues.newConcurrentLinkedQueue();
+    private final Queue<Message<byte[]>> failedMessages = Queues.newConcurrentLinkedQueue();
     private final ConcurrentMap<String, Object> metricsMap = Maps.newConcurrentMap();
 
     private SharedPulsarClient sharedPulsarClient;
     private String componentId;
     private String spoutId;
     private SpoutOutputCollector collector;
-    private Consumer consumer;
+    private Consumer<byte[]> consumer;
     private volatile long messagesReceived = 0;
     private volatile long messagesEmitted = 0;
     private volatile long pendingAcks = 0;
     private volatile long messageSizeReceived = 0;
 
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) {
+        Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
+        Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
+        Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
+        Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+
+        this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
+        this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+        this.consumerConf = new ConsumerConfigurationData<>();
+        this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+        this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+
+        this.pulsarSpoutConf = pulsarSpoutConf;
+        this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
+        this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
+    }
+
+    @Deprecated
     public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfiguration clientConf) {
         this(pulsarSpoutConf, clientConf, new ConsumerConfiguration());
     }
 
+    @Deprecated
     public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfiguration clientConf,
             ConsumerConfiguration consumerConf) {
-        this.clientConf = clientConf;
-        this.consumerConf = consumerConf;
+        this.clientConf = clientConf.getConfigurationData().clone();
+        this.consumerConf = consumerConf.getConfigurationData().clone();
         Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
         Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
         Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
         Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+
+        this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+        this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+        this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+
         this.pulsarSpoutConf = pulsarSpoutConf;
         this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
         this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
@@ -115,7 +145,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
     @Override
     public void ack(Object msgId) {
         if (msgId instanceof Message) {
-            Message msg = (Message) msgId;
+            Message<?> msg = (Message<?>) msgId;
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Received ack for message {}", spoutId, msg.getMessageId());
             }
@@ -128,7 +158,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
     @Override
     public void fail(Object msgId) {
         if (msgId instanceof Message) {
-            Message msg = (Message) msgId;
+            @SuppressWarnings("unchecked")
+            Message<byte[]> msg = (Message<byte[]>) msgId;
             MessageId id = msg.getMessageId();
             LOG.warn("[{}] Error processing message {}", spoutId, id);
 
@@ -160,7 +191,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
     public void nextTuple() {
         emitNextAvailableTuple();
     }
-    
+
     /**
      * It makes sure that it emits next available non-tuple to topology unless consumer queue doesn't have any message
      * available. It receives message from consumer queue and converts it to tuple and emits to topology. if the
@@ -168,7 +199,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
      * emit.
      */
     public void emitNextAvailableTuple() {
-        Message msg;
+        Message<byte[]> msg;
 
         // check if there are any failed messages to re-emit in the topology
         msg = failedMessages.peek();
@@ -219,13 +250,15 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
         pendingMessageRetries.clear();
         failedMessages.clear();
         try {
-            sharedPulsarClient = SharedPulsarClient.get(componentId, pulsarSpoutConf.getServiceUrl(), clientConf);
+            sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
             if (pulsarSpoutConf.isSharedConsumerEnabled()) {
-                consumer = sharedPulsarClient.getSharedConsumer(pulsarSpoutConf.getTopic(),
-                        pulsarSpoutConf.getSubscriptionName(), consumerConf);
+                consumer = sharedPulsarClient.getSharedConsumer(consumerConf);
             } else {
-                consumer = sharedPulsarClient.getClient().subscribe(pulsarSpoutConf.getTopic(),
-                        pulsarSpoutConf.getSubscriptionName(), consumerConf);
+                try {
+                    consumer = sharedPulsarClient.getClient().subscribeAsync(consumerConf).join();
+                } catch (CompletionException e) {
+                    throw (PulsarClientException) e.getCause();
+                }
             }
             LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId,
                     pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName());
@@ -244,7 +277,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
 
     }
 
-    private boolean mapToValueAndEmit(Message msg) {
+    private boolean mapToValueAndEmit(Message<byte[]> msg) {
         if (msg != null) {
             Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
             ++pendingAcks;
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
index 8674077..4506e11 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -18,36 +18,37 @@
  */
 package org.apache.pulsar.storm;
 
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 
 public class SharedPulsarClient {
     private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class);
     private static final ConcurrentMap<String, SharedPulsarClient> instances = Maps.newConcurrentMap();
 
     private final String componentId;
-    private final PulsarClient client;
+    private final PulsarClientImpl client;
     private final AtomicInteger counter = new AtomicInteger();
 
-    private Consumer consumer;
-    private Producer producer;
+    private Consumer<byte[]> consumer;
+    private Producer<byte[]> producer;
 
-    private SharedPulsarClient(String componentId, String serviceUrl, ClientConfiguration clientConf)
+    private SharedPulsarClient(String componentId, ClientConfigurationData clientConf)
             throws PulsarClientException {
-        this.client = PulsarClient.create(serviceUrl, clientConf);
+        this.client = new PulsarClientImpl(clientConf);
         this.componentId = componentId;
     }
 
@@ -62,13 +63,13 @@ public class SharedPulsarClient {
      * @return
      * @throws PulsarClientException
      */
-    public static SharedPulsarClient get(String componentId, String serviceUrl, ClientConfiguration clientConf)
+    public static SharedPulsarClient get(String componentId, ClientConfigurationData clientConf)
             throws PulsarClientException {
         AtomicReference<PulsarClientException> exception = new AtomicReference<PulsarClientException>();
         instances.computeIfAbsent(componentId, pulsarClient -> {
             SharedPulsarClient sharedPulsarClient = null;
             try {
-                sharedPulsarClient = new SharedPulsarClient(componentId, serviceUrl, clientConf);
+                sharedPulsarClient = new SharedPulsarClient(componentId, clientConf);
                 LOG.info("[{}] Created a new Pulsar Client.", componentId);
             } catch (PulsarClientException e) {
                 exception.set(e);
@@ -81,33 +82,41 @@ public class SharedPulsarClient {
         return instances.get(componentId);
     }
 
-    public PulsarClient getClient() {
+    public PulsarClientImpl getClient() {
         counter.incrementAndGet();
         return client;
     }
 
-    public Consumer getSharedConsumer(String topic, String subscription, ConsumerConfiguration consumerConf)
+    public Consumer<byte[]> getSharedConsumer(ConsumerConfigurationData<byte[]> consumerConf)
             throws PulsarClientException {
         counter.incrementAndGet();
         synchronized (this) {
             if (consumer == null) {
-                consumer = client.subscribe(topic, subscription, consumerConf);
-                LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, topic);
+                try {
+                    consumer = client.subscribeAsync(consumerConf).join();
+                } catch (CompletionException e) {
+                    throw (PulsarClientException) e.getCause();
+                }
+                LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, consumerConf.getSingleTopic());
             } else {
-                LOG.info("[{}] Using a shared consumer on {}", componentId, topic);
+                LOG.info("[{}] Using a shared consumer on {}", componentId, consumerConf.getSingleTopic());
             }
         }
         return consumer;
     }
 
-    public Producer getSharedProducer(String topic, ProducerConfiguration producerConf) throws PulsarClientException {
+    public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException {
         counter.incrementAndGet();
         synchronized (this) {
             if (producer == null) {
-                producer = client.createProducer(topic, producerConf);
-                LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, topic);
+                try {
+                    producer = client.createProducerAsync(producerConf).join();
+                } catch (CompletionException e) {
+                    throw (PulsarClientException) e.getCause();
+                }
+                LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, producerConf.getTopicName());
             } else {
-                LOG.info("[{}] Using a shared producer on {}", componentId, topic);
+                LOG.info("[{}] Using a shared producer on {}", componentId, producerConf.getTopicName());
             }
         }
         return producer;
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index f07eeba..e508412 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.testclient;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
@@ -36,16 +34,14 @@ import java.util.concurrent.atomic.LongAdder;
 
 import org.HdrHistogram.Histogram;
 import org.HdrHistogram.Recorder;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -197,33 +193,33 @@ public class PerformanceConsumer {
 
         final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
 
-        MessageListener listener = new MessageListener() {
-            public void received(Consumer consumer, Message msg) {
-                messagesReceived.increment();
-                bytesReceived.add(msg.getData().length);
+        MessageListener<byte[]> listener = (consumer, msg) -> {
+            messagesReceived.increment();
+            bytesReceived.add(msg.getData().length);
 
-                if (limiter != null) {
-                    limiter.acquire();
-                }
+            if (limiter != null) {
+                limiter.acquire();
+            }
 
-                long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
-                recorder.recordValue(latencyMillis);
-                cumulativeRecorder.recordValue(latencyMillis);
+            long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
+            recorder.recordValue(latencyMillis);
+            cumulativeRecorder.recordValue(latencyMillis);
 
-                consumer.acknowledgeAsync(msg);
-            }
+            consumer.acknowledgeAsync(msg);
         };
 
-        ClientConfiguration clientConf = new ClientConfiguration();
-        clientConf.setConnectionsPerBroker(arguments.maxConnections);
-        clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS);
-        clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
+        ClientBuilder clientBuilder = PulsarClient.builder() //
+                .serviceUrl(arguments.serviceURL) //
+                .connectionsPerBroker(arguments.maxConnections) //
+                .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
+                .ioThreads(Runtime.getRuntime().availableProcessors()) //
+                .enableTls(arguments.useTls) //
+                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
         if (isNotBlank(arguments.authPluginClassName)) {
-            clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
+            clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
         }
-        clientConf.setUseTls(arguments.useTls);
-        clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
-        PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf);
+
+        PulsarClient pulsarClient = clientBuilder.build();
 
         class EncKeyReader implements CryptoKeyReader {
 
@@ -246,16 +242,17 @@ public class PerformanceConsumer {
                 return null;
             }
         }
+
         List<Future<Consumer<byte[]>>> futures = Lists.newArrayList();
-        ConsumerConfiguration consumerConfig = new ConsumerConfiguration();
-        consumerConfig.setMessageListener(listener);
-        consumerConfig.setReceiverQueueSize(arguments.receiverQueueSize);
-        consumerConfig.setSubscriptionType(arguments.subscriptionType);
+        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() //
+                .messageListener(listener) //
+                .receiverQueueSize(arguments.receiverQueueSize) //
+                .subscriptionType(arguments.subscriptionType);
 
         if (arguments.encKeyName != null) {
             byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));
             EncKeyReader keyReader = new EncKeyReader(pKey);
-            consumerConfig.setCryptoKeyReader(keyReader);
+            consumerBuilder.cryptoKeyReader(keyReader);
         }
 
         for (int i = 0; i < arguments.numTopics; i++) {
@@ -271,7 +268,8 @@ public class PerformanceConsumer {
                     subscriberName = arguments.subscriberName;
                 }
 
-                futures.add(pulsarClient.subscribeAsync(topicName.toString(), subscriberName, consumerConfig));
+                futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
+                        .subscribeAsync());
             }
         }
 
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 681eb3d..41dfa98 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -42,16 +42,14 @@ import java.util.concurrent.atomic.LongAdder;
 import org.HdrHistogram.Histogram;
 import org.HdrHistogram.HistogramLogWriter;
 import org.HdrHistogram.Recorder;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -233,15 +231,17 @@ public class PerformanceProducer {
         String prefixTopicName = arguments.topics.get(0);
         List<Future<Producer<byte[]>>> futures = Lists.newArrayList();
 
-        ClientConfiguration clientConf = new ClientConfiguration();
-        clientConf.setConnectionsPerBroker(arguments.maxConnections);
-        clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
-        clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS);
+        ClientBuilder clientBuilder = PulsarClient.builder() //
+                .serviceUrl(arguments.serviceURL) //
+                .connectionsPerBroker(arguments.maxConnections) //
+                .ioThreads(Runtime.getRuntime().availableProcessors()) //
+                .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
+                .enableTls(arguments.useTls) //
+                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
         if (isNotBlank(arguments.authPluginClassName)) {
-            clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
+            clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
         }
-        clientConf.setUseTls(arguments.useTls);
-        clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
 
         class EncKeyReader implements CryptoKeyReader {
 
@@ -264,27 +264,26 @@ public class PerformanceProducer {
                 return null;
             }
         }
-        PulsarClient client = new PulsarClientImpl(arguments.serviceURL, clientConf);
-
-        ProducerConfiguration producerConf = new ProducerConfiguration();
-        producerConf.setSendTimeout(0, TimeUnit.SECONDS);
-        producerConf.setCompressionType(arguments.compression);
-        producerConf.setMaxPendingMessages(arguments.maxOutstanding);
-        // enable round robin message routing if it is a partitioned topic
-        producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+        PulsarClient client = clientBuilder.build();
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
+                .sendTimeout(0, TimeUnit.SECONDS) //
+                .compressionType(arguments.compression) //
+                .maxPendingMessages(arguments.maxOutstanding) //
+                // enable round robin message routing if it is a partitioned topic
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
         if (arguments.batchTime > 0) {
-            producerConf.setBatchingMaxPublishDelay(arguments.batchTime, TimeUnit.MILLISECONDS);
-            producerConf.setBatchingEnabled(true);
+            producerBuilder.batchingMaxPublishDelay(arguments.batchTime, TimeUnit.MILLISECONDS).enableBatching(true);
         }
 
         // Block if queue is full else we will start seeing errors in sendAsync
-        producerConf.setBlockIfQueueFull(true);
+        producerBuilder.blockIfQueueFull(true);
 
         if (arguments.encKeyName != null) {
-            producerConf.addEncryptionKey(arguments.encKeyName);
+            producerBuilder.addEncryptionKey(arguments.encKeyName);
             byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));
             EncKeyReader keyReader = new EncKeyReader(pKey);
-            producerConf.setCryptoKeyReader(keyReader);
+            producerBuilder.cryptoKeyReader(keyReader);
         }
 
         for (int i = 0; i < arguments.numTopics; i++) {
@@ -292,7 +291,7 @@ public class PerformanceProducer {
             log.info("Adding {} publishers on topic {}", arguments.numProducers, topic);
 
             for (int j = 0; j < arguments.numProducers; j++) {
-                futures.add(client.createProducerAsync(topic, producerConf));
+                futures.add(producerBuilder.clone().topic(topic).createAsync());
             }
         }
 
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index c5e66b2..7a4fdf9 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -29,14 +29,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
@@ -173,7 +172,7 @@ public class PerformanceReader {
 
         final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
 
-        ReaderListener listener = (reader, msg) -> {
+        ReaderListener<byte[]> listener = (reader, msg) -> {
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);
 
@@ -182,21 +181,21 @@ public class PerformanceReader {
             }
         };
 
-        ClientConfiguration clientConf = new ClientConfiguration();
-        clientConf.setConnectionsPerBroker(arguments.maxConnections);
-        clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS);
-        clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
+        ClientBuilder clientBuilder = PulsarClient.builder() //
+                .serviceUrl(arguments.serviceURL) //
+                .connectionsPerBroker(arguments.maxConnections) //
+                .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
+                .ioThreads(Runtime.getRuntime().availableProcessors()) //
+                .enableTls(arguments.useTls) //
+                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
         if (isNotBlank(arguments.authPluginClassName)) {
-            clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
+            clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
         }
-        clientConf.setUseTls(arguments.useTls);
-        clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
-        PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf);
+
+        PulsarClient pulsarClient = clientBuilder.build();
 
         List<CompletableFuture<Reader<byte[]>>> futures = Lists.newArrayList();
-        ReaderConfiguration readerConfig = new ReaderConfiguration();
-        readerConfig.setReaderListener(listener);
-        readerConfig.setReceiverQueueSize(arguments.receiverQueueSize);
 
         MessageId startMessageId;
         if ("earliest".equals(arguments.startMessageId)) {
@@ -208,11 +207,16 @@ public class PerformanceReader {
             startMessageId = new MessageIdImpl(Long.parseLong(parts[0]), Long.parseLong(parts[1]), -1);
         }
 
+        ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader() //
+                .readerListener(listener) //
+                .receiverQueueSize(arguments.receiverQueueSize) //
+                .startMessageId(startMessageId);
+
         for (int i = 0; i < arguments.numTopics; i++) {
             final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName
                     : TopicName.get(String.format("%s-%d", prefixTopicName, i));
 
-            futures.add(pulsarClient.createReaderAsync(topicName.toString(), startMessageId, readerConfig));
+            futures.add(readerBuilder.clone().topic(topicName.toString()).createAsync());
         }
 
         FutureUtil.waitForAll(futures).get();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 6573477..b917dc2 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -36,7 +36,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -176,30 +176,47 @@ public class WebSocketService implements Closeable {
     }
 
+    /**
+     * Builds a {@link PulsarClient} from {@code config} and the service URLs of the given cluster.
+     *
+     * <p>The service URL is chosen from {@code clusterData}: the TLS URLs are tried first when TLS is
+     * enabled, and {@link ClusterData#getServiceUrl()} is the last-resort fallback in every case.
+     *
+     * @param clusterData source of the candidate service URLs
+     * @return the client produced by the configured {@link ClientBuilder}
+     * @throws IOException declared by the signature (presumably client-creation failures; confirm)
+     */
     private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
-        ClientConfiguration clientConf = new ClientConfiguration();
-        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
-        clientConf.setUseTls(config.isTlsEnabled());
-        clientConf.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
-        clientConf.setTlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath());
-        clientConf.setIoThreads(config.getWebSocketNumIoThreads());
-        clientConf.setConnectionsPerBroker(config.getWebSocketConnectionsPerBroker());
+        ClientBuilder clientBuilder = PulsarClient.builder() //
+                .statsInterval(0, TimeUnit.SECONDS) //
+                .enableTls(config.isTlsEnabled()) //
+                .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //
+                .tlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath()) //
+                .ioThreads(config.getWebSocketNumIoThreads()) //
+                .connectionsPerBroker(config.getWebSocketConnectionsPerBroker());
 
         if (isNotBlank(config.getBrokerClientAuthenticationPlugin())
                 && isNotBlank(config.getBrokerClientAuthenticationParameters())) {
-            clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
+            clientBuilder.authentication(config.getBrokerClientAuthenticationPlugin(),
                     config.getBrokerClientAuthenticationParameters());
         }
 
         if (config.isTlsEnabled()) {
             if (isNotBlank(clusterData.getBrokerServiceUrlTls())) {
-                return PulsarClient.create(clusterData.getBrokerServiceUrlTls(), clientConf);
+                clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls());
             } else if (isNotBlank(clusterData.getServiceUrlTls())) {
-                return PulsarClient.create(clusterData.getServiceUrlTls(), clientConf);
+                clientBuilder.serviceUrl(clusterData.getServiceUrlTls());
+            } else {
+                // Keep the pre-builder behaviour: with TLS enabled but no TLS URL configured, the old code
+                // fell through to getServiceUrl(). Without this branch no service URL is set before build().
+                clientBuilder.serviceUrl(clusterData.getServiceUrl());
             }
         } else if (isNotBlank(clusterData.getBrokerServiceUrl())) {
-            return PulsarClient.create(clusterData.getBrokerServiceUrl(), clientConf);
+            clientBuilder.serviceUrl(clusterData.getBrokerServiceUrl());
+        } else {
+            clientBuilder.serviceUrl(clusterData.getServiceUrl());
         }
-        return PulsarClient.create(clusterData.getServiceUrl(), clientConf);
+
+        return clientBuilder.build();
     }
 
     private static ClusterData createClusterData(WebSocketProxyConfiguration config) {

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.