You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/30 16:47:47 UTC

[GitHub] merlimat closed pull request #3228: Provide separate module with Pulsar v1 client API wrapper

merlimat closed pull request #3228: Provide separate module with Pulsar v1 client API wrapper
URL: https://github.com/apache/pulsar/pull/3228
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 2aaca3ad35..ee2780caeb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@ flexible messaging model and an intuitive client API.</description>
     <module>pulsar-client-api</module>
     <module>pulsar-client</module>
     <module>pulsar-client-shaded</module>
+    <module>pulsar-client-1x-base</module>
     <module>pulsar-client-admin</module>
     <module>pulsar-client-admin-shaded</module>
     <module>pulsar-client-tools</module>
diff --git a/pulsar-client-1x-base/pom.xml b/pulsar-client-1x-base/pom.xml
new file mode 100644
index 0000000000..22a5b1b8e6
--- /dev/null
+++ b/pulsar-client-1x-base/pom.xml
@@ -0,0 +1,40 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-1x-base</artifactId>
+  <name>Pulsar Client 1.x Compatibility Base</name>
+  <packaging>pom</packaging>
+
+  <modules>
+      <module>pulsar-client-2x-shaded</module>
+      <module>pulsar-client-1x</module>
+  </modules>
+</project>
diff --git a/pulsar-client-1x-base/pulsar-client-1x/pom.xml b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
new file mode 100644
index 0000000000..b6fd0b84e3
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
@@ -0,0 +1,52 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-client-1x-base</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-1x</artifactId>
+  <name>Pulsar Client 1.x Compatibility API</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-2x-shaded</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    
+    <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+      </dependency>
+  </dependencies>
+</project>
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
new file mode 100644
index 0000000000..e5bac914b7
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+/**
+ * Class used to specify client side configuration like authentication, etc..
+ *
+ * @deprecated Use {@link PulsarClient#builder()} to construct and configure a new {@link PulsarClient} instance
+ */
+@Deprecated
+public class ClientConfiguration implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ClientConfigurationData confData = new ClientConfigurationData();
+
+    /**
+     * @return the authentication provider to be used
+     */
+    public Authentication getAuthentication() {
+        return confData.getAuthentication();
+    }
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * ClientConfiguration confData = new ClientConfiguration();
+     * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
+     * String authParamsString = "key1:val1,key2:val2";
+     * Authentication auth = AuthenticationFactory.create(authPluginClassName, authParamsString);
+     * confData.setAuthentication(auth);
+     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authentication
+     */
+    public void setAuthentication(Authentication authentication) {
+        confData.setAuthentication(authentication);
+    }
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * ClientConfiguration confData = new ClientConfiguration();
+     * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
+     * String authParamsString = "key1:val1,key2:val2";
+     * confData.setAuthentication(authPluginClassName, authParamsString);
+     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParamsString
+     *            string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
+     * @throws UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    public void setAuthentication(String authPluginClassName, String authParamsString)
+            throws UnsupportedAuthenticationException {
+        confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
+    }
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * ClientConfiguration confData = new ClientConfiguration();
+     * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
+     * Map<String, String> authParams = new HashMap<String, String>();
+     * authParams.put("key1", "val1");
+     * confData.setAuthentication(authPluginClassName, authParams);
+     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParams
+     *            map which represents parameters for the Authentication-Plugin
+     * @throws UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    public void setAuthentication(String authPluginClassName, Map<String, String> authParams)
+            throws UnsupportedAuthenticationException {
+        confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
+    }
+
+    /**
+     * @return the operation timeout in ms
+     */
+    public long getOperationTimeoutMs() {
+        return confData.getOperationTimeoutMs();
+    }
+
+    /**
+     * Set the operation timeout <i>(default: 30 seconds)</i>
+     * <p>
+     * Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
+     * operation will be marked as failed
+     *
+     * @param operationTimeout
+     *            operation timeout
+     * @param unit
+     *            time unit for {@code operationTimeout}
+     */
+    public void setOperationTimeout(int operationTimeout, TimeUnit unit) {
+        checkArgument(operationTimeout >= 0);
+        confData.setOperationTimeoutMs(unit.toMillis(operationTimeout));
+    }
+
+    /**
+     * @return the number of threads to use for handling connections
+     */
+    public int getIoThreads() {
+        return confData.getNumIoThreads();
+    }
+
+    /**
+     * Set the number of threads to be used for handling connections to brokers <i>(default: 1 thread)</i>
+     *
+     * @param numIoThreads
+     */
+    public void setIoThreads(int numIoThreads) {
+        checkArgument(numIoThreads > 0);
+        confData.setNumIoThreads(numIoThreads);
+    }
+
+    /**
+     * @return the number of threads to use for message listeners
+     */
+    public int getListenerThreads() {
+        return confData.getNumListenerThreads();
+    }
+
+    /**
+     * Set the number of threads to be used for message listeners <i>(default: 1 thread)</i>
+     *
+     * @param numListenerThreads
+     */
+    public void setListenerThreads(int numListenerThreads) {
+        checkArgument(numListenerThreads > 0);
+        confData.setNumListenerThreads(numListenerThreads);
+    }
+
+    /**
+     * @return the max number of connections per single broker
+     */
+    public int getConnectionsPerBroker() {
+        return confData.getConnectionsPerBroker();
+    }
+
+    /**
+     * Sets the max number of connection that the client library will open to a single broker.
+     * <p>
+     * By default, the connection pool will use a single connection for all the producers and consumers. Increasing this
+     * parameter may improve throughput when using many producers over a high latency connection.
+     * <p>
+     *
+     * @param connectionsPerBroker
+     *            max number of connections per broker (needs to be greater than 0)
+     */
+    public void setConnectionsPerBroker(int connectionsPerBroker) {
+        checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0");
+        confData.setConnectionsPerBroker(connectionsPerBroker);
+    }
+
+    /**
+     * @return whether TCP no-delay should be set on the connections
+     */
+    public boolean isUseTcpNoDelay() {
+        return confData.isUseTcpNoDelay();
+    }
+
+    /**
+     * Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.
+     * <p>
+     * No-delay features make sure packets are sent out on the network as soon as possible, and it's critical to achieve
+     * low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall
+     * throughput, so if latency is not a concern, it's advisable to set the <code>useTcpNoDelay</code> flag to false.
+     * <p>
+     * Default value is true
+     *
+     * @param useTcpNoDelay
+     */
+    public void setUseTcpNoDelay(boolean useTcpNoDelay) {
+        confData.setUseTcpNoDelay(useTcpNoDelay);
+    }
+
+    /**
+     * @return whether TLS encryption is used on the connection
+     */
+    public boolean isUseTls() {
+        return confData.isUseTls();
+    }
+
+    /**
+     * Configure whether to use TLS encryption on the connection <i>(default: false)</i>
+     *
+     * @param useTls
+     */
+    public void setUseTls(boolean useTls) {
+        confData.setUseTls(useTls);
+    }
+
+    /**
+     * @return path to the trusted TLS certificate file
+     */
+    public String getTlsTrustCertsFilePath() {
+        return confData.getTlsTrustCertsFilePath();
+    }
+
+    /**
+     * Set the path to the trusted TLS certificate file
+     *
+     * @param tlsTrustCertsFilePath
+     */
+    public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
+        confData.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+    }
+
+    /**
+     * @return whether the Pulsar client accept untrusted TLS certificate from broker
+     */
+    public boolean isTlsAllowInsecureConnection() {
+        return confData.isTlsAllowInsecureConnection();
+    }
+
+    /**
+     * Configure whether the Pulsar client accept untrusted TLS certificate from broker <i>(default: false)</i>
+     *
+     * @param tlsAllowInsecureConnection
+     */
+    public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
+        confData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
+    }
+
+    /**
+     * Stats will be activated with positive statsIntervalSeconds
+     *
+     * @return the interval between each stat info <i>(default: 60 seconds)</i>
+     */
+    public long getStatsIntervalSeconds() {
+        return confData.getStatsIntervalSeconds();
+    }
+
+    /**
+     * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
+     * statsIntervalSeconds It should be set to at least 1 second
+     *
+     * @param statsIntervalSeconds
+     *            the interval between each stat info
+     * @param unit
+     *            time unit for {@code statsInterval}
+     */
+    public void setStatsInterval(long statsInterval, TimeUnit unit) {
+        confData.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
+    }
+
+    /**
+     * Get configured total allowed concurrent lookup-request.
+     *
+     * @return
+     */
+    public int getConcurrentLookupRequest() {
+        return confData.getConcurrentLookupRequest();
+    }
+
+    /**
+     * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
+     * <i>(default: 50000)</i> It should be configured with higher value only in case of it requires to
+     * produce/subscribe on thousands of topic using created {@link PulsarClient}
+     *
+     * @param concurrentLookupRequest
+     */
+    public void setConcurrentLookupRequest(int concurrentLookupRequest) {
+        confData.setConcurrentLookupRequest(concurrentLookupRequest);
+    }
+
+    /**
+     * Get configured max number of reject-request in a time-frame (30 seconds) after which connection will be closed
+     *
+     * @return
+     */
+    public int getMaxNumberOfRejectedRequestPerConnection() {
+        return confData.getMaxNumberOfRejectedRequestPerConnection();
+    }
+
+    /**
+     * Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
+     * will be closed and client creates a new connection that give chance to connect a different broker <i>(default:
+     * 50)</i>
+     *
+     * @param maxNumberOfRejectedRequestPerConnection
+     */
+    public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
+        confData.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
+    }
+
+    public boolean isTlsHostnameVerificationEnable() {
+        return confData.isTlsHostnameVerificationEnable();
+    }
+
+    /**
+     * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509
+     * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1.
+     * Server Identity hostname verification.
+     *
+     * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+     *
+     * @param tlsHostnameVerificationEnable
+     */
+    public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {
+        confData.setTlsHostnameVerificationEnable(tlsHostnameVerificationEnable);
+    }
+
+    public ClientConfiguration setServiceUrl(String serviceUrl) {
+        confData.setServiceUrl(serviceUrl);
+        return this;
+    }
+
+    /**
+     * Set the duration of time to wait for a connection to a broker to be established. If the duration
+     * passes without a response from the broker, the connection attempt is dropped.
+     *
+     * @param duration the duration to wait
+     * @param unit the time unit in which the duration is defined
+     */
+    public void setConnectionTimeout(int duration, TimeUnit unit) {
+        confData.setConnectionTimeoutMs((int)unit.toMillis(duration));
+    }
+
+    /**
+     * Get the duration of time for which the client will wait for a connection to a broker to be
+     * established before giving up.
+     *
+     * @return the duration, in milliseconds
+     */
+    public long getConnectionTimeoutMs() {
+        return confData.getConnectionTimeoutMs();
+    }
+
+    public ClientConfigurationData getConfigurationData() {
+        return confData;
+    }
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
new file mode 100644
index 0000000000..00da0a4b75
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An interface that abstracts behavior of Pulsar's consumer.
+ */
+public interface Consumer extends Closeable {
+
+    /**
+     * Get a topic for the consumer
+     *
+     * @return topic for the consumer
+     */
+    String getTopic();
+
+    /**
+     * Get a subscription for the consumer
+     *
+     * @return subscription for the consumer
+     */
+    String getSubscription();
+
+    /**
+     * Unsubscribe the consumer
+     * <p>
+     * This call blocks until the consumer is unsubscribed.
+     *
+     * @throws PulsarClientException
+     */
+    void unsubscribe() throws PulsarClientException;
+
+    /**
+     * Asynchronously unsubscribe the consumer
+     *
+     * @return {@link CompletableFuture} for this operation
+     */
+    CompletableFuture<Void> unsubscribeAsync();
+
+    /**
+     * Receives a single message.
+     * <p>
+     * This calls blocks until a message is available.
+     *
+     * @return the received message
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     * @throws PulsarClientException.InvalidConfigurationException
+     *             if a message listener was defined in the configuration
+     */
+    Message<byte[]> receive() throws PulsarClientException;
+
+    /**
+     * Receive a single message
+     * <p>
+     * Retrieves a message when it will be available and completes {@link CompletableFuture} with received message.
+     * </p>
+     * <p>
+     * {@code receiveAsync()} should be called subsequently once returned {@code CompletableFuture} gets complete with
+     * received message. Else it creates <i> backlog of receive requests </i> in the application.
+     * </p>
+     *
+     * @return {@link CompletableFuture}<{@link Message}> will be completed when message is available
+     */
+    CompletableFuture<Message<byte[]>> receiveAsync();
+
+    /**
+     * Receive a single message
+     * <p>
+     * Retrieves a message, waiting up to the specified wait time if necessary.
+     *
+     * @param timeout
+     *            0 or less means immediate rather than infinite
+     * @param unit
+     * @return the received {@link Message} or null if no message available before timeout
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     * @throws PulsarClientException.InvalidConfigurationException
+     *             if a message listener was defined in the configuration
+     */
+    Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException;
+
+    /**
+     * Acknowledge the consumption of a single message
+     *
+     * @param message
+     *            The {@code Message} to be acknowledged
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     */
+    void acknowledge(Message<?> message) throws PulsarClientException;
+
+    /**
+     * Acknowledge the consumption of a single message, identified by its MessageId
+     *
+     * @param messageId
+     *            The {@code MessageId} to be acknowledged
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     */
+    void acknowledge(MessageId messageId) throws PulsarClientException;
+
+    /**
+     * Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
+     *
+     * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+     * re-delivered to this consumer.
+     *
+     * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+     *
+     * It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
+     *
+     * @param message
+     *            The {@code Message} to be cumulatively acknowledged
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     */
+    void acknowledgeCumulative(Message<?> message) throws PulsarClientException;
+
+    /**
+     * Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
+     *
+     * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+     * re-delivered to this consumer.
+     *
+     * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+     *
+     * It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered.
+     *
+     * @param messageId
+     *            The {@code MessageId} to be cumulatively acknowledged
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     */
+    void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;
+
+    /**
+     * Asynchronously acknowledge the consumption of a single message
+     *
+     * @param message
+     *            The {@code Message} to be acknowledged
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> acknowledgeAsync(Message<?> message);
+
+    /**
+     * Asynchronously acknowledge the consumption of a single message
+     *
+     * @param messageId
+     *            The {@code MessageId} to be acknowledged
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> acknowledgeAsync(MessageId messageId);
+
+    /**
+     * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
+     * message.
+     *
+     * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+     *
+     * @param message
+     *            The {@code Message} to be cumulatively acknowledged
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);
+
+    /**
+     * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
+     * message.
+     *
+     * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+     *
+     * @param messageId
+     *            The {@code MessageId} to be cumulatively acknowledged
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
+
+    /**
+     * Get statistics for the consumer.
+     *
+     * <ul>
+     * <li>numMsgsReceived : Number of messages received in the current interval
+     * <li>numBytesReceived : Number of bytes received in the current interval
+     * <li>numReceiveFailed : Number of messages failed to receive in the current interval
+     * <li>numAcksSent : Number of acks sent in the current interval
+     * <li>numAcksFailed : Number of acks failed to send in the current interval
+     * <li>totalMsgsReceived : Total number of messages received
+     * <li>totalBytesReceived : Total number of bytes received
+     * <li>totalReceiveFailed : Total number of messages failed to receive
+     * <li>totalAcksSent : Total number of acks sent
+     * <li>totalAcksFailed : Total number of acks failed to sent
+     * </ul>
+     *
+     * @return statistic for the consumer
+     */
+    ConsumerStats getStats();
+
+    /**
+     * Close the consumer and stop the broker to push more messages.
+     */
+    @Override
+    void close() throws PulsarClientException;
+
+    /**
+     * Asynchronously close the consumer and stop the broker to push more messages
+     *
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
+     *
+     * Please note that this does not simply mean that the consumer is caught up with the last message published by
+     * producers, rather the topic needs to be explicitly "terminated".
+     */
+    boolean hasReachedEndOfTopic();
+
+    /**
+     * Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
+     * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
+     * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
+     * breaks, the messages are redelivered after reconnect.
+     */
+    void redeliverUnacknowledgedMessages();
+
+    /**
+     * Reset the subscription associated with this consumer to a specific message id.
+     * <p>
+     *
+     * The message id can either be a specific message or represent the first or last messages in the topic.
+     * <p>
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Reset the subscription on the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
+     * </ul>
+     *
+     * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param messageId
+     *            the message id where to reposition the subscription
+     */
+    void seek(MessageId messageId) throws PulsarClientException;
+
+    /**
+     * Reset the subscription associated with this consumer to a specific message id.
+     * <p>
+     *
+     * The message id can either be a specific message or represent the first or last messages in the topic.
+     * <p>
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Reset the subscription on the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
+     * </ul>
+     *
+     * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param messageId
+     *            the message id where to reposition the subscription
+     * @return a future to track the completion of the seek operation
+     */
+    CompletableFuture<Void> seekAsync(MessageId messageId);
+
+    /**
+     * @return Whether the consumer is connected to the broker
+     */
+    boolean isConnected();
+
+    /**
+     * Get the name of consumer.
+     * @return consumer name.
+     */
+    String getConsumerName();
+
+    /**
+     * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
+     * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
+     */
+    void pause();
+
+    /**
+     * Resume requesting messages from the broker.
+     */
+    void resume();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
new file mode 100644
index 0000000000..5ffba38ffc
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+/**
+ * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
+ * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
+ * will be able to use the same subscription name and the messages will be dispatched in a round robin fashion.
+ *
+ * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a {@link Consumer} instance
+ */
+@Deprecated
+public class ConsumerConfiguration implements Serializable {
+
+    /**
+     * Resend shouldn't be requested before minAckTimeoutMillis.
+     */
+    static long minAckTimeoutMillis = 1000;
+
+    private static final long serialVersionUID = 1L;
+
+    private final ConsumerConfigurationData<byte[]> conf = new ConsumerConfigurationData<>();
+
+    private boolean initializeSubscriptionOnLatest = true;
+
+    public ConsumerConfiguration() {
+        // Disable acknowledgment grouping when using v1 API
+        conf.setAcknowledgementsGroupTimeMicros(0);
+    }
+
+    /**
+     * @return the configured timeout in milliseconds for unacked messages.
+     */
+    public long getAckTimeoutMillis() {
+        return conf.getAckTimeoutMillis();
+    }
+
+    /**
+     * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
+     * 10 seconds.
+     *
+     * @param ackTimeout
+     *            for unacked messages.
+     * @param timeUnit
+     *            unit in which the timeout is provided.
+     * @return {@link ConsumerConfiguration}
+     */
+    public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) {
+        long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
+        checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
+                "Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms");
+        conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
+        return this;
+    }
+
+    /**
+     * @return the configured subscription type
+     */
+    public SubscriptionType getSubscriptionType() {
+        return conf.getSubscriptionType();
+    }
+
+    /**
+     * Select the subscription type to be used when subscribing to the topic.
+     * <p>
+     * Default is {@link SubscriptionType#Exclusive}
+     *
+     * @param subscriptionType
+     *            the subscription type value
+     */
+    public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType) {
+        checkNotNull(subscriptionType);
+        conf.setSubscriptionType(subscriptionType);
+        return this;
+    }
+
+    /**
+     * @return the configured {@link MessageListener} for the consumer
+     */
+    public MessageListener<byte[]> getMessageListener() {
+        return conf.getMessageListener();
+    }
+
+    /**
+     * Sets a {@link MessageListener} for the consumer
+     * <p>
+     * When a {@link MessageListener} is set, application will receive messages through it. Calls to
+     * {@link Consumer#receive()} will not be allowed.
+     *
+     * @param messageListener
+     *            the listener object
+     */
+    public ConsumerConfiguration setMessageListener(MessageListener<byte[]> messageListener) {
+        checkNotNull(messageListener);
+        conf.setMessageListener(messageListener);
+        return this;
+    }
+
+    /**
+     * @return this configured {@link ConsumerEventListener} for the consumer.
+     * @see #setConsumerEventListener(ConsumerEventListener)
+     * @since 2.0
+     */
+    public ConsumerEventListener getConsumerEventListener() {
+        return conf.getConsumerEventListener();
+    }
+
+    /**
+     * Sets a {@link ConsumerEventListener} for the consumer.
+     *
+     * <p>
+     * The consumer group listener is used for receiving consumer state change in a consumer group for failover
+     * subscription. Application can then react to the consumer state changes.
+     *
+     * <p>
+     * This change is experimental. It is subject to changes coming in release 2.0.
+     *
+     * @param listener
+     *            the consumer group listener object
+     * @return consumer configuration
+     * @since 2.0
+     */
+    public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) {
+        checkNotNull(listener);
+        conf.setConsumerEventListener(listener);
+        return this;
+    }
+
+    /**
+     * @return the configure receiver queue size value
+     */
+    public int getReceiverQueueSize() {
+        return conf.getReceiverQueueSize();
+    }
+
+    /**
+     * @return the configured max total receiver queue size across partitions
+     */
+    public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
+        return conf.getMaxTotalReceiverQueueSizeAcrossPartitions();
+    }
+
+    /**
+     * Set the max total receiver queue size across partitons.
+     * <p>
+     * This setting will be used to reduce the receiver queue size for individual partitions
+     * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+     *
+     * @param maxTotalReceiverQueueSizeAcrossPartitions
+     */
+    public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
+        checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= conf.getReceiverQueueSize());
+        conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
+    }
+
+    /**
+     * @return the CryptoKeyReader
+     */
+    public CryptoKeyReader getCryptoKeyReader() {
+        return conf.getCryptoKeyReader();
+    }
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        checkNotNull(cryptoKeyReader);
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    /**
+     * Sets the ConsumerCryptoFailureAction to the value specified
+     *
+     * @param action
+     *            consumer action
+     */
+    public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+        conf.setCryptoFailureAction(action);
+    }
+
+    /**
+     * @return The ConsumerCryptoFailureAction
+     */
+    public ConsumerCryptoFailureAction getCryptoFailureAction() {
+        return conf.getCryptoFailureAction();
+    }
+
+    /**
+     * Sets the size of the consumer receive queue.
+     * <p>
+     * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
+     * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer
+     * throughput at the expense of bigger memory utilization.
+     * </p>
+     * <p>
+     * <b>Setting the consumer queue size as zero</b>
+     * <ul>
+     * <li>Decreases the throughput of the consumer, by disabling pre-fetching of messages. This approach improves the
+     * message distribution on shared subscription, by pushing messages only to the consumers that are ready to process
+     * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue
+     * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is
+     * zero.</li>
+     * <li>Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer connection with
+     * broker and {@link Consumer#receive()} call will remain blocked while {@link Consumer#receiveAsync()} receives
+     * exception in callback. <b> consumer will not be able receive any further message unless batch-message in pipeline
+     * is removed</b></li>
+     * </ul>
+     * </p>
+     * Default value is {@code 1000} messages and should be good for most use cases.
+     *
+     * @param receiverQueueSize
+     *            the new receiver queue size value
+     */
+    public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
+        checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
+        conf.setReceiverQueueSize(receiverQueueSize);
+        return this;
+    }
+
+    /**
+     * @return the consumer name
+     */
+    public String getConsumerName() {
+        return conf.getConsumerName();
+    }
+
+    /**
+     * Set the consumer name.
+     *
+     * @param consumerName
+     */
+    public ConsumerConfiguration setConsumerName(String consumerName) {
+        checkArgument(consumerName != null && !consumerName.equals(""));
+        conf.setConsumerName(consumerName);
+        return this;
+    }
+
+    public int getPriorityLevel() {
+        return conf.getPriorityLevel();
+    }
+
+    /**
+     * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
+     * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) </br>
+     * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
+     * permits, else broker will consider next priority level consumers. </br>
+     * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch
+     * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
+     *
+     * <pre>
+     * Consumer PriorityLevel Permits
+     * C1       0             2
+     * C2       0             1
+     * C3       0             1
+     * C4       1             2
+     * C5       1             1
+     * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
+     * </pre>
+     *
+     * @param priorityLevel
+     */
+    public void setPriorityLevel(int priorityLevel) {
+        conf.setPriorityLevel(priorityLevel);
+    }
+
+    public boolean getReadCompacted() {
+        return conf.isReadCompacted();
+    }
+
+    /**
+     * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+     * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+     * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+     * point, the messages will be sent as normal.
+     *
+     * readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
+     * failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
+     * shared subscription, will lead to the subscription call throwing a PulsarClientException.
+     *
+     * @param readCompacted
+     *            whether to read from the compacted topic
+     */
+    public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
+        conf.setReadCompacted(readCompacted);
+        return this;
+    }
+
+    /**
+     * Set a name/value property with this consumer.
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public ConsumerConfiguration setProperty(String key, String value) {
+        checkArgument(key != null);
+        checkArgument(value != null);
+        conf.getProperties().put(key, value);
+        return this;
+    }
+
+    /**
+     * Add all the properties in the provided map
+     *
+     * @param properties
+     * @return
+     */
+    public ConsumerConfiguration setProperties(Map<String, String> properties) {
+        conf.getProperties().putAll(properties);
+        return this;
+    }
+
+    public Map<String, String> getProperties() {
+        return conf.getProperties();
+    }
+
+    public ConsumerConfigurationData<byte[]> getConfigurationData() {
+        return conf;
+    }
+
+     /**
+     * @param subscriptionInitialPosition the initial position at which to set
+     * set cursor  when subscribing to the topic first time
+     * Default is {@value InitialPosition.Latest}
+     */
+    public ConsumerConfiguration setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+        conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
+        return this;
+    }
+
+    /**
+     * @return the configured {@link subscriptionInitailPosition} for the consumer
+     */
+    public SubscriptionInitialPosition getSubscriptionInitialPosition(){
+        return conf.getSubscriptionInitialPosition();
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
new file mode 100644
index 0000000000..4a9b99f87a
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pulsar.client.impl.MessageBuilderImpl;
+
+/**
+ * Message builder factory. Use this class to create messages to be send to the Pulsar producer
+ *
+ * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a new
+ *             message builder.
+ */
+@Deprecated
+public interface MessageBuilder {
+
+    static MessageBuilder create() {
+        return new MessageBuilderImpl();
+    }
+
+    /**
+     * Finalize the immutable message
+     *
+     * @return a {@link Message} ready to be sent through a {@link Producer}
+     */
+    Message<byte[]> build();
+
+    /**
+     * Set the content of the message
+     *
+     * @param data
+     *            array containing the payload
+     */
+    MessageBuilder setContent(byte[] data);
+
+    /**
+     * Set the content of the message
+     *
+     * @param data
+     *            array containing the payload
+     * @param offset
+     *            offset into the data array
+     * @param length
+     *            length of the payload starting from the above offset
+     */
+    MessageBuilder setContent(byte[] data, int offset, int length);
+
+    /**
+     * Set the content of the message
+     *
+     * @param buf
+     *            a {@link ByteBuffer} with the payload of the message
+     */
+    MessageBuilder setContent(ByteBuffer buf);
+
+    /**
+     * Sets a new property on a message.
+     *
+     * @param name
+     *            the name of the property
+     * @param value
+     *            the associated value
+     */
+    MessageBuilder setProperty(String name, String value);
+
+    /**
+     * Add all the properties in the provided map
+     */
+    MessageBuilder setProperties(Map<String, String> properties);
+
+    /**
+     * Sets the key of the message for routing policy
+     *
+     * @param key
+     */
+    MessageBuilder setKey(String key);
+
+    /**
+     * Set the event time for a given message.
+     *
+     * <p>
+     * Applications can retrieve the event time by calling {@link Message#getEventTime()}.
+     *
+     * <p>
+     * Note: currently pulsar doesn't support event-time based index. so the subscribers can't seek the messages by
+     * event time.
+     *
+     * @since 1.20.0
+     */
+    MessageBuilder setEventTime(long timestamp);
+
+    /**
+     * Specify a custom sequence id for the message being published.
+     * <p>
+     * The sequence id can be used for deduplication purposes and it needs to follow these rules:
+     * <ol>
+     * <li><code>sequenceId >= 0</code>
+     * <li>Sequence id for a message needs to be greater than sequence id for earlier messages:
+     * <code>sequenceId(N+1) > sequenceId(N)</code>
+     * <li>It's not necessary for sequence ids to be consecutive. There can be holes between messages. Eg. the
+     * <code>sequenceId</code> could represent an offset or a cumulative size.
+     * </ol>
+     *
+     * @param sequenceId
+     *            the sequence id to assign to the current message
+     * @since 1.20.0
+     */
+    MessageBuilder setSequenceId(long sequenceId);
+
+    /**
+     * Override the replication clusters for this message.
+     *
+     * @param clusters
+     */
+    MessageBuilder setReplicationClusters(List<String> clusters);
+
+    /**
+     * Disable replication for this message.
+     */
+    MessageBuilder disableReplication();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
new file mode 100644
index 0000000000..4d69668850
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Producer object.
+ *
+ * The producer is used to publish messages on a topic
+ *
+ *
+ */
+public interface Producer extends Closeable {
+
+    /**
+     * @return the topic which producer is publishing to
+     */
+    String getTopic();
+
+    /**
+     * @return the producer name which could have been assigned by the system or specified by the client
+     */
+    String getProducerName();
+
+    /**
+     * Sends a message.
+     * <p>
+     * This call will be blocking until is successfully acknowledged by the Pulsar broker.
+     * <p>
+     * Use {@link #newMessage()} to specify more properties than just the value on the message to be sent.
+     *
+     * @param message
+     *            a message
+     * @return the message id assigned to the published message
+     * @throws PulsarClientException.TimeoutException
+     *             if the message was not correctly received by the system within the timeout period
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the producer was already closed
+     */
+    MessageId send(byte[] message) throws PulsarClientException;
+
+    /**
+     * Send a message asynchronously
+     * <p>
+     * When the producer queue is full, by default this method will complete the future with an exception
+     * {@link PulsarClientException.ProducerQueueIsFullError}
+     * <p>
+     * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and
+     * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
+     * <p>
+     * Use {@link #newMessage()} to specify more properties than just the value on the message to be sent.
+     *
+     * @param message
+     *            a byte array with the payload of the message
+     * @return a future that can be used to track when the message will have been safely persisted
+     */
+    CompletableFuture<MessageId> sendAsync(byte[] message);
+
+    /**
+     * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+     *
+     * @throws PulsarClientException
+     * @since 2.1.0
+     * @see #flushAsync()
+     */
+    void flush() throws PulsarClientException;
+
+    /**
+     * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+     *
+     * @return a future that can be used to track when all the messages have been safely persisted.
+     * @since 2.1.0
+     * @see #flush()
+     */
+    CompletableFuture<Void> flushAsync();
+
+    /**
+     * Send a message
+     *
+     * @param message
+     *            a message
+     * @return the message id assigned to the published message
+     * @throws PulsarClientException.TimeoutException
+     *             if the message was not correctly received by the system within the timeout period
+     *
+     * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a
+     *             new message builder.
+     */
+    @Deprecated
+    MessageId send(Message<byte[]> message) throws PulsarClientException;
+
+    /**
+     * Send a message asynchronously
+     * <p>
+     * When the returned {@link CompletableFuture} is marked as completed successfully, the provided message will
+     * contain the {@link MessageId} assigned by the broker to the published message.
+     * <p>
+     * Example:
+     *
+     * <pre>
+     * <code>Message msg = MessageBuilder.create().setContent(myContent).build();
+     * producer.sendAsync(msg).thenRun(v -> {
+     *    System.out.println("Published message: " + msg.getMessageId());
+     * }).exceptionally(e -> {
+     *    // Failed to publish
+     * });</code>
+     * </pre>
+     * <p>
+     * When the producer queue is full, by default this method will complete the future with an exception
+     * {@link PulsarClientException.ProducerQueueIsFullError}
+     * <p>
+     * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and
+     * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
+     *
+     * @param message
+     *            a message
+     * @return a future that can be used to track when the message will have been safely persisted
+     * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a
+     *             new message builder.
+     */
+    @Deprecated
+    CompletableFuture<MessageId> sendAsync(Message<byte[]> message);
+
+    /**
+     * Get the last sequence id that was published by this producer.
+     * <p>
+     * This represent either the automatically assigned or custom sequence id (set on the {@link MessageBuilder}) that
+     * was published and acknowledged by the broker.
+     * <p>
+     * After recreating a producer with the same producer name, this will return the last message that was published in
+     * the previous producer session, or -1 if there no message was ever published.
+     *
+     * @return the last sequence id published by this producer
+     */
+    long getLastSequenceId();
+
+    /**
+     * Get statistics for the producer
+     *
+     * <ul>
+     * <li>numMsgsSent : Number of messages sent in the current interval
+     * <li>numBytesSent : Number of bytes sent in the current interval
+     * <li>numSendFailed : Number of messages failed to send in the current interval
+     * <li>numAcksReceived : Number of acks received in the current interval
+     * <li>totalMsgsSent : Total number of messages sent
+     * <li>totalBytesSent : Total number of bytes sent
+     * <li>totalSendFailed : Total number of messages failed to send
+     * <li>totalAcksReceived: Total number of acks received
+     * </ul>
+     *
+     * @return statistic for the producer or null if ProducerStatsRecorderImpl is disabled.
+     */
+    ProducerStats getStats();
+
+    /**
+     * Close the producer and releases resources allocated.
+     *
+     * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
+     * of errors, pending writes will not be retried.
+     *
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the producer was already closed
+     */
+    @Override
+    void close() throws PulsarClientException;
+
+    /**
+     * Close the producer and releases resources allocated.
+     *
+     * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
+     * of errors, pending writes will not be retried.
+     *
+     * @return a future that can used to track when the producer has been closed
+     */
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * @return Whether the producer is connected to the broker
+     */
+    boolean isConnected();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
new file mode 100644
index 0000000000..1af077be9c
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+
+import lombok.EqualsAndHashCode;
+
+/**
+ * Producer's configuration
+ *
+ * @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance
+ */
+@Deprecated
+@EqualsAndHashCode
+public class ProducerConfiguration implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ProducerConfigurationData conf = new ProducerConfigurationData();
+
+    @Deprecated
+    public enum MessageRoutingMode {
+        SinglePartition, RoundRobinPartition, CustomPartition
+    }
+
+    @Deprecated
+    public enum HashingScheme {
+        JavaStringHash, Murmur3_32Hash
+    }
+
+    /**
+     * @return the configured custom producer name or null if no custom name was specified
+     * @since 1.20.0
+     */
+    public String getProducerName() {
+        return conf.getProducerName();
+    }
+
+    /**
+     * Specify a name for the producer
+     * <p>
+     * If not assigned, the system will generate a globally unique name which can be access with
+     * {@link Producer#getProducerName()}.
+     * <p>
+     * When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique
+     * across all Pulsar's clusters.
+     * <p>
+     * If a producer with the same name is already connected to a particular topic, the
+     * {@link PulsarClient#createProducer(String)} operation will fail with {@link ProducerBusyException}.
+     *
+     * @param producerName
+     *            the custom name to use for the producer
+     * @since 1.20.0
+     */
+    public void setProducerName(String producerName) {
+        conf.setProducerName(producerName);
+    }
+
+    /**
+     * @return the message send timeout in ms
+     */
+    public long getSendTimeoutMs() {
+        return conf.getSendTimeoutMs();
+    }
+
+    /**
+     * Set the send timeout <i>(default: 30 seconds)</i>
+     * <p>
+     * If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
+     *
+     * @param sendTimeout
+     *            the send timeout
+     * @param unit
+     *            the time unit of the {@code sendTimeout}
+     */
+    public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
+        checkArgument(sendTimeout >= 0);
+        conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
+        return this;
+    }
+
+    /**
+     * @return the maximum number of messages allowed in the outstanding messages queue for the producer
+     */
+    public int getMaxPendingMessages() {
+        return conf.getMaxPendingMessages();
+    }
+
+    /**
+     * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+     * <p>
+     * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail
+     * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior.
+     *
+     * @param maxPendingMessages
+     * @return
+     */
+    public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
+        checkArgument(maxPendingMessages > 0);
+        conf.setMaxPendingMessages(maxPendingMessages);
+        return this;
+    }
+
+    public HashingScheme getHashingScheme() {
+        return HashingScheme.valueOf(conf.getHashingScheme().toString());
+    }
+
+    public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) {
+        conf.setHashingScheme(org.apache.pulsar.client.api.HashingScheme.valueOf(hashingScheme.toString()));
+        return this;
+    }
+
+    /**
+     *
+     * @return the maximum number of pending messages allowed across all the partitions
+     */
+    public int getMaxPendingMessagesAcrossPartitions() {
+        return conf.getMaxPendingMessagesAcrossPartitions();
+    }
+
+    /**
+     * Set the number of max pending messages across all the partitions
+     * <p>
+     * This setting will be used to lower the max pending messages for each partition
+     * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+     *
+     * @param maxPendingMessagesAcrossPartitions
+     */
+    public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        checkArgument(maxPendingMessagesAcrossPartitions >= conf.getMaxPendingMessages());
+        conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
+    }
+
+    /**
+     *
+     * @return whether the producer will block {@link Producer#send} and {@link Producer#sendAsync} operations when the
+     *         pending queue is full
+     */
+    public boolean getBlockIfQueueFull() {
+        return conf.isBlockIfQueueFull();
+    }
+
+    /**
+     * Set whether the {@link Producer#send} and {@link Producer#sendAsync} operations should block when the outgoing
+     * message queue is full.
+     * <p>
+     * Default is <code>false</code>. If set to <code>false</code>, send operations will immediately fail with
+     * {@link PulsarClientException.ProducerQueueIsFullError} when there is no space left in pending queue.
+     *
+     * @param blockIfQueueFull
+     *            whether to block {@link Producer#send} and {@link Producer#sendAsync} operations on queue full
+     * @return
+     */
+    public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) {
+        conf.setBlockIfQueueFull(blockIfQueueFull);
+        return this;
+    }
+
+    /**
+     * Set the message routing mode for the partitioned producer.
+     *
+     * @param messageRouteMode message routing mode.
+     * @return producer configuration
+     * @see MessageRoutingMode
+     */
+    public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) {
+        checkNotNull(messageRouteMode);
+        conf.setMessageRoutingMode(
+                org.apache.pulsar.client.api.MessageRoutingMode.valueOf(messageRouteMode.toString()));
+        return this;
+    }
+
+    /**
+     * Get the message routing mode for the partitioned producer.
+     *
+     * @return message routing mode, default is round-robin routing.
+     * @see MessageRoutingMode#RoundRobinPartition
+     */
+    public MessageRoutingMode getMessageRoutingMode() {
+        return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
+    }
+
+    /**
+     * Set the compression type for the producer.
+     * <p>
+     * By default, message payloads are not compressed. Supported compression types are:
+     * <ul>
+     * <li><code>CompressionType.LZ4</code></li>
+     * <li><code>CompressionType.ZLIB</code></li>
+     * </ul>
+     *
+     * @param compressionType
+     * @return
+     *
+     * @since 1.0.28 <br>
+     *        Make sure all the consumer applications have been updated to use this client version, before starting to
+     *        compress messages.
+     */
+    public ProducerConfiguration setCompressionType(CompressionType compressionType) {
+        conf.setCompressionType(compressionType);
+        return this;
+    }
+
+    /**
+     * @return the configured compression type for this producer
+     */
+    public CompressionType getCompressionType() {
+        return conf.getCompressionType();
+    }
+
+    /**
+     * Set a custom message routing policy by passing an implementation of MessageRouter
+     *
+     *
+     * @param messageRouter
+     */
+    public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) {
+        checkNotNull(messageRouter);
+        setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+        conf.setCustomMessageRouter(messageRouter);
+        return this;
+    }
+
+    /**
+     * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
+     *
+     * @return message router.
+     * @deprecated since 1.22.0-incubating. <tt>numPartitions</tt> is already passed as parameter in
+     *             {@link MessageRouter#choosePartition(Message, TopicMetadata)}.
+     * @see MessageRouter
+     */
+    @Deprecated
+    public MessageRouter getMessageRouter(int numPartitions) {
+        return conf.getCustomMessageRouter();
+    }
+
+    /**
+     * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
+     *
+     * @return message router set by {@link #setMessageRouter(MessageRouter)}.
+     */
+    public MessageRouter getMessageRouter() {
+        return conf.getCustomMessageRouter();
+    }
+
+    /**
+     * Return the flag whether automatic message batching is enabled or not.
+     *
+     * @return true if batch messages are enabled. otherwise false.
+     * @since 2.0.0 <br>
+     *        It is enabled by default.
+     */
+    public boolean getBatchingEnabled() {
+        return conf.isBatchingEnabled();
+    }
+
+    /**
+     * Control whether automatic batching of messages is enabled for the producer. <i>default: false [No batching]</i>
+     *
+     * When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
+     * broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
+     * messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
+     * contents.
+     *
+     * When enabled default batch delay is set to 10 ms and default batch size is 1000 messages
+     *
+     * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
+     * @since 1.0.36 <br>
+     *        Make sure all the consumer applications have been updated to use this client version, before starting to
+     *        batch messages.
+     *
+     */
+    public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) {
+        conf.setBatchingEnabled(batchMessagesEnabled);
+        return this;
+    }
+
+    /**
+     * @return the CryptoKeyReader
+     */
+    public CryptoKeyReader getCryptoKeyReader() {
+        return conf.getCryptoKeyReader();
+    }
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        checkNotNull(cryptoKeyReader);
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    /**
+     *
+     * @return encryptionKeys
+     *
+     */
+    public Set<String> getEncryptionKeys() {
+        return conf.getEncryptionKeys();
+    }
+
+    /**
+     *
+     * Returns true if encryption keys are added
+     *
+     */
+    public boolean isEncryptionEnabled() {
+        return conf.isEncryptionEnabled();
+    }
+
+    /**
+     * Add public encryption key, used by producer to encrypt the data key.
+     *
+     * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are
+     * found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application
+     * should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted
+     * after compression. If batch messaging is enabled, the batched message is encrypted.
+     *
+     */
+    public void addEncryptionKey(String key) {
+        conf.getEncryptionKeys().add(key);
+    }
+
+    public void removeEncryptionKey(String key) {
+        conf.getEncryptionKeys().remove(key);
+    }
+
+    /**
+     * Sets the ProducerCryptoFailureAction to the value specified
+     *
+     * @param action
+     *            The producer action
+     */
+    public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
+        conf.setCryptoFailureAction(action);
+    }
+
+    /**
+     * @return The ProducerCryptoFailureAction
+     */
+    public ProducerCryptoFailureAction getCryptoFailureAction() {
+        return conf.getCryptoFailureAction();
+    }
+
+    /**
+     *
+     * @return the batch time period in ms.
+     * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
+     */
+    public long getBatchingMaxPublishDelayMs() {
+        return TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros());
+    }
+
+    /**
+     * Set the time period within which the messages sent will be batched <i>default: 10ms</i> if batch messages are
+     * enabled. If set to a non zero value, messages will be queued until this time interval or until
+     *
+     * @see ProducerConfiguration#batchingMaxMessages threshold is reached; all messages will be published as a single
+     *      batch message. The consumer will be delivered individual messages in the batch in the same order they were
+     *      enqueued
+     * @since 1.0.36 <br>
+     *        Make sure all the consumer applications have been updated to use this client version, before starting to
+     *        batch messages.
+     * @param batchDelay
+     *            the batch delay
+     * @param timeUnit
+     *            the time unit of the {@code batchDelay}
+     * @return
+     */
+    public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
+        long delayInMs = timeUnit.toMillis(batchDelay);
+        checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
+        conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
+        return this;
+    }
+
+    /**
+     *
+     * @return the maximum number of messages permitted in a batch.
+     */
+    public int getBatchingMaxMessages() {
+        return conf.getBatchingMaxMessages();
+    }
+
+    /**
+     * Set the maximum number of messages permitted in a batch. <i>default: 1000</i> If set to a value greater than 1,
+     * messages will be queued until this threshold is reached or batch interval has elapsed
+     *
+     * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) All messages in batch will be published as
+     *      a single batch message. The consumer will be delivered individual messages in the batch in the same order
+     *      they were enqueued
+     * @param batchMessagesMaxMessagesPerBatch
+     *            maximum number of messages in a batch
+     * @return
+     */
+    public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
+        checkArgument(batchMessagesMaxMessagesPerBatch > 0);
+        conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
+        return this;
+    }
+
+    public Optional<Long> getInitialSequenceId() {
+        return Optional.ofNullable(conf.getInitialSequenceId());
+    }
+
+    /**
+     * Set the baseline for the sequence ids for messages published by the producer.
+     * <p>
+     * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned
+     * incremental sequence ids, if not otherwise specified.
+     *
+     * @param initialSequenceId
+     * @return
+     */
+    public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
+        conf.setInitialSequenceId(initialSequenceId);
+        return this;
+    }
+
+    /**
+     * Set a name/value property with this producer.
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public ProducerConfiguration setProperty(String key, String value) {
+        checkArgument(key != null);
+        checkArgument(value != null);
+        conf.getProperties().put(key, value);
+        return this;
+    }
+
+    /**
+     * Add all the properties in the provided map
+     *
+     * @param properties
+     * @return
+     */
+    public ProducerConfiguration setProperties(Map<String, String> properties) {
+        conf.getProperties().putAll(properties);
+        return this;
+    }
+
+    public Map<String, String> getProperties() {
+        return conf.getProperties();
+    }
+
+    public ProducerConfigurationData getProducerConfigurationData() {
+        return conf;
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
new file mode 100644
index 0000000000..a57a6846c8
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.impl.v1.PulsarClientV1Impl;
+
+/**
+ * Class that provides a client interface to Pulsar.
+ * <p>
+ * Client instances are thread-safe and can be reused for managing multiple {@link Producer}, {@link Consumer} and
+ * {@link Reader} instances.
+ */
+public interface PulsarClient extends Closeable {
+
+    /**
+     * Create a new PulsarClient object using default client configuration
+     *
+     * @param serviceUrl
+     *            the url of the Pulsar endpoint to be used
+     * @return a new pulsar client object
+     * @throws PulsarClientException.InvalidServiceURL
+     *             if the serviceUrl is invalid
+     * @deprecated use {@link #builder()} to construct a client instance
+     */
+    @Deprecated
+    public static PulsarClient create(String serviceUrl) throws PulsarClientException {
+        return create(serviceUrl, new ClientConfiguration());
+    }
+
+    /**
+     * Create a new PulsarClient object
+     *
+     * @param serviceUrl
+     *            the url of the Pulsar endpoint to be used
+     * @param conf
+     *            the client configuration
+     * @return a new pulsar client object
+     * @throws PulsarClientException.InvalidServiceURL
+     *             if the serviceUrl is invalid
+     * @deprecated use {@link #builder()} to construct a client instance
+     */
+    @Deprecated
+    public static PulsarClient create(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
+        return new PulsarClientV1Impl(serviceUrl, conf);
+    }
+
+    /**
+     * Create a producer with default {@link ProducerConfiguration} for publishing on a specific topic
+     *
+     * @param topic
+     *            The name of the topic where to produce
+     * @return The producer object
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the client was already closed
+     * @throws PulsarClientException.InvalidTopicNameException
+     *             if the topic name is not valid
+     * @throws PulsarClientException.AuthenticationException
+     *             if there was an error with the supplied credentials
+     * @throws PulsarClientException.AuthorizationException
+     *             if the authorization to publish on topic was denied
+     * @deprecated use {@link #newProducer()} to build a new producer
+     */
+    @Deprecated
+    Producer createProducer(String topic) throws PulsarClientException;
+
+    /**
+     * Asynchronously create a producer with default {@link ProducerConfiguration} for publishing on a specific topic
+     *
+     * @param topic
+     *            The name of the topic where to produce
+     * @return Future of the asynchronously created producer object
+     * @deprecated use {@link #newProducer()} to build a new producer
+     */
+    @Deprecated
+    CompletableFuture<Producer> createProducerAsync(String topic);
+
+    /**
+     * Create a producer with given {@code ProducerConfiguration} for publishing on a specific topic
+     *
+     * @param topic
+     *            The name of the topic where to produce
+     * @param conf
+     *            The {@code ProducerConfiguration} object
+     * @return The producer object
+     * @throws PulsarClientException
+     *             if it was not possible to create the producer
+     * @throws InterruptedException
+     * @deprecated use {@link #newProducer()} to build a new producer
+     */
+    @Deprecated
+    Producer createProducer(String topic, ProducerConfiguration conf) throws PulsarClientException;
+
+    /**
+     * Asynchronously create a producer with given {@code ProducerConfiguration} for publishing on a specific topic
+     *
+     * @param topic
+     *            The name of the topic where to produce
+     * @param conf
+     *            The {@code ProducerConfiguration} object
+     * @return Future of the asynchronously created producer object
+     * @deprecated use {@link #newProducer()} to build a new producer
+     */
+    @Deprecated
+    CompletableFuture<Producer> createProducerAsync(String topic, ProducerConfiguration conf);
+
+    /**
+     * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The name of the topic
+     * @param subscription
+     *            The name of the subscription
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException
+     * @throws InterruptedException
+     *
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
+     */
+    @Deprecated
+    Consumer subscribe(String topic, String subscription) throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topic and subscription combination using default
+     * {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The topic name
+     * @param subscription
+     *            The subscription name
+     * @return Future of the {@code Consumer} object
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
+     */
+    @Deprecated
+    CompletableFuture<Consumer> subscribeAsync(String topic, String subscription);
+
+    /**
+     * Subscribe to the given topic and subscription combination with given {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The name of the topic
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
+     */
+    @Deprecated
+    Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topic and subscription combination using given
+     * {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The name of the topic
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return Future of the {@code Consumer} object
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
+     */
+    @Deprecated
+    CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, ConsumerConfiguration conf);
+
+    /**
+     * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified topic.
+     * <p>
+     * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+     * subscription. Reader can only work on non-partitioned topics.
+     * <p>
+     * The initial reader positioning is done by specifying a message id. The options are:
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Start reading from the end topic, only getting messages published after the
+     * reader was created
+     * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that
+     * specific position. The first message to be read will be the message next to the specified messageId.
+     * </ul>
+     *
+     * @param topic
+     *            The name of the topic where to read
+     * @param startMessageId
+     *            The message id where the reader will position itself. The first message returned will be the one after
+     *            the specified startMessageId
+     * @param conf
+     *            The {@code ReaderConfiguration} object
+     * @return The {@code Reader} object
+     * @deprecated Use {@link #newReader()} to build a new reader
+     */
+    @Deprecated
+    Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException;
+
+    /**
+     * Asynchronously create a topic reader with given {@code ReaderConfiguration} for reading messages from the
+     * specified topic.
+     * <p>
+     * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+     * subscription. Reader can only work on non-partitioned topics.
+     * <p>
+     * The initial reader positioning is done by specifying a message id. The options are:
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Start reading from the end topic, only getting messages published after the
+     * reader was created
+     * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that
+     * specific position. The first message to be read will be the message next to the specified messageId.
+     * </ul>
+     *
+     * @param topic
+     *            The name of the topic where to read
+     * @param startMessageId
+     *            The message id where the reader will position itself. The first message returned will be the one after
+     *            the specified startMessageId
+     * @param conf
+     *            The {@code ReaderConfiguration} object
+     * @return Future of the asynchronously created producer object
+     * @deprecated Use {@link #newReader()} to build a new reader
+     */
+    @Deprecated
+    CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf);
+
+    /**
+     * Close the PulsarClient and release all the resources.
+     *
+     * All the producers and consumers will be orderly closed. Waits until all pending write request are persisted.
+     *
+     * @throws PulsarClientException
+     *             if the close operation fails
+     */
+    @Override
+    void close() throws PulsarClientException;
+
+    /**
+     * Asynchronously close the PulsarClient and release all the resources.
+     *
+     * All the producers and consumers will be orderly closed. Waits until all pending write request are persisted.
+     *
+     * @throws PulsarClientException
+     *             if the close operation fails
+     */
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * Perform immediate shutdown of PulsarClient.
+     *
+     * Release all the resources and close all the producers without waiting for ongoing operations to complete.
+     *
+     * @throws PulsarClientException
+     *             if the forceful shutdown fails
+     */
+    void shutdown() throws PulsarClientException;
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
new file mode 100644
index 0000000000..4a435c3cbd
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Reader can be used to scan through all the messages currently available in a topic.
+ *
+ */
+public interface Reader extends Closeable {
+
+    /**
+     * @return the topic from which this reader is reading from
+     */
+    String getTopic();
+
+    /**
+     * Read the next message in the topic
+     *
+     * @return the next messasge
+     * @throws PulsarClientException
+     */
+    Message<byte[]> readNext() throws PulsarClientException;
+
+    /**
+     * Read the next message in the topic waiting for a maximum of timeout
+     * time units. Returns null if no message is recieved in that time.
+     *
+     * @return the next message(Could be null if none received in time)
+     * @throws PulsarClientException
+     */
+    Message<byte[]> readNext(int timeout, TimeUnit unit) throws PulsarClientException;
+
+    CompletableFuture<Message<byte[]>> readNextAsync();
+
+    /**
+     * Asynchronously close the reader and stop the broker to push more messages
+     *
+     * @return a future that can be used to track the completion of the operation
+     */
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * Return true if the topic was terminated and this reader has reached the end of the topic
+     */
+    boolean hasReachedEndOfTopic();
+
+    /**
+     * Check if there is any message available to read from the current position.
+     */
+    boolean hasMessageAvailable() throws PulsarClientException;
+
+    /**
+     * Asynchronously Check if there is message that has been published successfully to the broker in the topic.
+     */
+    CompletableFuture<Boolean> hasMessageAvailableAsync();
+
+    /**
+     * @return Whether the reader is connected to the broker
+     */
+    boolean isConnected();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
new file mode 100644
index 0000000000..243166cbf8
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.commons.lang3.StringUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+
+/**
+ *
+ * @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance
+ */
+@Deprecated
+public class ReaderConfiguration implements Serializable {
+
+    private final ReaderConfigurationData<byte[]> conf = new ReaderConfigurationData<>();
+
+    /**
+     * @return the configured {@link ReaderListener} for the reader
+     */
+    public ReaderListener<byte[]> getReaderListener() {
+        return conf.getReaderListener();
+    }
+
+    /**
+     * Sets a {@link ReaderListener} for the reader
+     * <p>
+     * When a {@link ReaderListener} is set, application will receive messages through it. Calls to
+     * {@link Reader#readNext()} will not be allowed.
+     *
+     * @param readerListener
+     *            the listener object
+     */
+    public ReaderConfiguration setReaderListener(ReaderListener<byte[]> readerListener) {
+        checkNotNull(readerListener);
+        conf.setReaderListener(readerListener);
+        return this;
+    }
+
+    /**
+     * @return the configure receiver queue size value
+     */
+    public int getReceiverQueueSize() {
+        return conf.getReceiverQueueSize();
+    }
+
+    /**
+     * @return the CryptoKeyReader
+     */
+    public CryptoKeyReader getCryptoKeyReader() {
+        return conf.getCryptoKeyReader();
+    }
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        checkNotNull(cryptoKeyReader);
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    /**
+     * Sets the ConsumerCryptoFailureAction to the value specified
+     * 
+     * @param action
+     *            The action to take when the decoding fails
+     */
+    public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+        conf.setCryptoFailureAction(action);
+    }
+
+    /**
+     * @return The ConsumerCryptoFailureAction
+     */
+    public ConsumerCryptoFailureAction getCryptoFailureAction() {
+        return conf.getCryptoFailureAction();
+    }
+
+    /**
+     * Sets the size of the consumer receive queue.
+     * <p>
+     * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
+     * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer
+     * throughput at the expense of bigger memory utilization.
+     * </p>
+     * Default value is {@code 1000} messages and should be good for most use cases.
+     *
+     * @param receiverQueueSize
+     *            the new receiver queue size value
+     */
+    public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) {
+        checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
+        conf.setReceiverQueueSize(receiverQueueSize);
+        return this;
+    }
+
+    /**
+     * @return the consumer name
+     */
+    public String getReaderName() {
+        return conf.getReaderName();
+    }
+
+    /**
+     * Set the consumer name.
+     *
+     * @param readerName
+     */
+    public ReaderConfiguration setReaderName(String readerName) {
+        checkArgument(StringUtils.isNotBlank(readerName));
+        conf.setReaderName(readerName);
+        return this;
+    }
+
+    /**
+     * @return the subscription role prefix for subscription auth
+     */
+    public String getSubscriptionRolePrefix() {
+        return conf.getSubscriptionRolePrefix();
+    }
+
+    /**
+     * Set the subscription role prefix for subscription auth. The default prefix is "reader".
+     *
+     * @param subscriptionRolePrefix
+     */
+    public ReaderConfiguration setSubscriptionRolePrefix(String subscriptionRolePrefix) {
+        checkArgument(StringUtils.isNotBlank(subscriptionRolePrefix));
+        conf.setSubscriptionRolePrefix(subscriptionRolePrefix);
+        return this;
+    }
+
+    public ReaderConfigurationData<byte[]> getReaderConfigurationData() {
+        return conf;
+    }
+
+    private static final long serialVersionUID = 1L;
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
new file mode 100644
index 0000000000..76b1dabd07
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+
+@SuppressWarnings("deprecation")
+public class MessageBuilderImpl implements MessageBuilder {
+    private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);
+    private final MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder();
+    private ByteBuffer content = EMPTY_CONTENT;
+
+    @Override
+    public Message<byte[]> build() {
+        return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES);
+    }
+
+    @Override
+    public MessageBuilder setContent(byte[] data) {
+        setContent(data, 0, data.length);
+        return this;
+    }
+
+    @Override
+    public MessageBuilder setContent(byte[] data, int offet, int length) {
+        this.content = ByteBuffer.wrap(data, offet, length);
+        return this;
+    }
+
+    @Override
+    public MessageBuilder setContent(ByteBuffer buf) {
+        this.content = buf.duplicate();
+        return this;
+    }
+
+    @Override
+    public MessageBuilder setProperties(Map<String, String> properties) {
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            msgMetadataBuilder
+                    .addProperties(KeyValue.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).build());
+        }
+
+        return this;
+    }
+
+    @Override
+    public MessageBuilder setProperty(String name, String value) {
+        msgMetadataBuilder.addProperties(KeyValue.newBuilder().setKey(name).setValue(value).build());
+        return this;
+    }
+
+    @Override
+    public MessageBuilder setKey(String key) {
+        msgMetadataBuilder.setPartitionKey(key);
+        return this;
+    }
+
+    @Override
+    public MessageBuilder setEventTime(long timestamp) {
+        checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
+        msgMetadataBuilder.setEventTime(timestamp);
+        return this;
+    }
+
+    @Override
+    public MessageBuilder setSequenceId(long sequenceId) {
+        checkArgument(sequenceId >= 0);
+        msgMetadataBuilder.setSequenceId(sequenceId);
+        return this;
+    }
+
+    @Override
+    public MessageBuilder setReplicationClusters(List<String> clusters) {
+        Preconditions.checkNotNull(clusters);
+        msgMetadataBuilder.clearReplicateTo();
+        msgMetadataBuilder.addAllReplicateTo(clusters);
+        return this;
+    }
+
+    @Override
+    public MessageBuilder disableReplication() {
+        msgMetadataBuilder.clearReplicateTo();
+        msgMetadataBuilder.addReplicateTo("__local__");
+        return this;
+    }
+
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
new file mode 100644
index 0000000000..e21e5720ea
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class ConsumerV1Impl implements Consumer {
+    private final org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> consumer;
+
+    public ConsumerV1Impl(org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
+    public void acknowledge(Message<?> arg0) throws PulsarClientException {
+        consumer.acknowledge(arg0);
+    }
+
+    @Override
+    public void acknowledge(MessageId arg0) throws PulsarClientException {
+        consumer.acknowledge(arg0);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(Message<?> arg0) {
+        return consumer.acknowledgeAsync(arg0);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(MessageId arg0) {
+        return consumer.acknowledgeAsync(arg0);
+    }
+
+    @Override
+    public void acknowledgeCumulative(Message<?> arg0) throws PulsarClientException {
+        consumer.acknowledgeCumulative(arg0);
+    }
+
+    @Override
+    public void acknowledgeCumulative(MessageId arg0) throws PulsarClientException {
+        consumer.acknowledgeCumulative(arg0);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> arg0) {
+        return consumer.acknowledgeCumulativeAsync(arg0);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId arg0) {
+        return consumer.acknowledgeCumulativeAsync(arg0);
+    }
+
+    @Override
+    public void close() throws PulsarClientException {
+        consumer.close();
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return consumer.closeAsync();
+    }
+
+    @Override
+    public String getConsumerName() {
+        return consumer.getConsumerName();
+    }
+
+    @Override
+    public ConsumerStats getStats() {
+        return consumer.getStats();
+    }
+
+    public String getSubscription() {
+        return consumer.getSubscription();
+    }
+
+    public String getTopic() {
+        return consumer.getTopic();
+    }
+
+    public boolean hasReachedEndOfTopic() {
+        return consumer.hasReachedEndOfTopic();
+    }
+
+    public boolean isConnected() {
+        return consumer.isConnected();
+    }
+
+    public void pause() {
+        consumer.pause();
+    }
+
+    public Message<byte[]> receive() throws PulsarClientException {
+        return consumer.receive();
+    }
+
+    public Message<byte[]> receive(int arg0, TimeUnit arg1) throws PulsarClientException {
+        return consumer.receive(arg0, arg1);
+    }
+
+    public CompletableFuture<Message<byte[]>> receiveAsync() {
+        return consumer.receiveAsync();
+    }
+
+    public void redeliverUnacknowledgedMessages() {
+        consumer.redeliverUnacknowledgedMessages();
+    }
+
+    public void resume() {
+        consumer.resume();
+    }
+
+    public void seek(MessageId arg0) throws PulsarClientException {
+        consumer.seek(arg0);
+    }
+
+    public CompletableFuture<Void> seekAsync(MessageId arg0) {
+        return consumer.seekAsync(arg0);
+    }
+
+    public void unsubscribe() throws PulsarClientException {
+        consumer.unsubscribe();
+    }
+
+    public CompletableFuture<Void> unsubscribeAsync() {
+        return consumer.unsubscribeAsync();
+    }
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
new file mode 100644
index 0000000000..3f91377228
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerStats;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
+
+public class ProducerV1Impl implements Producer {
+
+    private final ProducerImpl<byte[]> producer;
+
+    public ProducerV1Impl(ProducerImpl<byte[]> producer) {
+        this.producer = producer;
+    }
+
+    public void close() throws PulsarClientException {
+        producer.close();
+    }
+
+    public CompletableFuture<Void> closeAsync() {
+        return producer.closeAsync();
+    }
+
+    public void flush() throws PulsarClientException {
+        producer.flush();
+    }
+
+    public CompletableFuture<Void> flushAsync() {
+        return producer.flushAsync();
+    }
+
+    public long getLastSequenceId() {
+        return producer.getLastSequenceId();
+    }
+
+    public ProducerStats getStats() {
+        return producer.getStats();
+    }
+
+    public boolean isConnected() {
+        return producer.isConnected();
+    }
+
+    public MessageId send(byte[] value) throws PulsarClientException {
+        return producer.send(value);
+    }
+
+    public MessageId send(Message<byte[]> value) throws PulsarClientException {
+        return producer.send(value);
+    }
+
+    public CompletableFuture<MessageId> sendAsync(byte[] arg0) {
+        return producer.sendAsync(arg0);
+    }
+
+    public CompletableFuture<MessageId> sendAsync(Message<byte[]> arg0) {
+        return producer.sendAsync(arg0);
+    }
+
+    @Override
+    public String getTopic() {
+        return producer.getTopic();
+    }
+
+    @Override
+    public String getProducerName() {
+        return producer.getProducerName();
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
new file mode 100644
index 0000000000..a10f9f12e3
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@SuppressWarnings("deprecation")
+public class PulsarClientV1Impl implements PulsarClient {
+
+    private final PulsarClientImpl client;
+
+    public PulsarClientV1Impl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
+        this.client = new PulsarClientImpl(conf.setServiceUrl(serviceUrl).getConfigurationData().clone());
+    }
+
+    @Override
+    public void close() throws PulsarClientException {
+        client.close();
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return client.closeAsync();
+    }
+
+    @Override
+    public Producer createProducer(final String topic, final ProducerConfiguration conf) throws PulsarClientException {
+        if (conf == null) {
+            throw new PulsarClientException.InvalidConfigurationException("Invalid null configuration object");
+        }
+
+        try {
+            return createProducerAsync(topic, conf).get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public Producer createProducer(String topic)
+            throws PulsarClientException {
+        return createProducer(topic, new ProducerConfiguration());
+    }
+
+    @Override
+    public CompletableFuture<Producer> createProducerAsync(final String topic, final ProducerConfiguration conf) {
+        ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
+        confData.setTopicName(topic);
+        return client.createProducerAsync(confData).thenApply(p -> new ProducerV1Impl((ProducerImpl<byte[]>) p));
+    }
+
+    @Override
+    public CompletableFuture<Producer> createProducerAsync(String topic) {
+        return createProducerAsync(topic, new ProducerConfiguration());
+    }
+
+    @Override
+    public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
+            throws PulsarClientException {
+        try {
+            return createReaderAsync(topic, startMessageId, conf).get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId,
+            ReaderConfiguration conf) {
+        ReaderConfigurationData<byte[]> confData = conf.getReaderConfigurationData().clone();
+        confData.setTopicName(topic);
+        confData.setStartMessageId(startMessageId);
+        return client.createReaderAsync(confData).thenApply(r -> new ReaderV1Impl(r));
+    }
+
+    @Override
+    public void shutdown() throws PulsarClientException {
+        client.shutdown();
+    }
+
+    @Override
+    public Consumer subscribe(String topic, String subscriptionName) throws PulsarClientException {
+        return subscribe(topic, subscriptionName, new ConsumerConfiguration());
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(final String topic, final String subscription,
+            final ConsumerConfiguration conf) {
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException("Invalid null configuration"));
+        }
+
+        ConsumerConfigurationData<byte[]> confData = conf.getConfigurationData().clone();
+        confData.getTopicNames().add(topic);
+        confData.setSubscriptionName(subscription);
+        return client.subscribeAsync(confData).thenApply(c -> new ConsumerV1Impl(c));
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(String topic,
+            String subscriptionName) {
+        return subscribeAsync(topic, subscriptionName, new ConsumerConfiguration());
+    }
+
+    @Override
+    public Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf)
+            throws PulsarClientException {
+        try {
+            return subscribeAsync(topic, subscription, conf).get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
new file mode 100644
index 0000000000..7ae86b5502
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+
+public class ReaderV1Impl implements Reader {
+
+    private final org.apache.pulsar.shade.client.api.v2.Reader<byte[]> reader;
+
+    public ReaderV1Impl(org.apache.pulsar.shade.client.api.v2.Reader<byte[]> reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return reader.closeAsync();
+    }
+
+    @Override
+    public String getTopic() {
+        return reader.getTopic();
+    }
+
+    @Override
+    public boolean hasMessageAvailable() throws PulsarClientException {
+        return reader.hasMessageAvailable();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        return reader.hasMessageAvailableAsync();
+    }
+
+    @Override
+    public boolean hasReachedEndOfTopic() {
+        return reader.hasReachedEndOfTopic();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return reader.isConnected();
+    }
+
+    @Override
+    public Message<byte[]> readNext() throws PulsarClientException {
+        return reader.readNext();
+    }
+
+    @Override
+    public Message<byte[]> readNext(int arg0, TimeUnit arg1) throws PulsarClientException {
+        return reader.readNext(arg0, arg1);
+    }
+
+    @Override
+    public CompletableFuture<Message<byte[]>> readNextAsync() {
+        return reader.readNextAsync();
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java b/pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
similarity index 88%
rename from pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
rename to pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
index 56630a49fa..5e14b7bafe 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
@@ -36,20 +36,20 @@
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testSetEventTimeNegative() {
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setEventTime(-1L);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testSetEventTimeZero() {
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setEventTime(0L);
     }
 
     @Test
     public void testSetEventTimePositive() {
         long eventTime = System.currentTimeMillis();
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setContent(new byte[0]);
         builder.setEventTime(eventTime);
         Message<?> msg = builder.build();
@@ -58,7 +58,7 @@ public void testSetEventTimePositive() {
 
     @Test
     public void testBuildMessageWithoutEventTime() {
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setContent(new byte[0]);
         Message<?> msg = builder.build();
         assertEquals(0L, msg.getEventTime());
@@ -66,7 +66,7 @@ public void testBuildMessageWithoutEventTime() {
 
     @Test
     public void testSetMessageProperties() {
-        MessageBuilder<?> builder = MessageBuilder.create();
+        MessageBuilder builder = MessageBuilder.create();
         builder.setContent(new byte[0]);
         Map<String, String> map = Maps.newHashMap();
         map.put("key1", "value1");
diff --git a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
new file mode 100644
index 0000000000..6b6b2bc35a
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
@@ -0,0 +1,91 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-client-1x-base</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-2x-shaded</artifactId>
+  <name>Pulsar Client 2.x Shaded API</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+  </dependencies>
+  
+  <build>
+    <plugins>
+       <plugin>
+        <!-- Shade all the dependencies to avoid conflicts -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <minimizeJar>false</minimizeJar>
+
+              <artifactSet>
+                <includes>
+                  <include>org.apache.pulsar:pulsar-client</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-client</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.pulsar.client.api</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.client.api.v2</shadedPattern>
+                  <includes>
+                    <include>org.apache.pulsar.client.api.PulsarClient</include>
+                    <include>org.apache.pulsar.client.api.Producer</include>
+                    <include>org.apache.pulsar.client.api.Consumer</include>
+                    <include>org.apache.pulsar.client.api.Reader</include>
+                  </includes>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
index 70416f3495..46516f0cdb 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import static org.testng.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+
 import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,10 +32,6 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-
 public class DefaultSchemasTest {
     private PulsarClient client;
 
@@ -75,17 +73,7 @@ public void testStringSchema() throws Exception {
         Assert.assertTrue(stringSchema.decode(testBytes).equals(testString));
         assertEquals(stringSchema.encode(testString), testBytes);
 
-        Message<String> msg1 = MessageBuilder.create(stringSchema)
-                .setContent(testBytes)
-                .build();
-        assertEquals(stringSchema.decode(msg1.getData()), testString);
-
-        Message<String> msg2 = MessageBuilder.create(stringSchema)
-                .setValue(testString)
-                .build();
-        assertEquals(stringSchema.encode(testString), msg2.getData());
-
-        byte[] bytes2 = testString.getBytes(StandardCharsets.UTF_16);
+         byte[] bytes2 = testString.getBytes(StandardCharsets.UTF_16);
         StringSchema stringSchemaUtf16 = new StringSchema(StandardCharsets.UTF_16);
         Assert.assertTrue(stringSchemaUtf16.decode(bytes2).equals(testString));
         assertEquals(stringSchemaUtf16.encode(testString), bytes2);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services