You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/30 16:47:50 UTC

[pulsar] branch master updated: Provide separate module with Pulsar v1 client API wrapper (#3228)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 742ee7e  Provide separate module with Pulsar v1 client API wrapper (#3228)
742ee7e is described below

commit 742ee7eec3f150626b4017223429323aab82d61a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Dec 30 08:47:45 2018 -0800

    Provide separate module with Pulsar v1 client API wrapper (#3228)
    
    * Provide separate module with Pulsar v1 client API wrapper
    
    * Use impl classes rather than API
    
    * Brought back DefaultSchemasTest.testStringSchema
---
 pom.xml                                            |   1 +
 pulsar-client-1x-base/pom.xml                      |  40 ++
 pulsar-client-1x-base/pulsar-client-1x/pom.xml     |  52 +++
 .../pulsar/client/api/ClientConfiguration.java     | 390 +++++++++++++++++
 .../org/apache/pulsar/client/api/Consumer.java     | 308 +++++++++++++
 .../pulsar/client/api/ConsumerConfiguration.java   | 366 ++++++++++++++++
 .../apache/pulsar/client/api/MessageBuilder.java   | 140 ++++++
 .../org/apache/pulsar/client/api/Producer.java     | 199 +++++++++
 .../pulsar/client/api/ProducerConfiguration.java   | 483 +++++++++++++++++++++
 .../org/apache/pulsar/client/api/PulsarClient.java | 274 ++++++++++++
 .../java/org/apache/pulsar/client/api/Reader.java  |  81 ++++
 .../pulsar/client/api/ReaderConfiguration.java     | 163 +++++++
 .../pulsar/client/impl/MessageBuilderImpl.java     | 116 +++++
 .../pulsar/client/impl/v1/ConsumerV1Impl.java      | 153 +++++++
 .../pulsar/client/impl/v1/ProducerV1Impl.java      |  91 ++++
 .../pulsar/client/impl/v1/PulsarClientV1Impl.java  | 173 ++++++++
 .../apache/pulsar/client/impl/v1/ReaderV1Impl.java |  86 ++++
 .../pulsar/client/impl/MessageBuilderTest.java     |  10 +-
 .../pulsar-client-2x-shaded/pom.xml                |  91 ++++
 .../client/impl/schema/DefaultSchemasTest.java     |  22 +-
 20 files changed, 3217 insertions(+), 22 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2aaca3a..ee2780c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@ flexible messaging model and an intuitive client API.</description>
     <module>pulsar-client-api</module>
     <module>pulsar-client</module>
     <module>pulsar-client-shaded</module>
+    <module>pulsar-client-1x-base</module>
     <module>pulsar-client-admin</module>
     <module>pulsar-client-admin-shaded</module>
     <module>pulsar-client-tools</module>
diff --git a/pulsar-client-1x-base/pom.xml b/pulsar-client-1x-base/pom.xml
new file mode 100644
index 0000000..22a5b1b
--- /dev/null
+++ b/pulsar-client-1x-base/pom.xml
@@ -0,0 +1,40 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-1x-base</artifactId>
+  <name>Pulsar Client 1.x Compatibility Base</name>
+  <packaging>pom</packaging>
+
+  <modules>
+      <module>pulsar-client-2x-shaded</module>
+      <module>pulsar-client-1x</module>
+  </modules>
+</project>
diff --git a/pulsar-client-1x-base/pulsar-client-1x/pom.xml b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
new file mode 100644
index 0000000..b6fd0b8
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
@@ -0,0 +1,52 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-client-1x-base</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-1x</artifactId>
+  <name>Pulsar Client 1.x Compatibility API</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-2x-shaded</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    
+    <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+      </dependency>
+  </dependencies>
+</project>
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
new file mode 100644
index 0000000..e5bac91
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+/**
+ * Class used to specify client-side configuration such as authentication.
+ *
+ * @deprecated Use {@link PulsarClient#builder()} to construct and configure a new {@link PulsarClient} instance
+ */
+@Deprecated
+public class ClientConfiguration implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ClientConfigurationData confData = new ClientConfigurationData();
+
+    /**
+     * @return the authentication provider to be used
+     */
+    public Authentication getAuthentication() {
+        return confData.getAuthentication();
+    }
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * ClientConfiguration confData = new ClientConfiguration();
+     * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
+     * String authParamsString = "key1:val1,key2:val2";
+     * Authentication auth = AuthenticationFactory.create(authPluginClassName, authParamsString);
+     * confData.setAuthentication(auth);
+     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authentication
+     */
+    public void setAuthentication(Authentication authentication) {
+        confData.setAuthentication(authentication);
+    }
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * ClientConfiguration confData = new ClientConfiguration();
+     * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
+     * String authParamsString = "key1:val1,key2:val2";
+     * confData.setAuthentication(authPluginClassName, authParamsString);
+     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParamsString
+     *            string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
+     * @throws UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    public void setAuthentication(String authPluginClassName, String authParamsString)
+            throws UnsupportedAuthenticationException {
+        confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
+    }
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * ClientConfiguration confData = new ClientConfiguration();
+     * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
+     * Map<String, String> authParams = new HashMap<String, String>();
+     * authParams.put("key1", "val1");
+     * confData.setAuthentication(authPluginClassName, authParams);
+     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParams
+     *            map which represents parameters for the Authentication-Plugin
+     * @throws UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    public void setAuthentication(String authPluginClassName, Map<String, String> authParams)
+            throws UnsupportedAuthenticationException {
+        confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
+    }
+
+    /**
+     * @return the operation timeout in ms
+     */
+    public long getOperationTimeoutMs() {
+        return confData.getOperationTimeoutMs();
+    }
+
+    /**
+     * Set the operation timeout <i>(default: 30 seconds)</i>
+     * <p>
+     * Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
+     * operation will be marked as failed
+     *
+     * @param operationTimeout
+     *            operation timeout
+     * @param unit
+     *            time unit for {@code operationTimeout}
+     */
+    public void setOperationTimeout(int operationTimeout, TimeUnit unit) {
+        checkArgument(operationTimeout >= 0);
+        confData.setOperationTimeoutMs(unit.toMillis(operationTimeout));
+    }
+
+    /**
+     * @return the number of threads to use for handling connections
+     */
+    public int getIoThreads() {
+        return confData.getNumIoThreads();
+    }
+
+    /**
+     * Set the number of threads to be used for handling connections to brokers <i>(default: 1 thread)</i>
+     *
+     * @param numIoThreads
+     */
+    public void setIoThreads(int numIoThreads) {
+        checkArgument(numIoThreads > 0);
+        confData.setNumIoThreads(numIoThreads);
+    }
+
+    /**
+     * @return the number of threads to use for message listeners
+     */
+    public int getListenerThreads() {
+        return confData.getNumListenerThreads();
+    }
+
+    /**
+     * Set the number of threads to be used for message listeners <i>(default: 1 thread)</i>
+     *
+     * @param numListenerThreads
+     */
+    public void setListenerThreads(int numListenerThreads) {
+        checkArgument(numListenerThreads > 0);
+        confData.setNumListenerThreads(numListenerThreads);
+    }
+
+    /**
+     * @return the max number of connections per single broker
+     */
+    public int getConnectionsPerBroker() {
+        return confData.getConnectionsPerBroker();
+    }
+
+    /**
+     * Sets the max number of connections that the client library will open to a single broker.
+     * <p>
+     * By default, the connection pool will use a single connection for all the producers and consumers. Increasing this
+     * parameter may improve throughput when using many producers over a high latency connection.
+     * <p>
+     *
+     * @param connectionsPerBroker
+     *            max number of connections per broker (needs to be greater than 0)
+     */
+    public void setConnectionsPerBroker(int connectionsPerBroker) {
+        checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0");
+        confData.setConnectionsPerBroker(connectionsPerBroker);
+    }
+
+    /**
+     * @return whether TCP no-delay should be set on the connections
+     */
+    public boolean isUseTcpNoDelay() {
+        return confData.isUseTcpNoDelay();
+    }
+
+    /**
+     * Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.
+     * <p>
+     * No-delay features make sure packets are sent out on the network as soon as possible, and it's critical to achieve
+     * low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall
+     * throughput, so if latency is not a concern, it's advisable to set the <code>useTcpNoDelay</code> flag to false.
+     * <p>
+     * Default value is true
+     *
+     * @param useTcpNoDelay
+     */
+    public void setUseTcpNoDelay(boolean useTcpNoDelay) {
+        confData.setUseTcpNoDelay(useTcpNoDelay);
+    }
+
+    /**
+     * @return whether TLS encryption is used on the connection
+     */
+    public boolean isUseTls() {
+        return confData.isUseTls();
+    }
+
+    /**
+     * Configure whether to use TLS encryption on the connection <i>(default: false)</i>
+     *
+     * @param useTls
+     */
+    public void setUseTls(boolean useTls) {
+        confData.setUseTls(useTls);
+    }
+
+    /**
+     * @return path to the trusted TLS certificate file
+     */
+    public String getTlsTrustCertsFilePath() {
+        return confData.getTlsTrustCertsFilePath();
+    }
+
+    /**
+     * Set the path to the trusted TLS certificate file
+     *
+     * @param tlsTrustCertsFilePath
+     */
+    public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
+        confData.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+    }
+
+    /**
+     * @return whether the Pulsar client accepts untrusted TLS certificates from the broker
+     */
+    public boolean isTlsAllowInsecureConnection() {
+        return confData.isTlsAllowInsecureConnection();
+    }
+
+    /**
+     * Configure whether the Pulsar client accepts untrusted TLS certificates from the broker <i>(default: false)</i>
+     *
+     * @param tlsAllowInsecureConnection
+     */
+    public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
+        confData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
+    }
+
+    /**
+     * Stats will be activated with positive statsIntervalSeconds
+     *
+     * @return the interval between each stat info <i>(default: 60 seconds)</i>
+     */
+    public long getStatsIntervalSeconds() {
+        return confData.getStatsIntervalSeconds();
+    }
+
+    /**
+     * Set the interval between each stat info <i>(default: 60 seconds)</i>. Stats will be activated with a positive
+     * statsInterval. It should be set to at least 1 second.
+     *
+     * @param statsInterval
+     *            the interval between each stat info
+     * @param unit
+     *            time unit for {@code statsInterval}
+     */
+    public void setStatsInterval(long statsInterval, TimeUnit unit) {
+        confData.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
+    }
+
+    /**
+     * Get configured total allowed concurrent lookup-request.
+     *
+     * @return
+     */
+    public int getConcurrentLookupRequest() {
+        return confData.getConcurrentLookupRequest();
+    }
+
+    /**
+     * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker
+     * <i>(default: 50000)</i>. It should be configured with a higher value only when the application needs to
+     * produce/subscribe on thousands of topics using the created {@link PulsarClient}.
+     *
+     * @param concurrentLookupRequest
+     */
+    public void setConcurrentLookupRequest(int concurrentLookupRequest) {
+        confData.setConcurrentLookupRequest(concurrentLookupRequest);
+    }
+
+    /**
+     * Get configured max number of reject-request in a time-frame (30 seconds) after which connection will be closed
+     *
+     * @return
+     */
+    public int getMaxNumberOfRejectedRequestPerConnection() {
+        return confData.getMaxNumberOfRejectedRequestPerConnection();
+    }
+
+    /**
+     * Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which the current
+     * connection will be closed and the client creates a new connection, giving it a chance to connect to a
+     * different broker <i>(default: 50)</i>.
+     *
+     * @param maxNumberOfRejectedRequestPerConnection
+     */
+    public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
+        confData.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
+    }
+
+    public boolean isTlsHostnameVerificationEnable() {
+        return confData.isTlsHostnameVerificationEnable();
+    }
+
+    /**
+     * Configure whether hostname verification is performed when the client connects to a broker over TLS. When
+     * enabled, it validates the incoming x509 certificate and matches the provided hostname (CN/SAN) against the
+     * expected broker's host name. It follows RFC 2818, section 3.1 (Server Identity) hostname verification.
+     *
+     * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+     *
+     * @param tlsHostnameVerificationEnable
+     */
+    public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {
+        confData.setTlsHostnameVerificationEnable(tlsHostnameVerificationEnable);
+    }
+
+    public ClientConfiguration setServiceUrl(String serviceUrl) {
+        confData.setServiceUrl(serviceUrl);
+        return this;
+    }
+
+    /**
+     * Set the duration of time to wait for a connection to a broker to be established. If the duration
+     * passes without a response from the broker, the connection attempt is dropped.
+     *
+     * @param duration the duration to wait
+     * @param unit the time unit in which the duration is defined
+     */
+    public void setConnectionTimeout(int duration, TimeUnit unit) {
+        confData.setConnectionTimeoutMs((int)unit.toMillis(duration));
+    }
+
+    /**
+     * Get the duration of time for which the client will wait for a connection to a broker to be
+     * established before giving up.
+     *
+     * @return the duration, in milliseconds
+     */
+    public long getConnectionTimeoutMs() {
+        return confData.getConnectionTimeoutMs();
+    }
+
+    public ClientConfigurationData getConfigurationData() {
+        return confData;
+    }
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
new file mode 100644
index 0000000..00da0a4
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An interface that abstracts behavior of Pulsar's consumer.
+ */
+public interface Consumer extends Closeable {
+
+    /**
+     * Get a topic for the consumer
+     *
+     * @return topic for the consumer
+     */
+    String getTopic();
+
+    /**
+     * Get a subscription for the consumer
+     *
+     * @return subscription for the consumer
+     */
+    String getSubscription();
+
+    /**
+     * Unsubscribe the consumer
+     * <p>
+     * This call blocks until the consumer is unsubscribed.
+     *
+     * @throws PulsarClientException
+     */
+    void unsubscribe() throws PulsarClientException;
+
+    /**
+     * Asynchronously unsubscribe the consumer
+     *
+     * @return {@link CompletableFuture} for this operation
+     */
+    CompletableFuture<Void> unsubscribeAsync();
+
+    /**
+     * Receives a single message.
+     * <p>
+     * This call blocks until a message is available.
+     *
+     * @return the received message
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     * @throws PulsarClientException.InvalidConfigurationException
+     *             if a message listener was defined in the configuration
+     */
+    Message<byte[]> receive() throws PulsarClientException;
+
+    /**
+     * Receive a single message
+     * <p>
+     * Retrieves a message when it becomes available and completes the {@link CompletableFuture} with it.
+     * </p>
+     * <p>
+     * {@code receiveAsync()} should be called again only once the returned {@code CompletableFuture} has completed
+     * with a received message. Otherwise it creates a <i>backlog of receive requests</i> in the application.
+     * </p>
+     *
+     * @return {@link CompletableFuture}<{@link Message}> will be completed when message is available
+     */
+    CompletableFuture<Message<byte[]>> receiveAsync();
+
+    /**
+     * Receive a single message
+     * <p>
+     * Retrieves a message, waiting up to the specified wait time if necessary.
+     *
+     * @param timeout
+     *            0 or less means immediate rather than infinite
+     * @param unit
+     * @return the received {@link Message} or null if no message available before timeout
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     * @throws PulsarClientException.InvalidConfigurationException
+     *             if a message listener was defined in the configuration
+     */
+    Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException;
+
+    /**
+     * Acknowledge the consumption of a single message
+     *
+     * @param message
+     *            The {@code Message} to be acknowledged
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     */
+    void acknowledge(Message<?> message) throws PulsarClientException;
+
+    /**
+     * Acknowledge the consumption of a single message, identified by its MessageId
+     *
+     * @param messageId
+     *            The {@code MessageId} to be acknowledged
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     */
+    void acknowledge(MessageId messageId) throws PulsarClientException;
+
+    /**
+     * Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
+     *
+     * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+     * re-delivered to this consumer.
+     *
+     * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+     *
+     * It's equivalent to calling acknowledgeCumulativeAsync(Message) and waiting for the future to complete.
+     *
+     * @param message
+     *            The {@code Message} to be cumulatively acknowledged
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     */
+    void acknowledgeCumulative(Message<?> message) throws PulsarClientException;
+
+    /**
+     * Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
+     *
+     * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+     * re-delivered to this consumer.
+     *
+     * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+     *
+     * It's equivalent to calling acknowledgeCumulativeAsync(MessageId) and waiting for the future to complete.
+     *
+     * @param messageId
+     *            The {@code MessageId} to be cumulatively acknowledged
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     */
+    void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;
+
+    /**
+     * Asynchronously acknowledge the consumption of a single message
+     *
+     * @param message
+     *            The {@code Message} to be acknowledged
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> acknowledgeAsync(Message<?> message);
+
+    /**
+     * Asynchronously acknowledge the consumption of a single message
+     *
+     * @param messageId
+     *            The {@code MessageId} to be acknowledged
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> acknowledgeAsync(MessageId messageId);
+
+    /**
+     * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
+     * message.
+     *
+     * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+     *
+     * @param message
+     *            The {@code Message} to be cumulatively acknowledged
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);
+
+    /**
+     * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
+     * message.
+     *
+     * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+     *
+     * @param messageId
+     *            The {@code MessageId} to be cumulatively acknowledged
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
+
+    /**
+     * Get statistics for the consumer.
+     *
+     * <ul>
+     * <li>numMsgsReceived : Number of messages received in the current interval
+     * <li>numBytesReceived : Number of bytes received in the current interval
+     * <li>numReceiveFailed : Number of messages failed to receive in the current interval
+     * <li>numAcksSent : Number of acks sent in the current interval
+     * <li>numAcksFailed : Number of acks failed to send in the current interval
+     * <li>totalMsgsReceived : Total number of messages received
+     * <li>totalBytesReceived : Total number of bytes received
+     * <li>totalReceiveFailed : Total number of messages failed to receive
+     * <li>totalAcksSent : Total number of acks sent
+     * <li>totalAcksFailed : Total number of acks failed to send
+     * </ul>
+     *
+     * @return statistic for the consumer
+     */
+    ConsumerStats getStats();
+
+    /**
+     * Close the consumer and stop the broker from pushing more messages.
+     */
+    @Override
+    void close() throws PulsarClientException;
+
+    /**
+     * Asynchronously close the consumer and stop the broker from pushing more messages
+     *
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
+     *
+     * Please note that this does not simply mean that the consumer is caught up with the last message published by
+     * producers, rather the topic needs to be explicitly "terminated".
+     */
+    boolean hasReachedEndOfTopic();
+
+    /**
+     * Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
+     * active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across all
+     * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
+     * breaks, the messages are redelivered after reconnect.
+     */
+    void redeliverUnacknowledgedMessages();
+
+    /**
+     * Reset the subscription associated with this consumer to a specific message id.
+     * <p>
+     *
+     * The message id can either be a specific message or represent the first or last messages in the topic.
+     * <p>
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Reset the subscription on the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
+     * </ul>
+     *
+     * Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead
+     * perform the seek() on the individual partitions.
+     *
+     * @param messageId
+     *            the message id where to reposition the subscription
+     */
+    void seek(MessageId messageId) throws PulsarClientException;
+
+    /**
+     * Reset the subscription associated with this consumer to a specific message id.
+     * <p>
+     *
+     * The message id can either be a specific message or represent the first or last messages in the topic.
+     * <p>
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Reset the subscription on the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
+     * </ul>
+     *
+     * Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead
+     * perform the seek() on the individual partitions.
+     *
+     * @param messageId
+     *            the message id where to reposition the subscription
+     * @return a future to track the completion of the seek operation
+     */
+    CompletableFuture<Void> seekAsync(MessageId messageId);
+
+    /**
+     * @return Whether the consumer is connected to the broker
+     */
+    boolean isConnected();
+
+    /**
+     * Get the name of consumer.
+     * @return consumer name.
+     */
+    String getConsumerName();
+
+    /**
+     * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
+     * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
+     */
+    void pause();
+
+    /**
+     * Resume requesting messages from the broker.
+     */
+    void resume();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
new file mode 100644
index 0000000..5ffba38
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+/**
+ * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
+ * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
+ * will be able to use the same subscription name and the messages will be dispatched in a round robin fashion.
+ *
+ * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a {@link Consumer} instance
+ */
+@Deprecated
+public class ConsumerConfiguration implements Serializable {
+
+    /**
+     * Resend shouldn't be requested before minAckTimeoutMillis.
+     */
+    static long minAckTimeoutMillis = 1000;
+
+    private static final long serialVersionUID = 1L;
+
+    private final ConsumerConfigurationData<byte[]> conf = new ConsumerConfigurationData<>();
+
+    private boolean initializeSubscriptionOnLatest = true;
+
+    public ConsumerConfiguration() {
+        // Disable acknowledgment grouping when using v1 API
+        conf.setAcknowledgementsGroupTimeMicros(0);
+    }
+
+    /**
+     * @return the configured timeout in milliseconds for unacked messages.
+     */
+    public long getAckTimeoutMillis() {
+        return conf.getAckTimeoutMillis();
+    }
+
+    /**
+     * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout must be at least
+     * {@code minAckTimeoutMillis} (1000 ms unless that static field has been changed).
+     *
+     * @param ackTimeout
+     *            timeout value for unacked messages.
+     * @param timeUnit
+     *            unit in which the timeout is provided.
+     * @return {@link ConsumerConfiguration}
+     */
+    public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) {
+        long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
+        checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
+                "Ack timeout should be greater than or equal to " + minAckTimeoutMillis + " ms");
+        conf.setAckTimeoutMillis(ackTimeoutMillis);
+        return this;
+    }
+
+    /**
+     * @return the configured subscription type
+     */
+    public SubscriptionType getSubscriptionType() {
+        return conf.getSubscriptionType();
+    }
+
+    /**
+     * Select the subscription type to be used when subscribing to the topic.
+     * <p>
+     * Default is {@link SubscriptionType#Exclusive}
+     *
+     * @param subscriptionType
+     *            the subscription type value
+     */
+    public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType) {
+        checkNotNull(subscriptionType);
+        conf.setSubscriptionType(subscriptionType);
+        return this;
+    }
+
+    /**
+     * @return the configured {@link MessageListener} for the consumer
+     */
+    public MessageListener<byte[]> getMessageListener() {
+        return conf.getMessageListener();
+    }
+
+    /**
+     * Sets a {@link MessageListener} for the consumer
+     * <p>
+     * When a {@link MessageListener} is set, application will receive messages through it. Calls to
+     * {@link Consumer#receive()} will not be allowed.
+     *
+     * @param messageListener
+     *            the listener object
+     */
+    public ConsumerConfiguration setMessageListener(MessageListener<byte[]> messageListener) {
+        checkNotNull(messageListener);
+        conf.setMessageListener(messageListener);
+        return this;
+    }
+
+    /**
+     * @return the configured {@link ConsumerEventListener} for the consumer.
+     * @see #setConsumerEventListener(ConsumerEventListener)
+     * @since 2.0
+     */
+    public ConsumerEventListener getConsumerEventListener() {
+        return conf.getConsumerEventListener();
+    }
+
+    /**
+     * Sets a {@link ConsumerEventListener} for the consumer.
+     *
+     * <p>
+     * The consumer group listener is used for receiving consumer state change in a consumer group for failover
+     * subscription. Application can then react to the consumer state changes.
+     *
+     * <p>
+     * This change is experimental. It is subject to changes coming in release 2.0.
+     *
+     * @param listener
+     *            the consumer group listener object
+     * @return consumer configuration
+     * @since 2.0
+     */
+    public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) {
+        checkNotNull(listener);
+        conf.setConsumerEventListener(listener);
+        return this;
+    }
+
+    /**
+     * @return the configured receiver queue size value
+     */
+    public int getReceiverQueueSize() {
+        return conf.getReceiverQueueSize();
+    }
+
+    /**
+     * @return the configured max total receiver queue size across partitions
+     */
+    public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
+        return conf.getMaxTotalReceiverQueueSizeAcrossPartitions();
+    }
+
+    /**
+     * Set the max total receiver queue size across partitions.
+     * <p>
+     * This setting will be used to reduce the receiver queue size for individual partitions
+     * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+     *
+     * @param maxTotalReceiverQueueSizeAcrossPartitions
+     */
+    public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
+        checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= conf.getReceiverQueueSize());
+        conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
+    }
+
+    /**
+     * @return the CryptoKeyReader
+     */
+    public CryptoKeyReader getCryptoKeyReader() {
+        return conf.getCryptoKeyReader();
+    }
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        checkNotNull(cryptoKeyReader);
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    /**
+     * Sets the ConsumerCryptoFailureAction to the value specified
+     *
+     * @param action
+     *            consumer action
+     */
+    public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+        conf.setCryptoFailureAction(action);
+    }
+
+    /**
+     * @return The ConsumerCryptoFailureAction
+     */
+    public ConsumerCryptoFailureAction getCryptoFailureAction() {
+        return conf.getCryptoFailureAction();
+    }
+
+    /**
+     * Sets the size of the consumer receive queue.
+     * <p>
+     * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
+     * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer
+     * throughput at the expense of bigger memory utilization.
+     * </p>
+     * <p>
+     * <b>Setting the consumer queue size as zero</b>
+     * <ul>
+     * <li>Decreases the throughput of the consumer, by disabling pre-fetching of messages. This approach improves the
+     * message distribution on shared subscription, by pushing messages only to the consumers that are ready to process
+     * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue
+     * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is
+     * zero.</li>
+     * <li>Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer connection with
+     * broker and {@link Consumer#receive()} call will remain blocked while {@link Consumer#receiveAsync()} receives
+     * exception in callback. <b> consumer will not be able to receive any further message unless batch-message in
+     * pipeline is removed</b></li>
+     * </ul>
+     * </p>
+     * Default value is {@code 1000} messages and should be good for most use cases.
+     *
+     * @param receiverQueueSize
+     *            the new receiver queue size value
+     */
+    public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
+        checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
+        conf.setReceiverQueueSize(receiverQueueSize);
+        return this;
+    }
+
+    /**
+     * @return the consumer name
+     */
+    public String getConsumerName() {
+        return conf.getConsumerName();
+    }
+
+    /**
+     * Set the consumer name.
+     *
+     * @param consumerName
+     */
+    public ConsumerConfiguration setConsumerName(String consumerName) {
+        checkArgument(consumerName != null && !consumerName.equals(""));
+        conf.setConsumerName(consumerName);
+        return this;
+    }
+
+    public int getPriorityLevel() {
+        return conf.getPriorityLevel();
+    }
+
+    /**
+     * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
+     * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) <br>
+     * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
+     * permits, else broker will consider next priority level consumers. <br>
+     * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch
+     * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
+     *
+     * <pre>
+     * Consumer PriorityLevel Permits
+     * C1       0             2
+     * C2       0             1
+     * C3       0             1
+     * C4       1             2
+     * C5       1             1
+     * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
+     * </pre>
+     *
+     * @param priorityLevel
+     */
+    public void setPriorityLevel(int priorityLevel) {
+        conf.setPriorityLevel(priorityLevel);
+    }
+
+    public boolean getReadCompacted() {
+        return conf.isReadCompacted();
+    }
+
+    /**
+     * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+     * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+     * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+     * point, the messages will be sent as normal.
+     *
+     * readCompacted can only be enabled on subscriptions to persistent topics which have a single active consumer
+     * (i.e. failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or
+     * on a shared subscription will lead to the subscription call throwing a PulsarClientException.
+     *
+     * @param readCompacted
+     *            whether to read from the compacted topic
+     */
+    public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
+        conf.setReadCompacted(readCompacted);
+        return this;
+    }
+
+    /**
+     * Set a name/value property with this consumer.
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public ConsumerConfiguration setProperty(String key, String value) {
+        checkArgument(key != null);
+        checkArgument(value != null);
+        conf.getProperties().put(key, value);
+        return this;
+    }
+
+    /**
+     * Add all the properties in the provided map
+     *
+     * @param properties
+     * @return
+     */
+    public ConsumerConfiguration setProperties(Map<String, String> properties) {
+        conf.getProperties().putAll(properties);
+        return this;
+    }
+
+    public Map<String, String> getProperties() {
+        return conf.getProperties();
+    }
+
+    public ConsumerConfigurationData<byte[]> getConfigurationData() {
+        return conf;
+    }
+
+     /**
+     * @param subscriptionInitialPosition the initial position at which to set
+     * the cursor when subscribing to the topic for the first time.
+     * Default is {@link SubscriptionInitialPosition#Latest}
+     */
+    public ConsumerConfiguration setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+        conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
+        return this;
+    }
+
+    /**
+     * @return the configured {@link SubscriptionInitialPosition} for the consumer
+     */
+    public SubscriptionInitialPosition getSubscriptionInitialPosition(){
+        return conf.getSubscriptionInitialPosition();
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
new file mode 100644
index 0000000..4a9b99f
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pulsar.client.impl.MessageBuilderImpl;
+
+/**
+ * Message builder factory. Use this class to create messages to be sent to the Pulsar producer
+ *
+ * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a new
+ *             message builder.
+ */
+@Deprecated
+public interface MessageBuilder {
+
+    static MessageBuilder create() {
+        return new MessageBuilderImpl();
+    }
+
+    /**
+     * Finalize the immutable message
+     *
+     * @return a {@link Message} ready to be sent through a {@link Producer}
+     */
+    Message<byte[]> build();
+
+    /**
+     * Set the content of the message
+     *
+     * @param data
+     *            array containing the payload
+     */
+    MessageBuilder setContent(byte[] data);
+
+    /**
+     * Set the content of the message
+     *
+     * @param data
+     *            array containing the payload
+     * @param offset
+     *            offset into the data array
+     * @param length
+     *            length of the payload starting from the above offset
+     */
+    MessageBuilder setContent(byte[] data, int offset, int length);
+
+    /**
+     * Set the content of the message
+     *
+     * @param buf
+     *            a {@link ByteBuffer} with the payload of the message
+     */
+    MessageBuilder setContent(ByteBuffer buf);
+
+    /**
+     * Sets a new property on a message.
+     *
+     * @param name
+     *            the name of the property
+     * @param value
+     *            the associated value
+     */
+    MessageBuilder setProperty(String name, String value);
+
+    /**
+     * Add all the properties in the provided map
+     */
+    MessageBuilder setProperties(Map<String, String> properties);
+
+    /**
+     * Sets the key of the message for routing policy
+     *
+     * @param key
+     */
+    MessageBuilder setKey(String key);
+
+    /**
+     * Set the event time for a given message.
+     *
+     * <p>
+     * Applications can retrieve the event time by calling {@link Message#getEventTime()}.
+     *
+     * <p>
+     * Note: currently pulsar doesn't support event-time based index, so the subscribers can't seek the messages by
+     * event time.
+     *
+     * @since 1.20.0
+     */
+    MessageBuilder setEventTime(long timestamp);
+
+    /**
+     * Specify a custom sequence id for the message being published.
+     * <p>
+     * The sequence id can be used for deduplication purposes and it needs to follow these rules:
+     * <ol>
+     * <li><code>sequenceId >= 0</code>
+     * <li>Sequence id for a message needs to be greater than sequence id for earlier messages:
+     * <code>sequenceId(N+1) > sequenceId(N)</code>
+     * <li>It's not necessary for sequence ids to be consecutive. There can be holes between messages. Eg. the
+     * <code>sequenceId</code> could represent an offset or a cumulative size.
+     * </ol>
+     *
+     * @param sequenceId
+     *            the sequence id to assign to the current message
+     * @since 1.20.0
+     */
+    MessageBuilder setSequenceId(long sequenceId);
+
+    /**
+     * Override the replication clusters for this message.
+     *
+     * @param clusters
+     */
+    MessageBuilder setReplicationClusters(List<String> clusters);
+
+    /**
+     * Disable replication for this message.
+     */
+    MessageBuilder disableReplication();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
new file mode 100644
index 0000000..4d69668
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Producer object.
+ *
+ * The producer is used to publish messages on a topic
+ *
+ *
+ */
+public interface Producer extends Closeable {
+
+    /**
+     * @return the topic which producer is publishing to
+     */
+    String getTopic();
+
+    /**
+     * @return the producer name which could have been assigned by the system or specified by the client
+     */
+    String getProducerName();
+
+    /**
+     * Sends a message.
+     * <p>
+     * This call will block until the message is successfully acknowledged by the Pulsar broker.
+     * <p>
+     * Use {@link MessageBuilder} and {@link #send(Message)} to set more properties than just the message payload.
+     *
+     * @param message
+     *            a message
+     * @return the message id assigned to the published message
+     * @throws PulsarClientException.TimeoutException
+     *             if the message was not correctly received by the system within the timeout period
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the producer was already closed
+     */
+    MessageId send(byte[] message) throws PulsarClientException;
+
+    /**
+     * Send a message asynchronously
+     * <p>
+     * When the producer queue is full, by default this method will complete the future with an exception
+     * {@link PulsarClientException.ProducerQueueIsFullError}
+     * <p>
+     * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and
+     * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
+     * <p>
+     * Use {@link MessageBuilder} and {@link #sendAsync(Message)} to set more properties than just the message payload.
+     *
+     * @param message
+     *            a byte array with the payload of the message
+     * @return a future that can be used to track when the message will have been safely persisted
+     */
+    CompletableFuture<MessageId> sendAsync(byte[] message);
+
+    /**
+     * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+     *
+     * @throws PulsarClientException
+     * @since 2.1.0
+     * @see #flushAsync()
+     */
+    void flush() throws PulsarClientException;
+
+    /**
+     * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+     *
+     * @return a future that can be used to track when all the messages have been safely persisted.
+     * @since 2.1.0
+     * @see #flush()
+     */
+    CompletableFuture<Void> flushAsync();
+
+    /**
+     * Send a message
+     *
+     * @param message
+     *            a message
+     * @return the message id assigned to the published message
+     * @throws PulsarClientException.TimeoutException
+     *             if the message was not correctly received by the system within the timeout period
+     *
+     * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a
+     *             new message builder.
+     */
+    @Deprecated
+    MessageId send(Message<byte[]> message) throws PulsarClientException;
+
+    /**
+     * Send a message asynchronously
+     * <p>
+     * When the returned {@link CompletableFuture} is marked as completed successfully, the provided message will
+     * contain the {@link MessageId} assigned by the broker to the published message.
+     * <p>
+     * Example:
+     *
+     * <pre>
+     * <code>Message msg = MessageBuilder.create().setContent(myContent).build();
+     * producer.sendAsync(msg).thenAccept(msgId -> {
+     *    System.out.println("Published message: " + msgId);
+     * }).exceptionally(e -> {
+     *    // Failed to publish
+     * });</code>
+     * </pre>
+     * <p>
+     * When the producer queue is full, by default this method will complete the future with an exception
+     * {@link PulsarClientException.ProducerQueueIsFullError}
+     * <p>
+     * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and
+     * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
+     *
+     * @param message
+     *            a message
+     * @return a future that can be used to track when the message will have been safely persisted
+     * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a
+     *             new message builder.
+     */
+    @Deprecated
+    CompletableFuture<MessageId> sendAsync(Message<byte[]> message);
+
+    /**
+     * Get the last sequence id that was published by this producer.
+     * <p>
+     * This represents either the automatically assigned or custom sequence id (set on the {@link MessageBuilder}) that
+     * was published and acknowledged by the broker.
+     * <p>
+     * After recreating a producer with the same producer name, this will return the last sequence id that was
+     * published in the previous producer session, or -1 if no message was ever published.
+     *
+     * @return the last sequence id published by this producer
+     */
+    long getLastSequenceId();
+
+    /**
+     * Get statistics for the producer
+     *
+     * <ul>
+     * <li>numMsgsSent : Number of messages sent in the current interval
+     * <li>numBytesSent : Number of bytes sent in the current interval
+     * <li>numSendFailed : Number of messages failed to send in the current interval
+     * <li>numAcksReceived : Number of acks received in the current interval
+     * <li>totalMsgsSent : Total number of messages sent
+     * <li>totalBytesSent : Total number of bytes sent
+     * <li>totalSendFailed : Total number of messages failed to send
+     * <li>totalAcksReceived: Total number of acks received
+     * </ul>
+     *
+     * @return statistic for the producer or null if ProducerStatsRecorderImpl is disabled.
+     */
+    ProducerStats getStats();
+
+    /**
+     * Close the producer and releases resources allocated.
+     *
+     * No more writes will be accepted from this producer. Waits until all pending write requests are persisted. In case
+     * of errors, pending writes will not be retried.
+     *
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the producer was already closed
+     */
+    @Override
+    void close() throws PulsarClientException;
+
+    /**
+     * Close the producer and releases resources allocated.
+     *
+     * No more writes will be accepted from this producer. Waits until all pending write requests are persisted. In case
+     * of errors, pending writes will not be retried.
+     *
+     * @return a future that can be used to track when the producer has been closed
+     */
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * @return Whether the producer is connected to the broker
+     */
+    boolean isConnected();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
new file mode 100644
index 0000000..1af077b
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+
+import lombok.EqualsAndHashCode;
+
+/**
+ * Producer's configuration
+ *
+ * @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance
+ */
+@Deprecated
+@EqualsAndHashCode
+public class ProducerConfiguration implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ProducerConfigurationData conf = new ProducerConfigurationData();
+
+    @Deprecated
+    public enum MessageRoutingMode {
+        SinglePartition, RoundRobinPartition, CustomPartition
+    }
+
+    @Deprecated
+    public enum HashingScheme {
+        JavaStringHash, Murmur3_32Hash
+    }
+
+    /**
+     * @return the configured custom producer name or null if no custom name was specified
+     * @since 1.20.0
+     */
+    public String getProducerName() {
+        return conf.getProducerName();
+    }
+
+    /**
+     * Specify a name for the producer
+     * <p>
+     * If not assigned, the system will generate a globally unique name which can be accessed with
+     * {@link Producer#getProducerName()}.
+     * <p>
+     * When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
+     * across all Pulsar's clusters.
+     * <p>
+     * If a producer with the same name is already connected to a particular topic, the
+     * {@link PulsarClient#createProducer(String)} operation will fail with {@link ProducerBusyException}.
+     *
+     * @param producerName
+     *            the custom name to use for the producer
+     * @since 1.20.0
+     */
+    public void setProducerName(String producerName) {
+        conf.setProducerName(producerName);
+    }
+
+    /**
+     * @return the message send timeout in ms
+     */
+    public long getSendTimeoutMs() {
+        return conf.getSendTimeoutMs();
+    }
+
+    /**
+     * Set the send timeout <i>(default: 30 seconds)</i>
+     * <p>
+     * If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
+     *
+     * @param sendTimeout
+     *            the send timeout
+     * @param unit
+     *            the time unit of the {@code sendTimeout}
+     */
+    public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
+        checkArgument(sendTimeout >= 0);
+        conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
+        return this;
+    }
+
+    /**
+     * @return the maximum number of messages allowed in the outstanding messages queue for the producer
+     */
+    public int getMaxPendingMessages() {
+        return conf.getMaxPendingMessages();
+    }
+
+    /**
+     * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+     * <p>
+     * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail
+     * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior.
+     *
+     * @param maxPendingMessages
+     * @return
+     */
+    public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
+        checkArgument(maxPendingMessages > 0);
+        conf.setMaxPendingMessages(maxPendingMessages);
+        return this;
+    }
+
+    public HashingScheme getHashingScheme() {
+        return HashingScheme.valueOf(conf.getHashingScheme().toString());
+    }
+
+    public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) {
+        conf.setHashingScheme(org.apache.pulsar.client.api.HashingScheme.valueOf(hashingScheme.toString()));
+        return this;
+    }
+
+    /**
+     *
+     * @return the maximum number of pending messages allowed across all the partitions
+     */
+    public int getMaxPendingMessagesAcrossPartitions() {
+        return conf.getMaxPendingMessagesAcrossPartitions();
+    }
+
+    /**
+     * Set the number of max pending messages across all the partitions
+     * <p>
+     * This setting will be used to lower the max pending messages for each partition
+     * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+     *
+     * @param maxPendingMessagesAcrossPartitions
+     */
+    public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        checkArgument(maxPendingMessagesAcrossPartitions >= conf.getMaxPendingMessages());
+        conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
+    }
+
+    /**
+     *
+     * @return whether the producer will block {@link Producer#send} and {@link Producer#sendAsync} operations when the
+     *         pending queue is full
+     */
+    public boolean getBlockIfQueueFull() {
+        return conf.isBlockIfQueueFull();
+    }
+
+    /**
+     * Set whether the {@link Producer#send} and {@link Producer#sendAsync} operations should block when the outgoing
+     * message queue is full.
+     * <p>
+     * Default is <code>false</code>. If set to <code>false</code>, send operations will immediately fail with
+     * {@link PulsarClientException.ProducerQueueIsFullError} when there is no space left in pending queue.
+     *
+     * @param blockIfQueueFull
+     *            whether to block {@link Producer#send} and {@link Producer#sendAsync} operations on queue full
+     * @return
+     */
+    public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) {
+        conf.setBlockIfQueueFull(blockIfQueueFull);
+        return this;
+    }
+
+    /**
+     * Set the message routing mode for the partitioned producer.
+     *
+     * @param messageRouteMode message routing mode.
+     * @return producer configuration
+     * @see MessageRoutingMode
+     */
+    public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) {
+        checkNotNull(messageRouteMode);
+        conf.setMessageRoutingMode(
+                org.apache.pulsar.client.api.MessageRoutingMode.valueOf(messageRouteMode.toString()));
+        return this;
+    }
+
+    /**
+     * Get the message routing mode for the partitioned producer.
+     *
+     * @return message routing mode, default is round-robin routing.
+     * @see MessageRoutingMode#RoundRobinPartition
+     */
+    public MessageRoutingMode getMessageRoutingMode() {
+        return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
+    }
+
+    /**
+     * Set the compression type for the producer.
+     * <p>
+     * By default, message payloads are not compressed. Supported compression types are:
+     * <ul>
+     * <li><code>CompressionType.LZ4</code></li>
+     * <li><code>CompressionType.ZLIB</code></li>
+     * </ul>
+     *
+     * @param compressionType
+     * @return
+     *
+     * @since 1.0.28 <br>
+     *        Make sure all the consumer applications have been updated to use this client version, before starting to
+     *        compress messages.
+     */
+    public ProducerConfiguration setCompressionType(CompressionType compressionType) {
+        conf.setCompressionType(compressionType);
+        return this;
+    }
+
+    /**
+     * @return the configured compression type for this producer
+     */
+    public CompressionType getCompressionType() {
+        return conf.getCompressionType();
+    }
+
+    /**
+     * Set a custom message routing policy by passing an implementation of MessageRouter
+     *
+     *
+     * @param messageRouter
+     */
+    public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) {
+        checkNotNull(messageRouter);
+        setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+        conf.setCustomMessageRouter(messageRouter);
+        return this;
+    }
+
+    /**
+     * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
+     *
+     * @return message router.
+     * @deprecated since 1.22.0-incubating. <tt>numPartitions</tt> is already passed as parameter in
+     *             {@link MessageRouter#choosePartition(Message, TopicMetadata)}.
+     * @see MessageRouter
+     */
+    @Deprecated
+    public MessageRouter getMessageRouter(int numPartitions) {
+        return conf.getCustomMessageRouter();
+    }
+
+    /**
+     * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
+     *
+     * @return message router set by {@link #setMessageRouter(MessageRouter)}.
+     */
+    public MessageRouter getMessageRouter() {
+        return conf.getCustomMessageRouter();
+    }
+
+    /**
+     * Return the flag whether automatic message batching is enabled or not.
+     *
+     * @return true if batch messages are enabled. otherwise false.
+     * @since 2.0.0 <br>
+     *        It is enabled by default.
+     */
+    public boolean getBatchingEnabled() {
+        return conf.isBatchingEnabled();
+    }
+
+    /**
+     * Control whether automatic batching of messages is enabled for the producer. <i>default: enabled</i>
+     *
+     * When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
+     * broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
+     * messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
+     * contents.
+     *
+     * When enabled default batch delay is set to 10 ms and default batch size is 1000 messages
+     *
+     * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
+     * @since 1.0.36 <br>
+     *        Make sure all the consumer applications have been updated to use this client version, before starting to
+     *        batch messages.
+     *
+     */
+    public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) {
+        conf.setBatchingEnabled(batchMessagesEnabled);
+        return this;
+    }
+
+    /**
+     * @return the CryptoKeyReader
+     */
+    public CryptoKeyReader getCryptoKeyReader() {
+        return conf.getCryptoKeyReader();
+    }
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        checkNotNull(cryptoKeyReader);
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    /**
+     *
+     * @return encryptionKeys
+     *
+     */
+    public Set<String> getEncryptionKeys() {
+        return conf.getEncryptionKeys();
+    }
+
+    /**
+     *
+     * Returns true if encryption keys are added
+     *
+     */
+    public boolean isEncryptionEnabled() {
+        return conf.isEncryptionEnabled();
+    }
+
+    /**
+     * Add public encryption key, used by producer to encrypt the data key.
+     *
+     * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are
+     * found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application
+     * should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted
+     * after compression. If batch messaging is enabled, the batched message is encrypted.
+     *
+     */
+    public void addEncryptionKey(String key) {
+        conf.getEncryptionKeys().add(key);
+    }
+
+    public void removeEncryptionKey(String key) {
+        conf.getEncryptionKeys().remove(key);
+    }
+
+    /**
+     * Sets the ProducerCryptoFailureAction to the value specified
+     *
+     * @param action
+     *            The producer action
+     */
+    public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
+        conf.setCryptoFailureAction(action);
+    }
+
+    /**
+     * @return The ProducerCryptoFailureAction
+     */
+    public ProducerCryptoFailureAction getCryptoFailureAction() {
+        return conf.getCryptoFailureAction();
+    }
+
+    /**
+     *
+     * @return the batch time period in ms.
+     * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
+     */
+    public long getBatchingMaxPublishDelayMs() {
+        return TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros());
+    }
+
+    /**
+     * Set the time period within which the messages sent will be batched <i>default: 10ms</i> if batch messages are
+     * enabled. If set to a non zero value, messages will be queued until this time interval elapses or until the
+     * {@link #setBatchingMaxMessages(int)} threshold is reached; all messages will then be published as a single
+     * batch message. The consumer will be delivered individual messages in the batch in the same order they were
+     * enqueued.
+     *
+     * @since 1.0.36 <br>
+     *        Make sure all the consumer applications have been updated to use this client version, before starting to
+     *        batch messages.
+     * @param batchDelay
+     *            the batch delay
+     * @param timeUnit
+     *            the time unit of the {@code batchDelay}
+     * @return
+     */
+    public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
+        long delayInMs = timeUnit.toMillis(batchDelay);
+        checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
+        conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
+        return this;
+    }
+
+    /**
+     *
+     * @return the maximum number of messages permitted in a batch.
+     */
+    public int getBatchingMaxMessages() {
+        return conf.getBatchingMaxMessages();
+    }
+
+    /**
+     * Set the maximum number of messages permitted in a batch. <i>default: 1000</i> If set to a value greater than 1,
+     * messages will be queued until this threshold is reached or the batch interval has elapsed. All messages in the
+     * batch will be published as a single batch message. The consumer will be delivered individual messages in the
+     * batch in the same order they were enqueued.
+     *
+     * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
+     * @param batchMessagesMaxMessagesPerBatch
+     *            maximum number of messages in a batch
+     * @return
+     */
+    public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
+        checkArgument(batchMessagesMaxMessagesPerBatch > 0);
+        conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
+        return this;
+    }
+
+    public Optional<Long> getInitialSequenceId() {
+        return Optional.ofNullable(conf.getInitialSequenceId());
+    }
+
+    /**
+     * Set the baseline for the sequence ids for messages published by the producer.
+     * <p>
+     * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned
+     * incremental sequence ids, if not otherwise specified.
+     *
+     * @param initialSequenceId
+     * @return
+     */
+    public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
+        conf.setInitialSequenceId(initialSequenceId);
+        return this;
+    }
+
+    /**
+     * Set a name/value property with this producer.
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public ProducerConfiguration setProperty(String key, String value) {
+        checkArgument(key != null);
+        checkArgument(value != null);
+        conf.getProperties().put(key, value);
+        return this;
+    }
+
+    /**
+     * Add all the properties in the provided map
+     *
+     * @param properties
+     * @return
+     */
+    public ProducerConfiguration setProperties(Map<String, String> properties) {
+        conf.getProperties().putAll(properties);
+        return this;
+    }
+
+    public Map<String, String> getProperties() {
+        return conf.getProperties();
+    }
+
+    public ProducerConfigurationData getProducerConfigurationData() {
+        return conf;
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
new file mode 100644
index 0000000..a57a684
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.impl.v1.PulsarClientV1Impl;
+
+/**
+ * Class that provides a client interface to Pulsar.
+ * <p>
+ * Client instances are thread-safe and can be reused for managing multiple {@link Producer}, {@link Consumer} and
+ * {@link Reader} instances.
+ */
+public interface PulsarClient extends Closeable {
+
+    /**
+     * Create a new PulsarClient object using default client configuration
+     *
+     * @param serviceUrl
+     *            the url of the Pulsar endpoint to be used
+     * @return a new pulsar client object
+     * @throws PulsarClientException.InvalidServiceURL
+     *             if the serviceUrl is invalid
+     * @deprecated use {@link #builder()} to construct a client instance
+     */
+    @Deprecated
+    public static PulsarClient create(String serviceUrl) throws PulsarClientException {
+        return create(serviceUrl, new ClientConfiguration());
+    }
+
+    /**
+     * Create a new PulsarClient object
+     *
+     * @param serviceUrl
+     *            the url of the Pulsar endpoint to be used
+     * @param conf
+     *            the client configuration
+     * @return a new pulsar client object
+     * @throws PulsarClientException.InvalidServiceURL
+     *             if the serviceUrl is invalid
+     * @deprecated use {@link #builder()} to construct a client instance
+     */
+    @Deprecated
+    public static PulsarClient create(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
+        return new PulsarClientV1Impl(serviceUrl, conf);
+    }
+
+    /**
+     * Create a producer with default {@link ProducerConfiguration} for publishing on a specific topic
+     *
+     * @param topic
+     *            The name of the topic where to produce
+     * @return The producer object
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the client was already closed
+     * @throws PulsarClientException.InvalidTopicNameException
+     *             if the topic name is not valid
+     * @throws PulsarClientException.AuthenticationException
+     *             if there was an error with the supplied credentials
+     * @throws PulsarClientException.AuthorizationException
+     *             if the authorization to publish on topic was denied
+     * @deprecated use {@link #newProducer()} to build a new producer
+     */
+    @Deprecated
+    Producer createProducer(String topic) throws PulsarClientException;
+
+    /**
+     * Asynchronously create a producer with default {@link ProducerConfiguration} for publishing on a specific topic
+     *
+     * @param topic
+     *            The name of the topic where to produce
+     * @return Future of the asynchronously created producer object
+     * @deprecated use {@link #newProducer()} to build a new producer
+     */
+    @Deprecated
+    CompletableFuture<Producer> createProducerAsync(String topic);
+
+    /**
+     * Create a producer with given {@code ProducerConfiguration} for publishing on a specific topic
+     *
+     * @param topic
+     *            The name of the topic where to produce
+     * @param conf
+     *            The {@code ProducerConfiguration} object
+     * @return The producer object
+     * @throws PulsarClientException
+     *             if it was not possible to create the producer
+     *
+     * @deprecated use {@link #newProducer()} to build a new producer
+     */
+    @Deprecated
+    Producer createProducer(String topic, ProducerConfiguration conf) throws PulsarClientException;
+
+    /**
+     * Asynchronously create a producer with given {@code ProducerConfiguration} for publishing on a specific topic
+     *
+     * @param topic
+     *            The name of the topic where to produce
+     * @param conf
+     *            The {@code ProducerConfiguration} object
+     * @return Future of the asynchronously created producer object
+     * @deprecated use {@link #newProducer()} to build a new producer
+     */
+    @Deprecated
+    CompletableFuture<Producer> createProducerAsync(String topic, ProducerConfiguration conf);
+
+    /**
+     * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The name of the topic
+     * @param subscription
+     *            The name of the subscription
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException
+     *             if it was not possible to subscribe to the topic
+     *
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
+     */
+    @Deprecated
+    Consumer subscribe(String topic, String subscription) throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topic and subscription combination using default
+     * {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The topic name
+     * @param subscription
+     *            The subscription name
+     * @return Future of the {@code Consumer} object
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
+     */
+    @Deprecated
+    CompletableFuture<Consumer> subscribeAsync(String topic, String subscription);
+
+    /**
+     * Subscribe to the given topic and subscription combination with given {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The name of the topic
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
+     */
+    @Deprecated
+    Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topic and subscription combination using given
+     * {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The name of the topic
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return Future of the {@code Consumer} object
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
+     */
+    @Deprecated
+    CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, ConsumerConfiguration conf);
+
+    /**
+     * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified topic.
+     * <p>
+     * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+     * subscription. Reader can only work on non-partitioned topics.
+     * <p>
+     * The initial reader positioning is done by specifying a message id. The options are:
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Start reading from the end of the topic, only getting messages published
+     * after the reader was created
+     * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that
+     * specific position. The first message to be read will be the message next to the specified messageId.
+     * </ul>
+     *
+     * @param topic
+     *            The name of the topic where to read
+     * @param startMessageId
+     *            The message id where the reader will position itself. The first message returned will be the one after
+     *            the specified startMessageId
+     * @param conf
+     *            The {@code ReaderConfiguration} object
+     * @return The {@code Reader} object
+     * @deprecated Use {@link #newReader()} to build a new reader
+     */
+    @Deprecated
+    Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException;
+
+    /**
+     * Asynchronously create a topic reader with given {@code ReaderConfiguration} for reading messages from the
+     * specified topic.
+     * <p>
+     * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+     * subscription. Reader can only work on non-partitioned topics.
+     * <p>
+     * The initial reader positioning is done by specifying a message id. The options are:
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Start reading from the end of the topic, only getting messages published
+     * after the reader was created
+     * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that
+     * specific position. The first message to be read will be the message next to the specified messageId.
+     * </ul>
+     *
+     * @param topic
+     *            The name of the topic where to read
+     * @param startMessageId
+     *            The message id where the reader will position itself. The first message returned will be the one after
+     *            the specified startMessageId
+     * @param conf
+     *            The {@code ReaderConfiguration} object
+     * @return Future of the asynchronously created reader object
+     * @deprecated Use {@link #newReader()} to build a new reader
+     */
+    @Deprecated
+    CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf);
+
+    /**
+     * Close the PulsarClient and release all the resources.
+     *
+     * All the producers and consumers will be orderly closed. Waits until all pending write requests are persisted.
+     *
+     * @throws PulsarClientException
+     *             if the close operation fails
+     */
+    @Override
+    void close() throws PulsarClientException;
+
+    /**
+     * Asynchronously close the PulsarClient and release all the resources.
+     *
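+     * All the producers and consumers will be orderly closed. Waits until all pending write requests are persisted.
+     *
+     * @return a future that can be used to track the completion of the close operation; unlike {@link #close()},
+     *         this method does not declare {@link PulsarClientException}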
+     */
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * Perform immediate shutdown of PulsarClient.
+     *
+     * Release all the resources and close all the producers without waiting for ongoing operations to complete.
+     *
+     * @throws PulsarClientException
+     *             if the forceful shutdown fails
+     */
+    void shutdown() throws PulsarClientException;
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
new file mode 100644
index 0000000..4a435c3
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Reader can be used to scan through all the messages currently available in a topic.
+ *
+ */
+public interface Reader extends Closeable {
+
+    /**
+     * @return the topic from which this reader is reading
+     */
+    String getTopic();
+
+    /**
+     * Read the next message in the topic
+     *
+     * @return the next message
+     * @throws PulsarClientException
+     */
+    Message<byte[]> readNext() throws PulsarClientException;
+
+    /**
+     * Read the next message in the topic waiting for a maximum of timeout
+     * time units. Returns null if no message is received in that time.
+     *
+     * @return the next message(Could be null if none received in time)
+     * @throws PulsarClientException
+     */
+    Message<byte[]> readNext(int timeout, TimeUnit unit) throws PulsarClientException;
+
+    CompletableFuture<Message<byte[]>> readNextAsync();
+
+    /**
+     * Asynchronously close the reader and stop the broker from pushing more messages
+     *
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * Return true if the topic was terminated and this reader has reached the end of the topic
+     */
+    boolean hasReachedEndOfTopic();
+
+    /**
+     * Check if there is any message available to read from the current position.
+     */
+    boolean hasMessageAvailable() throws PulsarClientException;
+
+    /**
+     * Asynchronously check if there is any message that has been published successfully to the broker in the topic.
+     */
+    CompletableFuture<Boolean> hasMessageAvailableAsync();
+
+    /**
+     * @return Whether the reader is connected to the broker
+     */
+    boolean isConnected();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
new file mode 100644
index 0000000..243166c
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.commons.lang3.StringUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+
+/**
+ * Mutable, serializable configuration holder for a v1-API {@link Reader}. Every accessor reads from or writes
+ * to an internal {@link ReaderConfigurationData} instance.
+ *
+ * @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance
+ */
+@Deprecated
+public class ReaderConfiguration implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ReaderConfigurationData<byte[]> conf = new ReaderConfigurationData<>();
+
+    /**
+     * @return the configured {@link ReaderListener} for the reader
+     */
+    public ReaderListener<byte[]> getReaderListener() {
+        return conf.getReaderListener();
+    }
+
+    /**
+     * Sets a {@link ReaderListener} for the reader
+     * <p>
+     * When a {@link ReaderListener} is set, application will receive messages through it. Calls to
+     * {@link Reader#readNext()} will not be allowed.
+     *
+     * @param readerListener
+     *            the listener object
+     * @return this configuration, to allow call chaining
+     * @throws NullPointerException
+     *             if {@code readerListener} is null
+     */
+    public ReaderConfiguration setReaderListener(ReaderListener<byte[]> readerListener) {
+        checkNotNull(readerListener, "readerListener must not be null");
+        conf.setReaderListener(readerListener);
+        return this;
+    }
+
+    /**
+     * @return the configured receiver queue size value
+     */
+    public int getReceiverQueueSize() {
+        return conf.getReceiverQueueSize();
+    }
+
+    /**
+     * @return the CryptoKeyReader
+     */
+    public CryptoKeyReader getCryptoKeyReader() {
+        return conf.getCryptoKeyReader();
+    }
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     * @return this configuration, to allow call chaining
+     * @throws NullPointerException
+     *             if {@code cryptoKeyReader} is null
+     */
+    public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        checkNotNull(cryptoKeyReader, "cryptoKeyReader must not be null");
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    /**
+     * Sets the ConsumerCryptoFailureAction to the value specified
+     * <p>
+     * Unlike the other setters of this class, this one returns {@code void} and performs no validation of its
+     * argument; the signature is kept as-is so existing callers are unaffected.
+     *
+     * @param action
+     *            The action to take when the decoding fails
+     */
+    public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+        conf.setCryptoFailureAction(action);
+    }
+
+    /**
+     * @return The ConsumerCryptoFailureAction
+     */
+    public ConsumerCryptoFailureAction getCryptoFailureAction() {
+        return conf.getCryptoFailureAction();
+    }
+
+    /**
+     * Sets the size of the consumer receive queue.
+     * <p>
+     * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
+     * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer
+     * throughput at the expense of bigger memory utilization.
+     * </p>
+     * Default value is {@code 1000} messages and should be good for most use cases.
+     *
+     * @param receiverQueueSize
+     *            the new receiver queue size value
+     * @return this configuration, to allow call chaining
+     * @throws IllegalArgumentException
+     *             if {@code receiverQueueSize} is negative
+     */
+    public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) {
+        checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
+        conf.setReceiverQueueSize(receiverQueueSize);
+        return this;
+    }
+
+    /**
+     * @return the reader name
+     */
+    public String getReaderName() {
+        return conf.getReaderName();
+    }
+
+    /**
+     * Set the reader name.
+     *
+     * @param readerName
+     *            the name to assign to the reader; must not be null, empty or whitespace-only
+     * @return this configuration, to allow call chaining
+     * @throws IllegalArgumentException
+     *             if {@code readerName} is blank
+     */
+    public ReaderConfiguration setReaderName(String readerName) {
+        checkArgument(StringUtils.isNotBlank(readerName), "Reader name cannot be blank");
+        conf.setReaderName(readerName);
+        return this;
+    }
+
+    /**
+     * @return the subscription role prefix for subscription auth
+     */
+    public String getSubscriptionRolePrefix() {
+        return conf.getSubscriptionRolePrefix();
+    }
+
+    /**
+     * Set the subscription role prefix for subscription auth. The default prefix is "reader".
+     *
+     * @param subscriptionRolePrefix
+     *            the prefix to use; must not be null, empty or whitespace-only
+     * @return this configuration, to allow call chaining
+     * @throws IllegalArgumentException
+     *             if {@code subscriptionRolePrefix} is blank
+     */
+    public ReaderConfiguration setSubscriptionRolePrefix(String subscriptionRolePrefix) {
+        checkArgument(StringUtils.isNotBlank(subscriptionRolePrefix), "Subscription role prefix cannot be blank");
+        conf.setSubscriptionRolePrefix(subscriptionRolePrefix);
+        return this;
+    }
+
+    /**
+     * @return the internal {@link ReaderConfigurationData} backing this object (the live instance, not a copy)
+     */
+    public ReaderConfigurationData<byte[]> getReaderConfigurationData() {
+        return conf;
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
new file mode 100644
index 0000000..76b1dab
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+
+/**
+ * {@link MessageBuilder} implementation that collects metadata in a {@link MessageMetadata} builder and keeps
+ * the payload as a {@link ByteBuffer}; {@link #build()} hands both to {@code MessageImpl.create(...)} together
+ * with {@code Schema.BYTES}.
+ */
+@SuppressWarnings("deprecation")
+public class MessageBuilderImpl implements MessageBuilder {
+    private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);
+
+    private final MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder();
+    // Payload of the message under construction; stays at the shared empty buffer until setContent is called.
+    private ByteBuffer content = EMPTY_CONTENT;
+
+    /** Builds a single key/value property entry. */
+    private static KeyValue toKeyValue(String name, String value) {
+        return KeyValue.newBuilder().setKey(name).setValue(value).build();
+    }
+
+    /**
+     * @return a message created from the metadata and content accumulated so far
+     */
+    @Override
+    public Message<byte[]> build() {
+        return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES);
+    }
+
+    /**
+     * Uses the whole array as the message payload (delegates to the offset/length variant).
+     */
+    @Override
+    public MessageBuilder setContent(byte[] data) {
+        setContent(data, 0, data.length);
+        return this;
+    }
+
+    /**
+     * Uses {@code length} bytes of {@code data}, starting at {@code offset}, as the message payload. The array is
+     * wrapped, not copied.
+     */
+    @Override
+    public MessageBuilder setContent(byte[] data, int offset, int length) {
+        content = ByteBuffer.wrap(data, offset, length);
+        return this;
+    }
+
+    /**
+     * Uses a duplicate of {@code buf} as the message payload.
+     */
+    @Override
+    public MessageBuilder setContent(ByteBuffer buf) {
+        content = buf.duplicate();
+        return this;
+    }
+
+    /**
+     * Appends every entry of {@code properties} to the message properties.
+     */
+    @Override
+    public MessageBuilder setProperties(Map<String, String> properties) {
+        properties.forEach((name, value) -> msgMetadataBuilder.addProperties(toKeyValue(name, value)));
+        return this;
+    }
+
+    /**
+     * Appends a single property to the message properties.
+     */
+    @Override
+    public MessageBuilder setProperty(String name, String value) {
+        msgMetadataBuilder.addProperties(toKeyValue(name, value));
+        return this;
+    }
+
+    /**
+     * Stores {@code key} as the partition key of the message.
+     */
+    @Override
+    public MessageBuilder setKey(String key) {
+        msgMetadataBuilder.setPartitionKey(key);
+        return this;
+    }
+
+    /**
+     * Sets the event time of the message.
+     *
+     * @throws IllegalArgumentException if {@code timestamp} is not strictly positive
+     */
+    @Override
+    public MessageBuilder setEventTime(long timestamp) {
+        checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
+        msgMetadataBuilder.setEventTime(timestamp);
+        return this;
+    }
+
+    /**
+     * Sets the sequence id of the message.
+     *
+     * @throws IllegalArgumentException if {@code sequenceId} is negative
+     */
+    @Override
+    public MessageBuilder setSequenceId(long sequenceId) {
+        checkArgument(sequenceId >= 0);
+        msgMetadataBuilder.setSequenceId(sequenceId);
+        return this;
+    }
+
+    /**
+     * Replaces the current replicate-to list with {@code clusters}.
+     *
+     * @throws NullPointerException if {@code clusters} is null
+     */
+    @Override
+    public MessageBuilder setReplicationClusters(List<String> clusters) {
+        Preconditions.checkNotNull(clusters);
+        msgMetadataBuilder.clearReplicateTo();
+        msgMetadataBuilder.addAllReplicateTo(clusters);
+        return this;
+    }
+
+    /**
+     * Replaces the current replicate-to list with the single marker entry {@code "__local__"}.
+     */
+    @Override
+    public MessageBuilder disableReplication() {
+        msgMetadataBuilder.clearReplicateTo();
+        msgMetadataBuilder.addReplicateTo("__local__");
+        return this;
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
new file mode 100644
index 0000000..e21e572
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Adapter that exposes a v2 {@code Consumer<byte[]>} through the v1 {@link Consumer} interface. Every method
+ * forwards its arguments to the wrapped consumer unchanged and returns whatever the wrapped consumer returns.
+ */
+public class ConsumerV1Impl implements Consumer {
+    private final org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> delegate;
+
+    /**
+     * @param consumer the v2 consumer that all calls are forwarded to
+     */
+    public ConsumerV1Impl(org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> consumer) {
+        this.delegate = consumer;
+    }
+
+    @Override
+    public void acknowledge(Message<?> message) throws PulsarClientException {
+        delegate.acknowledge(message);
+    }
+
+    @Override
+    public void acknowledge(MessageId messageId) throws PulsarClientException {
+        delegate.acknowledge(messageId);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
+        return delegate.acknowledgeAsync(message);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
+        return delegate.acknowledgeAsync(messageId);
+    }
+
+    @Override
+    public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
+        delegate.acknowledgeCumulative(message);
+    }
+
+    @Override
+    public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
+        delegate.acknowledgeCumulative(messageId);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
+        return delegate.acknowledgeCumulativeAsync(message);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
+        return delegate.acknowledgeCumulativeAsync(messageId);
+    }
+
+    @Override
+    public void close() throws PulsarClientException {
+        delegate.close();
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return delegate.closeAsync();
+    }
+
+    @Override
+    public String getConsumerName() {
+        return delegate.getConsumerName();
+    }
+
+    @Override
+    public ConsumerStats getStats() {
+        return delegate.getStats();
+    }
+
+    // NOTE(review): the methods below carry no @Override annotation; presumably they also implement the v1
+    // Consumer interface - confirm against that interface before annotating them.
+
+    public String getSubscription() {
+        return delegate.getSubscription();
+    }
+
+    public String getTopic() {
+        return delegate.getTopic();
+    }
+
+    public boolean hasReachedEndOfTopic() {
+        return delegate.hasReachedEndOfTopic();
+    }
+
+    public boolean isConnected() {
+        return delegate.isConnected();
+    }
+
+    public void pause() {
+        delegate.pause();
+    }
+
+    public Message<byte[]> receive() throws PulsarClientException {
+        return delegate.receive();
+    }
+
+    public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+        return delegate.receive(timeout, unit);
+    }
+
+    public CompletableFuture<Message<byte[]>> receiveAsync() {
+        return delegate.receiveAsync();
+    }
+
+    public void redeliverUnacknowledgedMessages() {
+        delegate.redeliverUnacknowledgedMessages();
+    }
+
+    public void resume() {
+        delegate.resume();
+    }
+
+    public void seek(MessageId messageId) throws PulsarClientException {
+        delegate.seek(messageId);
+    }
+
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+        return delegate.seekAsync(messageId);
+    }
+
+    public void unsubscribe() throws PulsarClientException {
+        delegate.unsubscribe();
+    }
+
+    public CompletableFuture<Void> unsubscribeAsync() {
+        return delegate.unsubscribeAsync();
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
new file mode 100644
index 0000000..3f91377
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerStats;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
+
+/**
+ * Adapter that exposes a {@code ProducerImpl<byte[]>} through the v1 {@link Producer} interface. Every method
+ * forwards its arguments to the wrapped producer unchanged and returns whatever the wrapped producer returns.
+ */
+public class ProducerV1Impl implements Producer {
+
+    private final ProducerImpl<byte[]> delegate;
+
+    /**
+     * @param producer the producer implementation that all calls are forwarded to
+     */
+    public ProducerV1Impl(ProducerImpl<byte[]> producer) {
+        this.delegate = producer;
+    }
+
+    // NOTE(review): most methods below carry no @Override annotation; presumably they implement the v1
+    // Producer interface - confirm against that interface before annotating them.
+
+    public void close() throws PulsarClientException {
+        delegate.close();
+    }
+
+    public CompletableFuture<Void> closeAsync() {
+        return delegate.closeAsync();
+    }
+
+    public void flush() throws PulsarClientException {
+        delegate.flush();
+    }
+
+    public CompletableFuture<Void> flushAsync() {
+        return delegate.flushAsync();
+    }
+
+    public long getLastSequenceId() {
+        return delegate.getLastSequenceId();
+    }
+
+    public ProducerStats getStats() {
+        return delegate.getStats();
+    }
+
+    public boolean isConnected() {
+        return delegate.isConnected();
+    }
+
+    public MessageId send(byte[] payload) throws PulsarClientException {
+        return delegate.send(payload);
+    }
+
+    public MessageId send(Message<byte[]> message) throws PulsarClientException {
+        return delegate.send(message);
+    }
+
+    public CompletableFuture<MessageId> sendAsync(byte[] payload) {
+        return delegate.sendAsync(payload);
+    }
+
+    public CompletableFuture<MessageId> sendAsync(Message<byte[]> message) {
+        return delegate.sendAsync(message);
+    }
+
+    @Override
+    public String getTopic() {
+        return delegate.getTopic();
+    }
+
+    @Override
+    public String getProducerName() {
+        return delegate.getProducerName();
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
new file mode 100644
index 0000000..a10f9f1
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * Implementation of the v1 {@link PulsarClient} API that forwards every operation to a {@link PulsarClientImpl}
+ * and wraps the producers, consumers and readers it returns in their v1 adapters.
+ * <p>
+ * All asynchronous factory methods report a {@code null} configuration through a failed future carrying an
+ * {@link PulsarClientException.InvalidConfigurationException}; the blocking variants surface the same exception.
+ */
+@SuppressWarnings("deprecation")
+public class PulsarClientV1Impl implements PulsarClient {
+
+    private final PulsarClientImpl client;
+
+    /**
+     * Creates the underlying client from a clone of the given configuration data.
+     * <p>
+     * {@code setServiceUrl(serviceUrl)} is invoked on {@code conf} itself before its configuration data is
+     * cloned, so the caller's object presumably observes the new service URL as well.
+     *
+     * @param serviceUrl the service URL stored into the configuration
+     * @param conf the client configuration
+     * @throws PulsarClientException propagated from the {@link PulsarClientImpl} constructor
+     */
+    public PulsarClientV1Impl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
+        this.client = new PulsarClientImpl(conf.setServiceUrl(serviceUrl).getConfigurationData().clone());
+    }
+
+    @Override
+    public void close() throws PulsarClientException {
+        client.close();
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return client.closeAsync();
+    }
+
+    /**
+     * Blocking variant of {@link #createProducerAsync(String, ProducerConfiguration)}.
+     *
+     * @throws PulsarClientException if {@code conf} is null, the creation fails, or the calling thread is
+     *             interrupted while waiting
+     */
+    @Override
+    public Producer createProducer(final String topic, final ProducerConfiguration conf) throws PulsarClientException {
+        return awaitResult(createProducerAsync(topic, conf));
+    }
+
+    @Override
+    public Producer createProducer(String topic)
+            throws PulsarClientException {
+        return createProducer(topic, new ProducerConfiguration());
+    }
+
+    /**
+     * Creates a producer on {@code topic} using a clone of the given configuration.
+     *
+     * @return a future completed with the v1 producer adapter, or failed with an
+     *         {@link PulsarClientException.InvalidConfigurationException} when {@code conf} is null
+     */
+    @Override
+    public CompletableFuture<Producer> createProducerAsync(final String topic, final ProducerConfiguration conf) {
+        if (conf == null) {
+            // Report the problem through the future, like subscribeAsync does, instead of throwing an NPE.
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException("Invalid null configuration object"));
+        }
+
+        ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
+        confData.setTopicName(topic);
+        return client.createProducerAsync(confData).thenApply(p -> new ProducerV1Impl((ProducerImpl<byte[]>) p));
+    }
+
+    @Override
+    public CompletableFuture<Producer> createProducerAsync(String topic) {
+        return createProducerAsync(topic, new ProducerConfiguration());
+    }
+
+    /**
+     * Blocking variant of {@link #createReaderAsync(String, MessageId, ReaderConfiguration)}.
+     *
+     * @throws PulsarClientException if {@code conf} is null, the creation fails, or the calling thread is
+     *             interrupted while waiting
+     */
+    @Override
+    public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
+            throws PulsarClientException {
+        return awaitResult(createReaderAsync(topic, startMessageId, conf));
+    }
+
+    /**
+     * Creates a reader on {@code topic}, positioned at {@code startMessageId}, using a clone of the given
+     * configuration.
+     *
+     * @return a future completed with the v1 reader adapter, or failed with an
+     *         {@link PulsarClientException.InvalidConfigurationException} when {@code conf} is null
+     */
+    @Override
+    public CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId,
+            ReaderConfiguration conf) {
+        if (conf == null) {
+            // Report the problem through the future, like subscribeAsync does, instead of throwing an NPE.
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException("Invalid null configuration object"));
+        }
+
+        ReaderConfigurationData<byte[]> confData = conf.getReaderConfigurationData().clone();
+        confData.setTopicName(topic);
+        confData.setStartMessageId(startMessageId);
+        return client.createReaderAsync(confData).thenApply(r -> new ReaderV1Impl(r));
+    }
+
+    @Override
+    public void shutdown() throws PulsarClientException {
+        client.shutdown();
+    }
+
+    @Override
+    public Consumer subscribe(String topic, String subscriptionName) throws PulsarClientException {
+        return subscribe(topic, subscriptionName, new ConsumerConfiguration());
+    }
+
+    /**
+     * Subscribes to {@code topic} under {@code subscription} using a clone of the given configuration.
+     *
+     * @return a future completed with the v1 consumer adapter, or failed with an
+     *         {@link PulsarClientException.InvalidConfigurationException} when {@code conf} is null
+     */
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(final String topic, final String subscription,
+            final ConsumerConfiguration conf) {
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException("Invalid null configuration"));
+        }
+
+        ConsumerConfigurationData<byte[]> confData = conf.getConfigurationData().clone();
+        confData.getTopicNames().add(topic);
+        confData.setSubscriptionName(subscription);
+        return client.subscribeAsync(confData).thenApply(c -> new ConsumerV1Impl(c));
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(String topic,
+            String subscriptionName) {
+        return subscribeAsync(topic, subscriptionName, new ConsumerConfiguration());
+    }
+
+    /**
+     * Blocking variant of {@link #subscribeAsync(String, String, ConsumerConfiguration)}.
+     *
+     * @throws PulsarClientException if {@code conf} is null, the subscription fails, or the calling thread is
+     *             interrupted while waiting
+     */
+    @Override
+    public Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf)
+            throws PulsarClientException {
+        return awaitResult(subscribeAsync(topic, subscription, conf));
+    }
+
+    /**
+     * Waits for {@code future} and translates its failure modes into {@link PulsarClientException}.
+     * <p>
+     * A {@link PulsarClientException} cause is rethrown as-is, any other cause is wrapped. On interruption the
+     * thread's interrupt flag is restored before the wrapping exception is thrown.
+     */
+    private static <T> T awaitResult(CompletableFuture<T> future) throws PulsarClientException {
+        try {
+            return future.get();
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof PulsarClientException) {
+                throw (PulsarClientException) cause;
+            }
+            throw new PulsarClientException(cause);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
new file mode 100644
index 0000000..7ae86b5
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+
+/**
+ * Adapter that exposes a v2 {@code Reader<byte[]>} through the v1 {@link Reader} interface. Every method
+ * forwards its arguments to the wrapped reader unchanged and returns whatever the wrapped reader returns.
+ */
+public class ReaderV1Impl implements Reader {
+
+    private final org.apache.pulsar.shade.client.api.v2.Reader<byte[]> delegate;
+
+    /**
+     * @param reader the v2 reader that all calls are forwarded to
+     */
+    public ReaderV1Impl(org.apache.pulsar.shade.client.api.v2.Reader<byte[]> reader) {
+        this.delegate = reader;
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return delegate.closeAsync();
+    }
+
+    @Override
+    public String getTopic() {
+        return delegate.getTopic();
+    }
+
+    @Override
+    public boolean hasMessageAvailable() throws PulsarClientException {
+        return delegate.hasMessageAvailable();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        return delegate.hasMessageAvailableAsync();
+    }
+
+    @Override
+    public boolean hasReachedEndOfTopic() {
+        return delegate.hasReachedEndOfTopic();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return delegate.isConnected();
+    }
+
+    @Override
+    public Message<byte[]> readNext() throws PulsarClientException {
+        return delegate.readNext();
+    }
+
+    @Override
+    public Message<byte[]> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
+        return delegate.readNext(timeout, unit);
+    }
+
+    @Override
+    public CompletableFuture<Message<byte[]>> readNextAsync() {
+        return delegate.readNextAsync();
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java b/pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
similarity index 88%
rename from pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
rename to pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
index 56630a4..5e14b7b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
@@ -36,20 +36,20 @@ public class MessageBuilderTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testSetEventTimeNegative() {
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setEventTime(-1L);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testSetEventTimeZero() {
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setEventTime(0L);
     }
 
     @Test
     public void testSetEventTimePositive() {
         long eventTime = System.currentTimeMillis();
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setContent(new byte[0]);
         builder.setEventTime(eventTime);
         Message<?> msg = builder.build();
@@ -58,7 +58,7 @@ public class MessageBuilderTest {
 
     @Test
     public void testBuildMessageWithoutEventTime() {
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setContent(new byte[0]);
         Message<?> msg = builder.build();
         assertEquals(0L, msg.getEventTime());
@@ -66,7 +66,7 @@ public class MessageBuilderTest {
 
     @Test
     public void testSetMessageProperties() {
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setContent(new byte[0]);
         Map<String, String> map = Maps.newHashMap();
         map.put("key1", "value1");
diff --git a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
new file mode 100644
index 0000000..6b6b2bc
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
@@ -0,0 +1,91 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-client-1x-base</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-2x-shaded</artifactId>
+  <name>Pulsar Client 2.x Shaded API</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+  </dependencies>
+  
+  <build>
+    <plugins>
+       <plugin>
+        <!-- Shade only pulsar-client, relocating the v2 PulsarClient/Producer/Consumer/Reader API classes to avoid conflicts -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <minimizeJar>false</minimizeJar>
+
+              <artifactSet>
+                <includes>
+                  <include>org.apache.pulsar:pulsar-client</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-client</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.pulsar.client.api</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.client.api.v2</shadedPattern>
+                  <includes>
+                    <include>org.apache.pulsar.client.api.PulsarClient</include>
+                    <include>org.apache.pulsar.client.api.Producer</include>
+                    <include>org.apache.pulsar.client.api.Consumer</include>
+                    <include>org.apache.pulsar.client.api.Reader</include>
+                  </includes>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
index 70416f3..46516f0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import static org.testng.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+
 import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,10 +32,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-
 public class DefaultSchemasTest {
     private PulsarClient client;
 
@@ -75,17 +73,7 @@ public class DefaultSchemasTest {
         Assert.assertTrue(stringSchema.decode(testBytes).equals(testString));
         assertEquals(stringSchema.encode(testString), testBytes);
 
-        Message<String> msg1 = MessageBuilder.create(stringSchema)
-                .setContent(testBytes)
-                .build();
-        assertEquals(stringSchema.decode(msg1.getData()), testString);
-
-        Message<String> msg2 = MessageBuilder.create(stringSchema)
-                .setValue(testString)
-                .build();
-        assertEquals(stringSchema.encode(testString), msg2.getData());
-
-        byte[] bytes2 = testString.getBytes(StandardCharsets.UTF_16);
+         byte[] bytes2 = testString.getBytes(StandardCharsets.UTF_16);
         StringSchema stringSchemaUtf16 = new StringSchema(StandardCharsets.UTF_16);
         Assert.assertTrue(stringSchemaUtf16.decode(bytes2).equals(testString));
         assertEquals(stringSchemaUtf16.encode(testString), bytes2);