You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/30 16:47:50 UTC
[pulsar] branch master updated: Provide separate module with Pulsar
v1 client API wrapper (#3228)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 742ee7e Provide separate module with Pulsar v1 client API wrapper (#3228)
742ee7e is described below
commit 742ee7eec3f150626b4017223429323aab82d61a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Dec 30 08:47:45 2018 -0800
Provide separate module with Pulsar v1 client API wrapper (#3228)
* Provide separate module with Pulsar v1 client API wrapper
* Use impl classes rather than API
* Brought back DefaultSchemasTest.testStringSchema
---
pom.xml | 1 +
pulsar-client-1x-base/pom.xml | 40 ++
pulsar-client-1x-base/pulsar-client-1x/pom.xml | 52 +++
.../pulsar/client/api/ClientConfiguration.java | 390 +++++++++++++++++
.../org/apache/pulsar/client/api/Consumer.java | 308 +++++++++++++
.../pulsar/client/api/ConsumerConfiguration.java | 366 ++++++++++++++++
.../apache/pulsar/client/api/MessageBuilder.java | 140 ++++++
.../org/apache/pulsar/client/api/Producer.java | 199 +++++++++
.../pulsar/client/api/ProducerConfiguration.java | 483 +++++++++++++++++++++
.../org/apache/pulsar/client/api/PulsarClient.java | 274 ++++++++++++
.../java/org/apache/pulsar/client/api/Reader.java | 81 ++++
.../pulsar/client/api/ReaderConfiguration.java | 163 +++++++
.../pulsar/client/impl/MessageBuilderImpl.java | 116 +++++
.../pulsar/client/impl/v1/ConsumerV1Impl.java | 153 +++++++
.../pulsar/client/impl/v1/ProducerV1Impl.java | 91 ++++
.../pulsar/client/impl/v1/PulsarClientV1Impl.java | 173 ++++++++
.../apache/pulsar/client/impl/v1/ReaderV1Impl.java | 86 ++++
.../pulsar/client/impl/MessageBuilderTest.java | 10 +-
.../pulsar-client-2x-shaded/pom.xml | 91 ++++
.../client/impl/schema/DefaultSchemasTest.java | 22 +-
20 files changed, 3217 insertions(+), 22 deletions(-)
diff --git a/pom.xml b/pom.xml
index 2aaca3a..ee2780c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-client-api</module>
<module>pulsar-client</module>
<module>pulsar-client-shaded</module>
+ <module>pulsar-client-1x-base</module>
<module>pulsar-client-admin</module>
<module>pulsar-client-admin-shaded</module>
<module>pulsar-client-tools</module>
diff --git a/pulsar-client-1x-base/pom.xml b/pulsar-client-1x-base/pom.xml
new file mode 100644
index 0000000..22a5b1b
--- /dev/null
+++ b/pulsar-client-1x-base/pom.xml
@@ -0,0 +1,40 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>2.3.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-client-1x-base</artifactId>
+ <name>Pulsar Client 1.x Compatibility Base</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>pulsar-client-2x-shaded</module>
+ <module>pulsar-client-1x</module>
+ </modules>
+</project>
diff --git a/pulsar-client-1x-base/pulsar-client-1x/pom.xml b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
new file mode 100644
index 0000000..b6fd0b8
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
@@ -0,0 +1,52 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-1x-base</artifactId>
+ <version>2.3.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-client-1x</artifactId>
+ <name>Pulsar Client 1.x Compatibility API</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-2x-shaded</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
new file mode 100644
index 0000000..e5bac91
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+/**
+ * Class used to specify client side configuration like authentication, etc..
+ *
+ * @deprecated Use {@link PulsarClient#builder()} to construct and configure a new {@link PulsarClient} instance
+ */
+@Deprecated
+public class ClientConfiguration implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ClientConfigurationData confData = new ClientConfigurationData();
+
+ /**
+ * @return the authentication provider to be used
+ */
+ public Authentication getAuthentication() {
+ return confData.getAuthentication();
+ }
+
+ /**
+ * Set the authentication provider to use in the Pulsar client instance.
+ * <p>
+ * Example:
+ * <p>
+ *
+ * <pre>
+ * <code>
+ * ClientConfiguration confData = new ClientConfiguration();
+ * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
+ * String authParamsString = "key1:val1,key2:val2";
+ * Authentication auth = AuthenticationFactory.create(authPluginClassName, authParamsString);
+ * confData.setAuthentication(auth);
+ * PulsarClient client = PulsarClient.create(serviceUrl, confData);
+ * ....
+ * </code>
+ * </pre>
+ *
+ * @param authentication
+ */
+ public void setAuthentication(Authentication authentication) {
+ confData.setAuthentication(authentication);
+ }
+
+ /**
+ * Set the authentication provider to use in the Pulsar client instance.
+ * <p>
+ * Example:
+ * <p>
+ *
+ * <pre>
+ * <code>
+ * ClientConfiguration confData = new ClientConfiguration();
+ * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
+ * String authParamsString = "key1:val1,key2:val2";
+ * confData.setAuthentication(authPluginClassName, authParamsString);
+ * PulsarClient client = PulsarClient.create(serviceUrl, confData);
+ * ....
+ * </code>
+ * </pre>
+ *
+ * @param authPluginClassName
+ * name of the Authentication-Plugin you want to use
+ * @param authParamsString
+ * string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
+ * @throws UnsupportedAuthenticationException
+ * failed to instantiate specified Authentication-Plugin
+ */
+ public void setAuthentication(String authPluginClassName, String authParamsString)
+ throws UnsupportedAuthenticationException {
+ confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
+ }
+
+ /**
+ * Set the authentication provider to use in the Pulsar client instance.
+ * <p>
+ * Example:
+ * <p>
+ *
+ * <pre>
+ * <code>
+ * ClientConfiguration confData = new ClientConfiguration();
+ * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
+ * Map<String, String> authParams = new HashMap<String, String>();
+ * authParams.put("key1", "val1");
+ * confData.setAuthentication(authPluginClassName, authParams);
+ * PulsarClient client = PulsarClient.create(serviceUrl, confData);
+ * ....
+ * </code>
+ * </pre>
+ *
+ * @param authPluginClassName
+ * name of the Authentication-Plugin you want to use
+ * @param authParams
+ * map which represents parameters for the Authentication-Plugin
+ * @throws UnsupportedAuthenticationException
+ * failed to instantiate specified Authentication-Plugin
+ */
+ public void setAuthentication(String authPluginClassName, Map<String, String> authParams)
+ throws UnsupportedAuthenticationException {
+ confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
+ }
+
+ /**
+ * @return the operation timeout in ms
+ */
+ public long getOperationTimeoutMs() {
+ return confData.getOperationTimeoutMs();
+ }
+
+ /**
+ * Set the operation timeout <i>(default: 30 seconds)</i>
+ * <p>
+ * Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
+ * operation will be marked as failed
+ *
+ * @param operationTimeout
+ * operation timeout
+ * @param unit
+ * time unit for {@code operationTimeout}
+ */
+ public void setOperationTimeout(int operationTimeout, TimeUnit unit) {
+ checkArgument(operationTimeout >= 0);
+ confData.setOperationTimeoutMs(unit.toMillis(operationTimeout));
+ }
+
+ /**
+ * @return the number of threads to use for handling connections
+ */
+ public int getIoThreads() {
+ return confData.getNumIoThreads();
+ }
+
+ /**
+ * Set the number of threads to be used for handling connections to brokers <i>(default: 1 thread)</i>
+ *
+ * @param numIoThreads
+ */
+ public void setIoThreads(int numIoThreads) {
+ checkArgument(numIoThreads > 0);
+ confData.setNumIoThreads(numIoThreads);
+ }
+
+ /**
+ * @return the number of threads to use for message listeners
+ */
+ public int getListenerThreads() {
+ return confData.getNumListenerThreads();
+ }
+
+ /**
+ * Set the number of threads to be used for message listeners <i>(default: 1 thread)</i>
+ *
+ * @param numListenerThreads
+ */
+ public void setListenerThreads(int numListenerThreads) {
+ checkArgument(numListenerThreads > 0);
+ confData.setNumListenerThreads(numListenerThreads);
+ }
+
+ /**
+ * @return the max number of connections per single broker
+ */
+ public int getConnectionsPerBroker() {
+ return confData.getConnectionsPerBroker();
+ }
+
+ /**
+ * Sets the max number of connection that the client library will open to a single broker.
+ * <p>
+ * By default, the connection pool will use a single connection for all the producers and consumers. Increasing this
+ * parameter may improve throughput when using many producers over a high latency connection.
+ * <p>
+ *
+ * @param connectionsPerBroker
+ * max number of connections per broker (needs to be greater than 0)
+ */
+ public void setConnectionsPerBroker(int connectionsPerBroker) {
+ checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0");
+ confData.setConnectionsPerBroker(connectionsPerBroker);
+ }
+
+ /**
+ * @return whether TCP no-delay should be set on the connections
+ */
+ public boolean isUseTcpNoDelay() {
+ return confData.isUseTcpNoDelay();
+ }
+
+ /**
+ * Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.
+ * <p>
+ * No-delay features make sure packets are sent out on the network as soon as possible, and it's critical to achieve
+ * low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall
+ * throughput, so if latency is not a concern, it's advisable to set the <code>useTcpNoDelay</code> flag to false.
+ * <p>
+ * Default value is true
+ *
+ * @param useTcpNoDelay
+ */
+ public void setUseTcpNoDelay(boolean useTcpNoDelay) {
+ confData.setUseTcpNoDelay(useTcpNoDelay);
+ }
+
+ /**
+ * @return whether TLS encryption is used on the connection
+ */
+ public boolean isUseTls() {
+ return confData.isUseTls();
+ }
+
+ /**
+ * Configure whether to use TLS encryption on the connection <i>(default: false)</i>
+ *
+ * @param useTls
+ */
+ public void setUseTls(boolean useTls) {
+ confData.setUseTls(useTls);
+ }
+
+ /**
+ * @return path to the trusted TLS certificate file
+ */
+ public String getTlsTrustCertsFilePath() {
+ return confData.getTlsTrustCertsFilePath();
+ }
+
+ /**
+ * Set the path to the trusted TLS certificate file
+ *
+ * @param tlsTrustCertsFilePath
+ */
+ public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
+ confData.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+ }
+
+ /**
+ * @return whether the Pulsar client accept untrusted TLS certificate from broker
+ */
+ public boolean isTlsAllowInsecureConnection() {
+ return confData.isTlsAllowInsecureConnection();
+ }
+
+ /**
+ * Configure whether the Pulsar client accept untrusted TLS certificate from broker <i>(default: false)</i>
+ *
+ * @param tlsAllowInsecureConnection
+ */
+ public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
+ confData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
+ }
+
+ /**
+ * Stats will be activated with positive statsIntervalSeconds
+ *
+ * @return the interval between each stat info <i>(default: 60 seconds)</i>
+ */
+ public long getStatsIntervalSeconds() {
+ return confData.getStatsIntervalSeconds();
+ }
+
+ /**
+ * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
+ * statsIntervalSeconds It should be set to at least 1 second
+ *
+ * @param statsIntervalSeconds
+ * the interval between each stat info
+ * @param unit
+ * time unit for {@code statsInterval}
+ */
+ public void setStatsInterval(long statsInterval, TimeUnit unit) {
+ confData.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
+ }
+
+ /**
+ * Get configured total allowed concurrent lookup-request.
+ *
+ * @return
+ */
+ public int getConcurrentLookupRequest() {
+ return confData.getConcurrentLookupRequest();
+ }
+
+ /**
+ * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
+ * <i>(default: 50000)</i> It should be configured with higher value only in case of it requires to
+ * produce/subscribe on thousands of topic using created {@link PulsarClient}
+ *
+ * @param concurrentLookupRequest
+ */
+ public void setConcurrentLookupRequest(int concurrentLookupRequest) {
+ confData.setConcurrentLookupRequest(concurrentLookupRequest);
+ }
+
+ /**
+ * Get configured max number of reject-request in a time-frame (30 seconds) after which connection will be closed
+ *
+ * @return
+ */
+ public int getMaxNumberOfRejectedRequestPerConnection() {
+ return confData.getMaxNumberOfRejectedRequestPerConnection();
+ }
+
+ /**
+ * Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
+ * will be closed and client creates a new connection that give chance to connect a different broker <i>(default:
+ * 50)</i>
+ *
+ * @param maxNumberOfRejectedRequestPerConnection
+ */
+ public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
+ confData.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
+ }
+
+ public boolean isTlsHostnameVerificationEnable() {
+ return confData.isTlsHostnameVerificationEnable();
+ }
+
+ /**
+ * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509
+ * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1.
+ * Server Identity hostname verification.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+ *
+ * @param tlsHostnameVerificationEnable
+ */
+ public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {
+ confData.setTlsHostnameVerificationEnable(tlsHostnameVerificationEnable);
+ }
+
+ public ClientConfiguration setServiceUrl(String serviceUrl) {
+ confData.setServiceUrl(serviceUrl);
+ return this;
+ }
+
+ /**
+ * Set the duration of time to wait for a connection to a broker to be established. If the duration
+ * passes without a response from the broker, the connection attempt is dropped.
+ *
+ * @param duration the duration to wait
+ * @param unit the time unit in which the duration is defined
+ */
+ public void setConnectionTimeout(int duration, TimeUnit unit) {
+ confData.setConnectionTimeoutMs((int)unit.toMillis(duration));
+ }
+
+ /**
+ * Get the duration of time for which the client will wait for a connection to a broker to be
+ * established before giving up.
+ *
+ * @return the duration, in milliseconds
+ */
+ public long getConnectionTimeoutMs() {
+ return confData.getConnectionTimeoutMs();
+ }
+
+ public ClientConfigurationData getConfigurationData() {
+ return confData;
+ }
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
new file mode 100644
index 0000000..00da0a4
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An interface that abstracts behavior of Pulsar's consumer.
+ */
+public interface Consumer extends Closeable {
+
+ /**
+ * Get a topic for the consumer
+ *
+ * @return topic for the consumer
+ */
+ String getTopic();
+
+ /**
+ * Get a subscription for the consumer
+ *
+ * @return subscription for the consumer
+ */
+ String getSubscription();
+
+ /**
+ * Unsubscribe the consumer
+ * <p>
+ * This call blocks until the consumer is unsubscribed.
+ *
+ * @throws PulsarClientException
+ */
+ void unsubscribe() throws PulsarClientException;
+
+ /**
+ * Asynchronously unsubscribe the consumer
+ *
+ * @return {@link CompletableFuture} for this operation
+ */
+ CompletableFuture<Void> unsubscribeAsync();
+
+ /**
+ * Receives a single message.
+ * <p>
+ * This calls blocks until a message is available.
+ *
+ * @return the received message
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ * @throws PulsarClientException.InvalidConfigurationException
+ * if a message listener was defined in the configuration
+ */
+ Message<byte[]> receive() throws PulsarClientException;
+
+ /**
+ * Receive a single message
+ * <p>
+ * Retrieves a message when it will be available and completes {@link CompletableFuture} with received message.
+ * </p>
+ * <p>
+ * {@code receiveAsync()} should be called subsequently once returned {@code CompletableFuture} gets complete with
+ * received message. Else it creates <i> backlog of receive requests </i> in the application.
+ * </p>
+ *
+ * @return {@link CompletableFuture}<{@link Message}> will be completed when message is available
+ */
+ CompletableFuture<Message<byte[]>> receiveAsync();
+
+ /**
+ * Receive a single message
+ * <p>
+ * Retrieves a message, waiting up to the specified wait time if necessary.
+ *
+ * @param timeout
+ * 0 or less means immediate rather than infinite
+ * @param unit
+ * @return the received {@link Message} or null if no message available before timeout
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ * @throws PulsarClientException.InvalidConfigurationException
+ * if a message listener was defined in the configuration
+ */
+ Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException;
+
+ /**
+ * Acknowledge the consumption of a single message
+ *
+ * @param message
+ * The {@code Message} to be acknowledged
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledge(Message<?> message) throws PulsarClientException;
+
+ /**
+ * Acknowledge the consumption of a single message, identified by its MessageId
+ *
+ * @param messageId
+ * The {@code MessageId} to be acknowledged
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledge(MessageId messageId) throws PulsarClientException;
+
+ /**
+ * Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
+ *
+ * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+ * re-delivered to this consumer.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
+ *
+ * @param message
+ * The {@code Message} to be cumulatively acknowledged
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledgeCumulative(Message<?> message) throws PulsarClientException;
+
+ /**
+ * Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
+ *
+ * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+ * re-delivered to this consumer.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered.
+ *
+ * @param messageId
+ * The {@code MessageId} to be cumulatively acknowledged
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;
+
+ /**
+ * Asynchronously acknowledge the consumption of a single message
+ *
+ * @param message
+ * The {@code Message} to be acknowledged
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture<Void> acknowledgeAsync(Message<?> message);
+
+ /**
+ * Asynchronously acknowledge the consumption of a single message
+ *
+ * @param messageId
+ * The {@code MessageId} to be acknowledged
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture<Void> acknowledgeAsync(MessageId messageId);
+
+ /**
+ * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
+ * message.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * @param message
+ * The {@code Message} to be cumulatively acknowledged
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);
+
+ /**
+ * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
+ * message.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * @param messageId
+ * The {@code MessageId} to be cumulatively acknowledged
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
+
+ /**
+ * Get statistics for the consumer.
+ *
+ * <ul>
+ * <li>numMsgsReceived : Number of messages received in the current interval
+ * <li>numBytesReceived : Number of bytes received in the current interval
+ * <li>numReceiveFailed : Number of messages failed to receive in the current interval
+ * <li>numAcksSent : Number of acks sent in the current interval
+ * <li>numAcksFailed : Number of acks failed to send in the current interval
+ * <li>totalMsgsReceived : Total number of messages received
+ * <li>totalBytesReceived : Total number of bytes received
+ * <li>totalReceiveFailed : Total number of messages failed to receive
+ * <li>totalAcksSent : Total number of acks sent
+ * <li>totalAcksFailed : Total number of acks failed to sent
+ * </ul>
+ *
+ * @return statistic for the consumer
+ */
+ ConsumerStats getStats();
+
+ /**
+ * Close the consumer and stop the broker to push more messages.
+ */
+ @Override
+ void close() throws PulsarClientException;
+
+ /**
+ * Asynchronously close the consumer and stop the broker to push more messages
+ *
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture<Void> closeAsync();
+
+ /**
+ * Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
+ *
+ * Please note that this does not simply mean that the consumer is caught up with the last message published by
+ * producers, rather the topic needs to be explicitly "terminated".
+ */
+ boolean hasReachedEndOfTopic();
+
+ /**
+ * Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
+ * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
+ * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
+ * breaks, the messages are redelivered after reconnect.
+ */
+ void redeliverUnacknowledgedMessages();
+
+ /**
+ * Reset the subscription associated with this consumer to a specific message id.
+ * <p>
+ *
+ * The message id can either be a specific message or represent the first or last messages in the topic.
+ * <p>
+ * <ul>
+ * <li><code>MessageId.earliest</code> : Reset the subscription on the earliest message available in the topic
+ * <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
+ * </ul>
+ *
+ * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+ * the individual partitions.
+ *
+ * @param messageId
+ * the message id where to reposition the subscription
+ */
+ void seek(MessageId messageId) throws PulsarClientException;
+
+ /**
+ * Reset the subscription associated with this consumer to a specific message id.
+ * <p>
+ *
+ * The message id can either be a specific message or represent the first or last messages in the topic.
+ * <p>
+ * <ul>
+ * <li><code>MessageId.earliest</code> : Reset the subscription on the earliest message available in the topic
+ * <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
+ * </ul>
+ *
+ * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+ * the individual partitions.
+ *
+ * @param messageId
+ * the message id where to reposition the subscription
+ * @return a future to track the completion of the seek operation
+ */
+ CompletableFuture<Void> seekAsync(MessageId messageId);
+
+ /**
+ * @return Whether the consumer is connected to the broker
+ */
+ boolean isConnected();
+
+ /**
+ * Get the name of consumer.
+ * @return consumer name.
+ */
+ String getConsumerName();
+
+ /**
+ * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
+ * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
+ */
+ void pause();
+
+ /**
+ * Resume requesting messages from the broker.
+ */
+ void resume();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
new file mode 100644
index 0000000..5ffba38
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+/**
+ * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
+ * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
+ * will be able to use the same subscription name and the messages will be dispatched in a round robin fashion.
+ *
+ * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a {@link Consumer} instance
+ */
+@Deprecated
+public class ConsumerConfiguration implements Serializable {
+
+ /**
+ * Resend shouldn't be requested before minAckTimeoutMillis.
+ */
+ static long minAckTimeoutMillis = 1000;
+
+ private static final long serialVersionUID = 1L;
+
+ private final ConsumerConfigurationData<byte[]> conf = new ConsumerConfigurationData<>();
+
+ private boolean initializeSubscriptionOnLatest = true;
+
+ public ConsumerConfiguration() {
+ // Disable acknowledgment grouping when using v1 API
+ conf.setAcknowledgementsGroupTimeMicros(0);
+ }
+
+ /**
+ * @return the configured timeout in milliseconds for unacked messages.
+ */
+ public long getAckTimeoutMillis() {
+ return conf.getAckTimeoutMillis();
+ }
+
+ /**
+ * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
+ * 10 seconds.
+ *
+ * @param ackTimeout
+ * for unacked messages.
+ * @param timeUnit
+ * unit in which the timeout is provided.
+ * @return {@link ConsumerConfiguration}
+ */
+ public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) {
+ long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
+ checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
+ "Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms");
+ conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
+ return this;
+ }
+
+ /**
+ * @return the configured subscription type
+ */
+ public SubscriptionType getSubscriptionType() {
+ return conf.getSubscriptionType();
+ }
+
+ /**
+ * Select the subscription type to be used when subscribing to the topic.
+ * <p>
+ * Default is {@link SubscriptionType#Exclusive}
+ *
+ * @param subscriptionType
+ * the subscription type value
+ */
+ public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType) {
+ checkNotNull(subscriptionType);
+ conf.setSubscriptionType(subscriptionType);
+ return this;
+ }
+
+ /**
+ * @return the configured {@link MessageListener} for the consumer
+ */
+ public MessageListener<byte[]> getMessageListener() {
+ return conf.getMessageListener();
+ }
+
+ /**
+ * Sets a {@link MessageListener} for the consumer
+ * <p>
+ * When a {@link MessageListener} is set, application will receive messages through it. Calls to
+ * {@link Consumer#receive()} will not be allowed.
+ *
+ * @param messageListener
+ * the listener object
+ */
+ public ConsumerConfiguration setMessageListener(MessageListener<byte[]> messageListener) {
+ checkNotNull(messageListener);
+ conf.setMessageListener(messageListener);
+ return this;
+ }
+
+ /**
+ * @return this configured {@link ConsumerEventListener} for the consumer.
+ * @see #setConsumerEventListener(ConsumerEventListener)
+ * @since 2.0
+ */
+ public ConsumerEventListener getConsumerEventListener() {
+ return conf.getConsumerEventListener();
+ }
+
+ /**
+ * Sets a {@link ConsumerEventListener} for the consumer.
+ *
+ * <p>
+ * The consumer group listener is used for receiving consumer state change in a consumer group for failover
+ * subscription. Application can then react to the consumer state changes.
+ *
+ * <p>
+ * This change is experimental. It is subject to changes coming in release 2.0.
+ *
+ * @param listener
+ * the consumer group listener object
+ * @return consumer configuration
+ * @since 2.0
+ */
+ public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) {
+ checkNotNull(listener);
+ conf.setConsumerEventListener(listener);
+ return this;
+ }
+
+ /**
+ * @return the configure receiver queue size value
+ */
+ public int getReceiverQueueSize() {
+ return conf.getReceiverQueueSize();
+ }
+
+ /**
+ * @return the configured max total receiver queue size across partitions
+ */
+ public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
+ return conf.getMaxTotalReceiverQueueSizeAcrossPartitions();
+ }
+
+ /**
+ * Set the max total receiver queue size across partitons.
+ * <p>
+ * This setting will be used to reduce the receiver queue size for individual partitions
+ * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+ *
+ * @param maxTotalReceiverQueueSizeAcrossPartitions
+ */
+ public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
+ checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= conf.getReceiverQueueSize());
+ conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
+ }
+
+ /**
+ * @return the CryptoKeyReader
+ */
+ public CryptoKeyReader getCryptoKeyReader() {
+ return conf.getCryptoKeyReader();
+ }
+
+ /**
+ * Sets a {@link CryptoKeyReader}
+ *
+ * @param cryptoKeyReader
+ * CryptoKeyReader object
+ */
+ public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+ checkNotNull(cryptoKeyReader);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return this;
+ }
+
+ /**
+ * Sets the ConsumerCryptoFailureAction to the value specified
+ *
+ * @param action
+ * consumer action
+ */
+ public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+ conf.setCryptoFailureAction(action);
+ }
+
+ /**
+ * @return The ConsumerCryptoFailureAction
+ */
+ public ConsumerCryptoFailureAction getCryptoFailureAction() {
+ return conf.getCryptoFailureAction();
+ }
+
+ /**
+ * Sets the size of the consumer receive queue.
+ * <p>
+ * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
+ * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer
+ * throughput at the expense of bigger memory utilization.
+ * </p>
+ * <p>
+ * <b>Setting the consumer queue size as zero</b>
+ * <ul>
+ * <li>Decreases the throughput of the consumer, by disabling pre-fetching of messages. This approach improves the
+ * message distribution on shared subscription, by pushing messages only to the consumers that are ready to process
+ * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue
+ * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is
+ * zero.</li>
+ * <li>Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer connection with
+ * broker and {@link Consumer#receive()} call will remain blocked while {@link Consumer#receiveAsync()} receives
+ * exception in callback. <b> consumer will not be able receive any further message unless batch-message in pipeline
+ * is removed</b></li>
+ * </ul>
+ * </p>
+ * Default value is {@code 1000} messages and should be good for most use cases.
+ *
+ * @param receiverQueueSize
+ * the new receiver queue size value
+ */
+ public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
+ checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
+ conf.setReceiverQueueSize(receiverQueueSize);
+ return this;
+ }
+
+ /**
+ * @return the consumer name
+ */
+ public String getConsumerName() {
+ return conf.getConsumerName();
+ }
+
+ /**
+ * Set the consumer name.
+ *
+ * @param consumerName
+ */
+ public ConsumerConfiguration setConsumerName(String consumerName) {
+ checkArgument(consumerName != null && !consumerName.equals(""));
+ conf.setConsumerName(consumerName);
+ return this;
+ }
+
+ public int getPriorityLevel() {
+ return conf.getPriorityLevel();
+ }
+
+ /**
+ * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
+ * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) </br>
+ * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
+ * permits, else broker will consider next priority level consumers. </br>
+ * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch
+ * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
+ *
+ * <pre>
+ * Consumer PriorityLevel Permits
+ * C1 0 2
+ * C2 0 1
+ * C3 0 1
+ * C4 1 2
+ * C5 1 1
+ * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
+ * </pre>
+ *
+ * @param priorityLevel
+ */
+ public void setPriorityLevel(int priorityLevel) {
+ conf.setPriorityLevel(priorityLevel);
+ }
+
+ public boolean getReadCompacted() {
+ return conf.isReadCompacted();
+ }
+
+ /**
+ * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+ * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+ * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+ * point, the messages will be sent as normal.
+ *
+ * readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
+ * failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
+ * shared subscription, will lead to the subscription call throwing a PulsarClientException.
+ *
+ * @param readCompacted
+ * whether to read from the compacted topic
+ */
+ public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
+ conf.setReadCompacted(readCompacted);
+ return this;
+ }
+
+ /**
+ * Set a name/value property with this consumer.
+ *
+ * @param key
+ * @param value
+ * @return
+ */
+ public ConsumerConfiguration setProperty(String key, String value) {
+ checkArgument(key != null);
+ checkArgument(value != null);
+ conf.getProperties().put(key, value);
+ return this;
+ }
+
+ /**
+ * Add all the properties in the provided map
+ *
+ * @param properties
+ * @return
+ */
+ public ConsumerConfiguration setProperties(Map<String, String> properties) {
+ conf.getProperties().putAll(properties);
+ return this;
+ }
+
+ public Map<String, String> getProperties() {
+ return conf.getProperties();
+ }
+
+ public ConsumerConfigurationData<byte[]> getConfigurationData() {
+ return conf;
+ }
+
+ /**
+ * @param subscriptionInitialPosition the initial position at which to set
+ * set cursor when subscribing to the topic first time
+ * Default is {@value InitialPosition.Latest}
+ */
+ public ConsumerConfiguration setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+ conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
+ return this;
+ }
+
+ /**
+ * @return the configured {@link subscriptionInitailPosition} for the consumer
+ */
+ public SubscriptionInitialPosition getSubscriptionInitialPosition(){
+ return conf.getSubscriptionInitialPosition();
+ }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
new file mode 100644
index 0000000..4a9b99f
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pulsar.client.impl.MessageBuilderImpl;
+
+/**
+ * Message builder factory. Use this class to create messages to be send to the Pulsar producer
+ *
+ * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a new
+ * message builder.
+ */
+@Deprecated
+public interface MessageBuilder {
+
+ static MessageBuilder create() {
+ return new MessageBuilderImpl();
+ }
+
+ /**
+ * Finalize the immutable message
+ *
+ * @return a {@link Message} ready to be sent through a {@link Producer}
+ */
+ Message<byte[]> build();
+
+ /**
+ * Set the content of the message
+ *
+ * @param data
+ * array containing the payload
+ */
+ MessageBuilder setContent(byte[] data);
+
+ /**
+ * Set the content of the message
+ *
+ * @param data
+ * array containing the payload
+ * @param offset
+ * offset into the data array
+ * @param length
+ * length of the payload starting from the above offset
+ */
+ MessageBuilder setContent(byte[] data, int offset, int length);
+
+ /**
+ * Set the content of the message
+ *
+ * @param buf
+ * a {@link ByteBuffer} with the payload of the message
+ */
+ MessageBuilder setContent(ByteBuffer buf);
+
+ /**
+ * Sets a new property on a message.
+ *
+ * @param name
+ * the name of the property
+ * @param value
+ * the associated value
+ */
+ MessageBuilder setProperty(String name, String value);
+
+ /**
+ * Add all the properties in the provided map
+ */
+ MessageBuilder setProperties(Map<String, String> properties);
+
+ /**
+ * Sets the key of the message for routing policy
+ *
+ * @param key
+ */
+ MessageBuilder setKey(String key);
+
+ /**
+ * Set the event time for a given message.
+ *
+ * <p>
+ * Applications can retrieve the event time by calling {@link Message#getEventTime()}.
+ *
+ * <p>
+ * Note: currently pulsar doesn't support event-time based index. so the subscribers can't seek the messages by
+ * event time.
+ *
+ * @since 1.20.0
+ */
+ MessageBuilder setEventTime(long timestamp);
+
+ /**
+ * Specify a custom sequence id for the message being published.
+ * <p>
+ * The sequence id can be used for deduplication purposes and it needs to follow these rules:
+ * <ol>
+ * <li><code>sequenceId >= 0</code>
+ * <li>Sequence id for a message needs to be greater than sequence id for earlier messages:
+ * <code>sequenceId(N+1) > sequenceId(N)</code>
+ * <li>It's not necessary for sequence ids to be consecutive. There can be holes between messages. Eg. the
+ * <code>sequenceId</code> could represent an offset or a cumulative size.
+ * </ol>
+ *
+ * @param sequenceId
+ * the sequence id to assign to the current message
+ * @since 1.20.0
+ */
+ MessageBuilder setSequenceId(long sequenceId);
+
+ /**
+ * Override the replication clusters for this message.
+ *
+ * @param clusters
+ */
+ MessageBuilder setReplicationClusters(List<String> clusters);
+
+ /**
+ * Disable replication for this message.
+ */
+ MessageBuilder disableReplication();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
new file mode 100644
index 0000000..4d69668
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Producer object.
+ *
+ * The producer is used to publish messages on a topic
+ *
+ *
+ */
+public interface Producer extends Closeable {
+
+ /**
+ * @return the topic which producer is publishing to
+ */
+ String getTopic();
+
+ /**
+ * @return the producer name which could have been assigned by the system or specified by the client
+ */
+ String getProducerName();
+
+ /**
+ * Sends a message.
+ * <p>
+ * This call will be blocking until is successfully acknowledged by the Pulsar broker.
+ * <p>
+ * Use {@link #newMessage()} to specify more properties than just the value on the message to be sent.
+ *
+ * @param message
+ * a message
+ * @return the message id assigned to the published message
+ * @throws PulsarClientException.TimeoutException
+ * if the message was not correctly received by the system within the timeout period
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the producer was already closed
+ */
+ MessageId send(byte[] message) throws PulsarClientException;
+
+ /**
+ * Send a message asynchronously
+ * <p>
+ * When the producer queue is full, by default this method will complete the future with an exception
+ * {@link PulsarClientException.ProducerQueueIsFullError}
+ * <p>
+ * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and
+ * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
+ * <p>
+ * Use {@link #newMessage()} to specify more properties than just the value on the message to be sent.
+ *
+ * @param message
+ * a byte array with the payload of the message
+ * @return a future that can be used to track when the message will have been safely persisted
+ */
+ CompletableFuture<MessageId> sendAsync(byte[] message);
+
+ /**
+ * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+ *
+ * @throws PulsarClientException
+ * @since 2.1.0
+ * @see #flushAsync()
+ */
+ void flush() throws PulsarClientException;
+
+ /**
+ * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+ *
+ * @return a future that can be used to track when all the messages have been safely persisted.
+ * @since 2.1.0
+ * @see #flush()
+ */
+ CompletableFuture<Void> flushAsync();
+
+ /**
+ * Send a message
+ *
+ * @param message
+ * a message
+ * @return the message id assigned to the published message
+ * @throws PulsarClientException.TimeoutException
+ * if the message was not correctly received by the system within the timeout period
+ *
+ * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a
+ * new message builder.
+ */
+ @Deprecated
+ MessageId send(Message<byte[]> message) throws PulsarClientException;
+
+ /**
+ * Send a message asynchronously
+ * <p>
+ * When the returned {@link CompletableFuture} is marked as completed successfully, the provided message will
+ * contain the {@link MessageId} assigned by the broker to the published message.
+ * <p>
+ * Example:
+ *
+ * <pre>
+ * <code>Message msg = MessageBuilder.create().setContent(myContent).build();
+ * producer.sendAsync(msg).thenRun(v -> {
+ * System.out.println("Published message: " + msg.getMessageId());
+ * }).exceptionally(e -> {
+ * // Failed to publish
+ * });</code>
+ * </pre>
+ * <p>
+ * When the producer queue is full, by default this method will complete the future with an exception
+ * {@link PulsarClientException.ProducerQueueIsFullError}
+ * <p>
+ * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and
+ * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
+ *
+ * @param message
+ * a message
+ * @return a future that can be used to track when the message will have been safely persisted
+ * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a
+ * new message builder.
+ */
+ @Deprecated
+ CompletableFuture<MessageId> sendAsync(Message<byte[]> message);
+
+ /**
+ * Get the last sequence id that was published by this producer.
+ * <p>
+ * This represent either the automatically assigned or custom sequence id (set on the {@link MessageBuilder}) that
+ * was published and acknowledged by the broker.
+ * <p>
+ * After recreating a producer with the same producer name, this will return the last message that was published in
+ * the previous producer session, or -1 if there no message was ever published.
+ *
+ * @return the last sequence id published by this producer
+ */
+ long getLastSequenceId();
+
+ /**
+ * Get statistics for the producer
+ *
+ * <ul>
+ * <li>numMsgsSent : Number of messages sent in the current interval
+ * <li>numBytesSent : Number of bytes sent in the current interval
+ * <li>numSendFailed : Number of messages failed to send in the current interval
+ * <li>numAcksReceived : Number of acks received in the current interval
+ * <li>totalMsgsSent : Total number of messages sent
+ * <li>totalBytesSent : Total number of bytes sent
+ * <li>totalSendFailed : Total number of messages failed to send
+ * <li>totalAcksReceived: Total number of acks received
+ * </ul>
+ *
+ * @return statistic for the producer or null if ProducerStatsRecorderImpl is disabled.
+ */
+ ProducerStats getStats();
+
+ /**
+ * Close the producer and releases resources allocated.
+ *
+ * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
+ * of errors, pending writes will not be retried.
+ *
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the producer was already closed
+ */
+ @Override
+ void close() throws PulsarClientException;
+
+ /**
+ * Close the producer and releases resources allocated.
+ *
+ * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
+ * of errors, pending writes will not be retried.
+ *
+ * @return a future that can used to track when the producer has been closed
+ */
+ CompletableFuture<Void> closeAsync();
+
+ /**
+ * @return Whether the producer is connected to the broker
+ */
+ boolean isConnected();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
new file mode 100644
index 0000000..1af077b
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+
+import lombok.EqualsAndHashCode;
+
+/**
+ * Producer's configuration
+ *
+ * @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance
+ */
+@Deprecated
+@EqualsAndHashCode
+public class ProducerConfiguration implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ProducerConfigurationData conf = new ProducerConfigurationData();
+
+ @Deprecated
+ public enum MessageRoutingMode {
+ SinglePartition, RoundRobinPartition, CustomPartition
+ }
+
+ @Deprecated
+ public enum HashingScheme {
+ JavaStringHash, Murmur3_32Hash
+ }
+
+ /**
+ * @return the configured custom producer name or null if no custom name was specified
+ * @since 1.20.0
+ */
+ public String getProducerName() {
+ return conf.getProducerName();
+ }
+
+ /**
+ * Specify a name for the producer
+ * <p>
+ * If not assigned, the system will generate a globally unique name which can be access with
+ * {@link Producer#getProducerName()}.
+ * <p>
+ * When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique
+ * across all Pulsar's clusters.
+ * <p>
+ * If a producer with the same name is already connected to a particular topic, the
+ * {@link PulsarClient#createProducer(String)} operation will fail with {@link ProducerBusyException}.
+ *
+ * @param producerName
+ * the custom name to use for the producer
+ * @since 1.20.0
+ */
+ public void setProducerName(String producerName) {
+ conf.setProducerName(producerName);
+ }
+
+ /**
+ * @return the message send timeout in ms
+ */
+ public long getSendTimeoutMs() {
+ return conf.getSendTimeoutMs();
+ }
+
+ /**
+ * Set the send timeout <i>(default: 30 seconds)</i>
+ * <p>
+ * If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
+ *
+ * @param sendTimeout
+ * the send timeout
+ * @param unit
+ * the time unit of the {@code sendTimeout}
+ */
+ public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
+ checkArgument(sendTimeout >= 0);
+ conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
+ return this;
+ }
+
+ /**
+ * @return the maximum number of messages allowed in the outstanding messages queue for the producer
+ */
+ public int getMaxPendingMessages() {
+ return conf.getMaxPendingMessages();
+ }
+
+ /**
+ * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+ * <p>
+ * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail
+ * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior.
+ *
+ * @param maxPendingMessages
+ * @return
+ */
+ public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
+ checkArgument(maxPendingMessages > 0);
+ conf.setMaxPendingMessages(maxPendingMessages);
+ return this;
+ }
+
+ public HashingScheme getHashingScheme() {
+ return HashingScheme.valueOf(conf.getHashingScheme().toString());
+ }
+
+ public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) {
+ conf.setHashingScheme(org.apache.pulsar.client.api.HashingScheme.valueOf(hashingScheme.toString()));
+ return this;
+ }
+
+ /**
+ *
+ * @return the maximum number of pending messages allowed across all the partitions
+ */
+ public int getMaxPendingMessagesAcrossPartitions() {
+ return conf.getMaxPendingMessagesAcrossPartitions();
+ }
+
+ /**
+ * Set the number of max pending messages across all the partitions
+ * <p>
+ * This setting will be used to lower the max pending messages for each partition
+ * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+ *
+ * @param maxPendingMessagesAcrossPartitions
+ */
+ public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+ checkArgument(maxPendingMessagesAcrossPartitions >= conf.getMaxPendingMessages());
+ conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
+ }
+
+ /**
+ *
+ * @return whether the producer will block {@link Producer#send} and {@link Producer#sendAsync} operations when the
+ * pending queue is full
+ */
+ public boolean getBlockIfQueueFull() {
+ return conf.isBlockIfQueueFull();
+ }
+
+ /**
+ * Set whether the {@link Producer#send} and {@link Producer#sendAsync} operations should block when the outgoing
+ * message queue is full.
+ * <p>
+ * Default is <code>false</code>. If set to <code>false</code>, send operations will immediately fail with
+ * {@link PulsarClientException.ProducerQueueIsFullError} when there is no space left in pending queue.
+ *
+ * @param blockIfQueueFull
+ * whether to block {@link Producer#send} and {@link Producer#sendAsync} operations on queue full
+ * @return
+ */
+ public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) {
+ conf.setBlockIfQueueFull(blockIfQueueFull);
+ return this;
+ }
+
+ /**
+ * Set the message routing mode for the partitioned producer.
+ *
+ * @param messageRouteMode message routing mode.
+ * @return producer configuration
+ * @see MessageRoutingMode
+ */
+ public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) {
+ checkNotNull(messageRouteMode);
+ conf.setMessageRoutingMode(
+ org.apache.pulsar.client.api.MessageRoutingMode.valueOf(messageRouteMode.toString()));
+ return this;
+ }
+
+ /**
+ * Get the message routing mode for the partitioned producer.
+ *
+ * @return message routing mode, default is round-robin routing.
+ * @see MessageRoutingMode#RoundRobinPartition
+ */
+ public MessageRoutingMode getMessageRoutingMode() {
+ return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
+ }
+
+ /**
+ * Set the compression type for the producer.
+ * <p>
+ * By default, message payloads are not compressed. Supported compression types are:
+ * <ul>
+ * <li><code>CompressionType.LZ4</code></li>
+ * <li><code>CompressionType.ZLIB</code></li>
+ * </ul>
+ *
+ * @param compressionType
+ * @return
+ *
+ * @since 1.0.28 <br>
+ * Make sure all the consumer applications have been updated to use this client version, before starting to
+ * compress messages.
+ */
+ public ProducerConfiguration setCompressionType(CompressionType compressionType) {
+ conf.setCompressionType(compressionType);
+ return this;
+ }
+
+ /**
+ * @return the configured compression type for this producer
+ */
+ public CompressionType getCompressionType() {
+ return conf.getCompressionType();
+ }
+
+ /**
+ * Set a custom message routing policy by passing an implementation of MessageRouter
+ *
+ *
+ * @param messageRouter
+ */
+ public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) {
+ checkNotNull(messageRouter);
+ setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+ conf.setCustomMessageRouter(messageRouter);
+ return this;
+ }
+
+ /**
+ * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
+ *
+ * @return message router.
+ * @deprecated since 1.22.0-incubating. <tt>numPartitions</tt> is already passed as parameter in
+ * {@link MessageRouter#choosePartition(Message, TopicMetadata)}.
+ * @see MessageRouter
+ */
+ @Deprecated
+ public MessageRouter getMessageRouter(int numPartitions) {
+ return conf.getCustomMessageRouter();
+ }
+
+ /**
+ * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
+ *
+ * @return message router set by {@link #setMessageRouter(MessageRouter)}.
+ */
+ public MessageRouter getMessageRouter() {
+ return conf.getCustomMessageRouter();
+ }
+
+ /**
+ * Return the flag whether automatic message batching is enabled or not.
+ *
+ * @return true if batch messages are enabled. otherwise false.
+ * @since 2.0.0 <br>
+ * It is enabled by default.
+ */
+ public boolean getBatchingEnabled() {
+ return conf.isBatchingEnabled();
+ }
+
+ /**
+ * Control whether automatic batching of messages is enabled for the producer. <i>default: false [No batching]</i>
+ *
+ * When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
+ * broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
+ * messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
+ * contents.
+ *
+ * When enabled default batch delay is set to 10 ms and default batch size is 1000 messages
+ *
+ * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
+ * @since 1.0.36 <br>
+ * Make sure all the consumer applications have been updated to use this client version, before starting to
+ * batch messages.
+ *
+ */
+ public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) {
+ conf.setBatchingEnabled(batchMessagesEnabled);
+ return this;
+ }
+
+ /**
+ * @return the CryptoKeyReader
+ */
+ public CryptoKeyReader getCryptoKeyReader() {
+ return conf.getCryptoKeyReader();
+ }
+
+ /**
+ * Sets a {@link CryptoKeyReader}
+ *
+ * @param cryptoKeyReader
+ * CryptoKeyReader object
+ */
+ public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+ checkNotNull(cryptoKeyReader);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return this;
+ }
+
+ /**
+ *
+ * @return encryptionKeys
+ *
+ */
+ public Set<String> getEncryptionKeys() {
+ return conf.getEncryptionKeys();
+ }
+
+ /**
+ *
+ * Returns true if encryption keys are added
+ *
+ */
+ public boolean isEncryptionEnabled() {
+ return conf.isEncryptionEnabled();
+ }
+
+ /**
+ * Add public encryption key, used by producer to encrypt the data key.
+ *
+ * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are
+ * found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application
+ * should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted
+ * after compression. If batch messaging is enabled, the batched message is encrypted.
+ *
+ */
+ public void addEncryptionKey(String key) {
+ conf.getEncryptionKeys().add(key);
+ }
+
+ public void removeEncryptionKey(String key) {
+ conf.getEncryptionKeys().remove(key);
+ }
+
+ /**
+ * Sets the ProducerCryptoFailureAction to the value specified
+ *
+ * @param action
+ * The producer action
+ */
+ public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
+ conf.setCryptoFailureAction(action);
+ }
+
+ /**
+ * @return The ProducerCryptoFailureAction
+ */
+ public ProducerCryptoFailureAction getCryptoFailureAction() {
+ return conf.getCryptoFailureAction();
+ }
+
+ /**
+ *
+ * @return the batch time period in ms.
+ * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
+ */
+ public long getBatchingMaxPublishDelayMs() {
+ return TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros());
+ }
+
+ /**
+ * Set the time period within which the messages sent will be batched <i>default: 10ms</i> if batch messages are
+ * enabled. If set to a non zero value, messages will be queued until this time interval or until
+ *
+ * @see ProducerConfiguration#batchingMaxMessages threshold is reached; all messages will be published as a single
+ * batch message. The consumer will be delivered individual messages in the batch in the same order they were
+ * enqueued
+ * @since 1.0.36 <br>
+ * Make sure all the consumer applications have been updated to use this client version, before starting to
+ * batch messages.
+ * @param batchDelay
+ * the batch delay
+ * @param timeUnit
+ * the time unit of the {@code batchDelay}
+ * @return
+ */
+ public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
+ long delayInMs = timeUnit.toMillis(batchDelay);
+ checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
+ conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
+ return this;
+ }
+
+ /**
+ *
+ * @return the maximum number of messages permitted in a batch.
+ */
+ public int getBatchingMaxMessages() {
+ return conf.getBatchingMaxMessages();
+ }
+
+ /**
+ * Set the maximum number of messages permitted in a batch. <i>default: 1000</i> If set to a value greater than 1,
+ * messages will be queued until this threshold is reached or batch interval has elapsed
+ *
+ * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) All messages in batch will be published as
+ * a single batch message. The consumer will be delivered individual messages in the batch in the same order
+ * they were enqueued
+ * @param batchMessagesMaxMessagesPerBatch
+ * maximum number of messages in a batch
+ * @return
+ */
+ public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
+ checkArgument(batchMessagesMaxMessagesPerBatch > 0);
+ conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
+ return this;
+ }
+
+ public Optional<Long> getInitialSequenceId() {
+ return Optional.ofNullable(conf.getInitialSequenceId());
+ }
+
+ /**
+ * Set the baseline for the sequence ids for messages published by the producer.
+ * <p>
+ * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned
+ * incremental sequence ids, if not otherwise specified.
+ *
+ * @param initialSequenceId
+ * @return
+ */
+ public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
+ conf.setInitialSequenceId(initialSequenceId);
+ return this;
+ }
+
+ /**
+ * Set a name/value property with this producer.
+ *
+ * @param key
+ * @param value
+ * @return
+ */
+ public ProducerConfiguration setProperty(String key, String value) {
+ checkArgument(key != null);
+ checkArgument(value != null);
+ conf.getProperties().put(key, value);
+ return this;
+ }
+
+ /**
+ * Add all the properties in the provided map
+ *
+ * @param properties
+ * @return
+ */
+ public ProducerConfiguration setProperties(Map<String, String> properties) {
+ conf.getProperties().putAll(properties);
+ return this;
+ }
+
+ public Map<String, String> getProperties() {
+ return conf.getProperties();
+ }
+
+ public ProducerConfigurationData getProducerConfigurationData() {
+ return conf;
+ }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
new file mode 100644
index 0000000..a57a684
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.impl.v1.PulsarClientV1Impl;
+
+/**
+ * Class that provides a client interface to Pulsar.
+ * <p>
+ * Client instances are thread-safe and can be reused for managing multiple {@link Producer}, {@link Consumer} and
+ * {@link Reader} instances.
+ */
+public interface PulsarClient extends Closeable {
+
+ /**
+ * Create a new PulsarClient object using default client configuration
+ *
+ * @param serviceUrl
+ * the url of the Pulsar endpoint to be used
+ * @return a new pulsar client object
+ * @throws PulsarClientException.InvalidServiceURL
+ * if the serviceUrl is invalid
+ * @deprecated use {@link #builder()} to construct a client instance
+ */
+ @Deprecated
+ public static PulsarClient create(String serviceUrl) throws PulsarClientException {
+ return create(serviceUrl, new ClientConfiguration());
+ }
+
+ /**
+ * Create a new PulsarClient object
+ *
+ * @param serviceUrl
+ * the url of the Pulsar endpoint to be used
+ * @param conf
+ * the client configuration
+ * @return a new pulsar client object
+ * @throws PulsarClientException.InvalidServiceURL
+ * if the serviceUrl is invalid
+ * @deprecated use {@link #builder()} to construct a client instance
+ */
+ @Deprecated
+ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
+ return new PulsarClientV1Impl(serviceUrl, conf);
+ }
+
+ /**
+ * Create a producer with default {@link ProducerConfiguration} for publishing on a specific topic
+ *
+ * @param topic
+ * The name of the topic where to produce
+ * @return The producer object
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the client was already closed
+ * @throws PulsarClientException.InvalidTopicNameException
+ * if the topic name is not valid
+ * @throws PulsarClientException.AuthenticationException
+ * if there was an error with the supplied credentials
+ * @throws PulsarClientException.AuthorizationException
+ * if the authorization to publish on topic was denied
+ * @deprecated use {@link #newProducer()} to build a new producer
+ */
+ @Deprecated
+ Producer createProducer(String topic) throws PulsarClientException;
+
+ /**
+ * Asynchronously create a producer with default {@link ProducerConfiguration} for publishing on a specific topic
+ *
+ * @param topic
+ * The name of the topic where to produce
+ * @return Future of the asynchronously created producer object
+ * @deprecated use {@link #newProducer()} to build a new producer
+ */
+ @Deprecated
+ CompletableFuture<Producer> createProducerAsync(String topic);
+
+ /**
+ * Create a producer with given {@code ProducerConfiguration} for publishing on a specific topic
+ *
+ * @param topic
+ * The name of the topic where to produce
+ * @param conf
+ * The {@code ProducerConfiguration} object
+ * @return The producer object
+ * @throws PulsarClientException
+ * if it was not possible to create the producer
+ * @throws InterruptedException
+ * @deprecated use {@link #newProducer()} to build a new producer
+ */
+ @Deprecated
+ Producer createProducer(String topic, ProducerConfiguration conf) throws PulsarClientException;
+
+ /**
+ * Asynchronously create a producer with given {@code ProducerConfiguration} for publishing on a specific topic
+ *
+ * @param topic
+ * The name of the topic where to produce
+ * @param conf
+ * The {@code ProducerConfiguration} object
+ * @return Future of the asynchronously created producer object
+ * @deprecated use {@link #newProducer()} to build a new producer
+ */
+ @Deprecated
+ CompletableFuture<Producer> createProducerAsync(String topic, ProducerConfiguration conf);
+
+ /**
+ * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration}
+ *
+ * @param topic
+ * The name of the topic
+ * @param subscription
+ * The name of the subscription
+ * @return The {@code Consumer} object
+ * @throws PulsarClientException
+ * @throws InterruptedException
+ *
+ * @deprecated Use {@link #newConsumer()} to build a new consumer
+ */
+ @Deprecated
+ Consumer subscribe(String topic, String subscription) throws PulsarClientException;
+
+ /**
+ * Asynchronously subscribe to the given topic and subscription combination using default
+ * {@code ConsumerConfiguration}
+ *
+ * @param topic
+ * The topic name
+ * @param subscription
+ * The subscription name
+ * @return Future of the {@code Consumer} object
+ * @deprecated Use {@link #newConsumer()} to build a new consumer
+ */
+ @Deprecated
+ CompletableFuture<Consumer> subscribeAsync(String topic, String subscription);
+
+ /**
+ * Subscribe to the given topic and subscription combination with given {@code ConsumerConfiguration}
+ *
+ * @param topic
+ * The name of the topic
+ * @param subscription
+ * The name of the subscription
+ * @param conf
+ * The {@code ConsumerConfiguration} object
+ * @return The {@code Consumer} object
+ * @throws PulsarClientException
+ * @deprecated Use {@link #newConsumer()} to build a new consumer
+ */
+ @Deprecated
+ Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException;
+
+ /**
+ * Asynchronously subscribe to the given topic and subscription combination using given
+ * {@code ConsumerConfiguration}
+ *
+ * @param topic
+ * The name of the topic
+ * @param subscription
+ * The name of the subscription
+ * @param conf
+ * The {@code ConsumerConfiguration} object
+ * @return Future of the {@code Consumer} object
+ * @deprecated Use {@link #newConsumer()} to build a new consumer
+ */
+ @Deprecated
+ CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, ConsumerConfiguration conf);
+
+ /**
+ * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified topic.
+ * <p>
+ * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+ * subscription. Reader can only work on non-partitioned topics.
+ * <p>
+ * The initial reader positioning is done by specifying a message id. The options are:
+ * <ul>
+ * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic
+ * <li><code>MessageId.latest</code> : Start reading from the end topic, only getting messages published after the
+ * reader was created
+ * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that
+ * specific position. The first message to be read will be the message next to the specified messageId.
+ * </ul>
+ *
+ * @param topic
+ * The name of the topic where to read
+ * @param startMessageId
+ * The message id where the reader will position itself. The first message returned will be the one after
+ * the specified startMessageId
+ * @param conf
+ * The {@code ReaderConfiguration} object
+ * @return The {@code Reader} object
+ * @deprecated Use {@link #newReader()} to build a new reader
+ */
+ @Deprecated
+ Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException;
+
+ /**
+ * Asynchronously create a topic reader with given {@code ReaderConfiguration} for reading messages from the
+ * specified topic.
+ * <p>
+ * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+ * subscription. Reader can only work on non-partitioned topics.
+ * <p>
+ * The initial reader positioning is done by specifying a message id. The options are:
+ * <ul>
+ * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic
+ * <li><code>MessageId.latest</code> : Start reading from the end topic, only getting messages published after the
+ * reader was created
+ * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that
+ * specific position. The first message to be read will be the message next to the specified messageId.
+ * </ul>
+ *
+ * @param topic
+ * The name of the topic where to read
+ * @param startMessageId
+ * The message id where the reader will position itself. The first message returned will be the one after
+ * the specified startMessageId
+ * @param conf
+ * The {@code ReaderConfiguration} object
+ * @return Future of the asynchronously created producer object
+ * @deprecated Use {@link #newReader()} to build a new reader
+ */
+ @Deprecated
+ CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf);
+
+ /**
+ * Close the PulsarClient and release all the resources.
+ *
+ * All the producers and consumers will be orderly closed. Waits until all pending write request are persisted.
+ *
+ * @throws PulsarClientException
+ * if the close operation fails
+ */
+ @Override
+ void close() throws PulsarClientException;
+
+ /**
+ * Asynchronously close the PulsarClient and release all the resources.
+ *
+ * All the producers and consumers will be orderly closed. Waits until all pending write request are persisted.
+ *
+ * @throws PulsarClientException
+ * if the close operation fails
+ */
+ CompletableFuture<Void> closeAsync();
+
+ /**
+ * Perform immediate shutdown of PulsarClient.
+ *
+ * Release all the resources and close all the producers without waiting for ongoing operations to complete.
+ *
+ * @throws PulsarClientException
+ * if the forceful shutdown fails
+ */
+ void shutdown() throws PulsarClientException;
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
new file mode 100644
index 0000000..4a435c3
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Reader can be used to scan through all the messages currently available in a topic.
+ *
+ */
+public interface Reader extends Closeable {
+
+ /**
+ * @return the topic from which this reader is reading from
+ */
+ String getTopic();
+
+ /**
+ * Read the next message in the topic
+ *
+ * @return the next messasge
+ * @throws PulsarClientException
+ */
+ Message<byte[]> readNext() throws PulsarClientException;
+
+ /**
+ * Read the next message in the topic waiting for a maximum of timeout
+ * time units. Returns null if no message is recieved in that time.
+ *
+ * @return the next message(Could be null if none received in time)
+ * @throws PulsarClientException
+ */
+ Message<byte[]> readNext(int timeout, TimeUnit unit) throws PulsarClientException;
+
+ CompletableFuture<Message<byte[]>> readNextAsync();
+
+ /**
+ * Asynchronously close the reader and stop the broker to push more messages
+ *
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture<Void> closeAsync();
+
+ /**
+ * Return true if the topic was terminated and this reader has reached the end of the topic
+ */
+ boolean hasReachedEndOfTopic();
+
+ /**
+ * Check if there is any message available to read from the current position.
+ */
+ boolean hasMessageAvailable() throws PulsarClientException;
+
+ /**
+ * Asynchronously Check if there is message that has been published successfully to the broker in the topic.
+ */
+ CompletableFuture<Boolean> hasMessageAvailableAsync();
+
+ /**
+ * @return Whether the reader is connected to the broker
+ */
+ boolean isConnected();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
new file mode 100644
index 0000000..243166c
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.commons.lang3.StringUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+
+/**
+ *
+ * @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance
+ */
+@Deprecated
+public class ReaderConfiguration implements Serializable {
+
+ private final ReaderConfigurationData<byte[]> conf = new ReaderConfigurationData<>();
+
+ /**
+ * @return the configured {@link ReaderListener} for the reader
+ */
+ public ReaderListener<byte[]> getReaderListener() {
+ return conf.getReaderListener();
+ }
+
+ /**
+ * Sets a {@link ReaderListener} for the reader
+ * <p>
+ * When a {@link ReaderListener} is set, application will receive messages through it. Calls to
+ * {@link Reader#readNext()} will not be allowed.
+ *
+ * @param readerListener
+ * the listener object
+ */
+ public ReaderConfiguration setReaderListener(ReaderListener<byte[]> readerListener) {
+ checkNotNull(readerListener);
+ conf.setReaderListener(readerListener);
+ return this;
+ }
+
+ /**
+ * @return the configure receiver queue size value
+ */
+ public int getReceiverQueueSize() {
+ return conf.getReceiverQueueSize();
+ }
+
+ /**
+ * @return the CryptoKeyReader
+ */
+ public CryptoKeyReader getCryptoKeyReader() {
+ return conf.getCryptoKeyReader();
+ }
+
+ /**
+ * Sets a {@link CryptoKeyReader}
+ *
+ * @param cryptoKeyReader
+ * CryptoKeyReader object
+ */
+ public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+ checkNotNull(cryptoKeyReader);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return this;
+ }
+
+ /**
+ * Sets the ConsumerCryptoFailureAction to the value specified
+ *
+ * @param action
+ * The action to take when the decoding fails
+ */
+ public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+ conf.setCryptoFailureAction(action);
+ }
+
+ /**
+ * @return The ConsumerCryptoFailureAction
+ */
+ public ConsumerCryptoFailureAction getCryptoFailureAction() {
+ return conf.getCryptoFailureAction();
+ }
+
+ /**
+ * Sets the size of the consumer receive queue.
+ * <p>
+ * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
+ * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer
+ * throughput at the expense of bigger memory utilization.
+ * </p>
+ * Default value is {@code 1000} messages and should be good for most use cases.
+ *
+ * @param receiverQueueSize
+ * the new receiver queue size value
+ */
+ public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) {
+ checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
+ conf.setReceiverQueueSize(receiverQueueSize);
+ return this;
+ }
+
+ /**
+ * @return the consumer name
+ */
+ public String getReaderName() {
+ return conf.getReaderName();
+ }
+
+ /**
+ * Set the consumer name.
+ *
+ * @param readerName
+ */
+ public ReaderConfiguration setReaderName(String readerName) {
+ checkArgument(StringUtils.isNotBlank(readerName));
+ conf.setReaderName(readerName);
+ return this;
+ }
+
+ /**
+ * @return the subscription role prefix for subscription auth
+ */
+ public String getSubscriptionRolePrefix() {
+ return conf.getSubscriptionRolePrefix();
+ }
+
+ /**
+ * Set the subscription role prefix for subscription auth. The default prefix is "reader".
+ *
+ * @param subscriptionRolePrefix
+ */
+ public ReaderConfiguration setSubscriptionRolePrefix(String subscriptionRolePrefix) {
+ checkArgument(StringUtils.isNotBlank(subscriptionRolePrefix));
+ conf.setSubscriptionRolePrefix(subscriptionRolePrefix);
+ return this;
+ }
+
+ public ReaderConfigurationData<byte[]> getReaderConfigurationData() {
+ return conf;
+ }
+
+ private static final long serialVersionUID = 1L;
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
new file mode 100644
index 0000000..76b1dab
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+
+@SuppressWarnings("deprecation")
+public class MessageBuilderImpl implements MessageBuilder {
+ private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);
+ private final MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder();
+ private ByteBuffer content = EMPTY_CONTENT;
+
+ @Override
+ public Message<byte[]> build() {
+ return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES);
+ }
+
+ @Override
+ public MessageBuilder setContent(byte[] data) {
+ setContent(data, 0, data.length);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setContent(byte[] data, int offet, int length) {
+ this.content = ByteBuffer.wrap(data, offet, length);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setContent(ByteBuffer buf) {
+ this.content = buf.duplicate();
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setProperties(Map<String, String> properties) {
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ msgMetadataBuilder
+ .addProperties(KeyValue.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).build());
+ }
+
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setProperty(String name, String value) {
+ msgMetadataBuilder.addProperties(KeyValue.newBuilder().setKey(name).setValue(value).build());
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setKey(String key) {
+ msgMetadataBuilder.setPartitionKey(key);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setEventTime(long timestamp) {
+ checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
+ msgMetadataBuilder.setEventTime(timestamp);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setSequenceId(long sequenceId) {
+ checkArgument(sequenceId >= 0);
+ msgMetadataBuilder.setSequenceId(sequenceId);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setReplicationClusters(List<String> clusters) {
+ Preconditions.checkNotNull(clusters);
+ msgMetadataBuilder.clearReplicateTo();
+ msgMetadataBuilder.addAllReplicateTo(clusters);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder disableReplication() {
+ msgMetadataBuilder.clearReplicateTo();
+ msgMetadataBuilder.addReplicateTo("__local__");
+ return this;
+ }
+
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
new file mode 100644
index 0000000..e21e572
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class ConsumerV1Impl implements Consumer {
+ private final org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> consumer;
+
+ public ConsumerV1Impl(org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void acknowledge(Message<?> arg0) throws PulsarClientException {
+ consumer.acknowledge(arg0);
+ }
+
+ @Override
+ public void acknowledge(MessageId arg0) throws PulsarClientException {
+ consumer.acknowledge(arg0);
+ }
+
+ @Override
+ public CompletableFuture<Void> acknowledgeAsync(Message<?> arg0) {
+ return consumer.acknowledgeAsync(arg0);
+ }
+
+ @Override
+ public CompletableFuture<Void> acknowledgeAsync(MessageId arg0) {
+ return consumer.acknowledgeAsync(arg0);
+ }
+
+ @Override
+ public void acknowledgeCumulative(Message<?> arg0) throws PulsarClientException {
+ consumer.acknowledgeCumulative(arg0);
+ }
+
+ @Override
+ public void acknowledgeCumulative(MessageId arg0) throws PulsarClientException {
+ consumer.acknowledgeCumulative(arg0);
+ }
+
+ @Override
+ public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> arg0) {
+ return consumer.acknowledgeCumulativeAsync(arg0);
+ }
+
+ @Override
+ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId arg0) {
+ return consumer.acknowledgeCumulativeAsync(arg0);
+ }
+
+ @Override
+ public void close() throws PulsarClientException {
+ consumer.close();
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return consumer.closeAsync();
+ }
+
+ @Override
+ public String getConsumerName() {
+ return consumer.getConsumerName();
+ }
+
+ @Override
+ public ConsumerStats getStats() {
+ return consumer.getStats();
+ }
+
+ public String getSubscription() {
+ return consumer.getSubscription();
+ }
+
+ public String getTopic() {
+ return consumer.getTopic();
+ }
+
+ public boolean hasReachedEndOfTopic() {
+ return consumer.hasReachedEndOfTopic();
+ }
+
+ public boolean isConnected() {
+ return consumer.isConnected();
+ }
+
+ public void pause() {
+ consumer.pause();
+ }
+
+ public Message<byte[]> receive() throws PulsarClientException {
+ return consumer.receive();
+ }
+
+ public Message<byte[]> receive(int arg0, TimeUnit arg1) throws PulsarClientException {
+ return consumer.receive(arg0, arg1);
+ }
+
+ public CompletableFuture<Message<byte[]>> receiveAsync() {
+ return consumer.receiveAsync();
+ }
+
+ public void redeliverUnacknowledgedMessages() {
+ consumer.redeliverUnacknowledgedMessages();
+ }
+
+ public void resume() {
+ consumer.resume();
+ }
+
+ public void seek(MessageId arg0) throws PulsarClientException {
+ consumer.seek(arg0);
+ }
+
+ public CompletableFuture<Void> seekAsync(MessageId arg0) {
+ return consumer.seekAsync(arg0);
+ }
+
+ public void unsubscribe() throws PulsarClientException {
+ consumer.unsubscribe();
+ }
+
+ public CompletableFuture<Void> unsubscribeAsync() {
+ return consumer.unsubscribeAsync();
+ }
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
new file mode 100644
index 0000000..3f91377
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerStats;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
+
+public class ProducerV1Impl implements Producer {
+
+ private final ProducerImpl<byte[]> producer;
+
+ public ProducerV1Impl(ProducerImpl<byte[]> producer) {
+ this.producer = producer;
+ }
+
+ public void close() throws PulsarClientException {
+ producer.close();
+ }
+
+ public CompletableFuture<Void> closeAsync() {
+ return producer.closeAsync();
+ }
+
+ public void flush() throws PulsarClientException {
+ producer.flush();
+ }
+
+ public CompletableFuture<Void> flushAsync() {
+ return producer.flushAsync();
+ }
+
+ public long getLastSequenceId() {
+ return producer.getLastSequenceId();
+ }
+
+ public ProducerStats getStats() {
+ return producer.getStats();
+ }
+
+ public boolean isConnected() {
+ return producer.isConnected();
+ }
+
+ public MessageId send(byte[] value) throws PulsarClientException {
+ return producer.send(value);
+ }
+
+ public MessageId send(Message<byte[]> value) throws PulsarClientException {
+ return producer.send(value);
+ }
+
+ public CompletableFuture<MessageId> sendAsync(byte[] arg0) {
+ return producer.sendAsync(arg0);
+ }
+
+ public CompletableFuture<MessageId> sendAsync(Message<byte[]> arg0) {
+ return producer.sendAsync(arg0);
+ }
+
+ @Override
+ public String getTopic() {
+ return producer.getTopic();
+ }
+
+ @Override
+ public String getProducerName() {
+ return producer.getProducerName();
+ }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
new file mode 100644
index 0000000..a10f9f1
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@SuppressWarnings("deprecation")
+public class PulsarClientV1Impl implements PulsarClient {
+
+ private final PulsarClientImpl client;
+
+ public PulsarClientV1Impl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
+ this.client = new PulsarClientImpl(conf.setServiceUrl(serviceUrl).getConfigurationData().clone());
+ }
+
+ @Override
+ public void close() throws PulsarClientException {
+ client.close();
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return client.closeAsync();
+ }
+
+ @Override
+ public Producer createProducer(final String topic, final ProducerConfiguration conf) throws PulsarClientException {
+ if (conf == null) {
+ throw new PulsarClientException.InvalidConfigurationException("Invalid null configuration object");
+ }
+
+ try {
+ return createProducerAsync(topic, conf).get();
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof PulsarClientException) {
+ throw (PulsarClientException) t;
+ } else {
+ throw new PulsarClientException(t);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public Producer createProducer(String topic)
+ throws PulsarClientException {
+ return createProducer(topic, new ProducerConfiguration());
+ }
+
+ @Override
+ public CompletableFuture<Producer> createProducerAsync(final String topic, final ProducerConfiguration conf) {
+ ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
+ confData.setTopicName(topic);
+ return client.createProducerAsync(confData).thenApply(p -> new ProducerV1Impl((ProducerImpl<byte[]>) p));
+ }
+
+ @Override
+ public CompletableFuture<Producer> createProducerAsync(String topic) {
+ return createProducerAsync(topic, new ProducerConfiguration());
+ }
+
+ @Override
+ public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
+ throws PulsarClientException {
+ try {
+ return createReaderAsync(topic, startMessageId, conf).get();
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof PulsarClientException) {
+ throw (PulsarClientException) t;
+ } else {
+ throw new PulsarClientException(t);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId,
+ ReaderConfiguration conf) {
+ ReaderConfigurationData<byte[]> confData = conf.getReaderConfigurationData().clone();
+ confData.setTopicName(topic);
+ confData.setStartMessageId(startMessageId);
+ return client.createReaderAsync(confData).thenApply(r -> new ReaderV1Impl(r));
+ }
+
+ @Override
+ public void shutdown() throws PulsarClientException {
+ client.shutdown();
+ }
+
+ @Override
+ public Consumer subscribe(String topic, String subscriptionName) throws PulsarClientException {
+ return subscribe(topic, subscriptionName, new ConsumerConfiguration());
+ }
+
+ @Override
+ public CompletableFuture<Consumer> subscribeAsync(final String topic, final String subscription,
+ final ConsumerConfiguration conf) {
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Invalid null configuration"));
+ }
+
+ ConsumerConfigurationData<byte[]> confData = conf.getConfigurationData().clone();
+ confData.getTopicNames().add(topic);
+ confData.setSubscriptionName(subscription);
+ return client.subscribeAsync(confData).thenApply(c -> new ConsumerV1Impl(c));
+ }
+
+ @Override
+ public CompletableFuture<Consumer> subscribeAsync(String topic,
+ String subscriptionName) {
+ return subscribeAsync(topic, subscriptionName, new ConsumerConfiguration());
+ }
+
+ @Override
+ public Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf)
+ throws PulsarClientException {
+ try {
+ return subscribeAsync(topic, subscription, conf).get();
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof PulsarClientException) {
+ throw (PulsarClientException) t;
+ } else {
+ throw new PulsarClientException(t);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
new file mode 100644
index 0000000..7ae86b5
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+
+public class ReaderV1Impl implements Reader {
+
+ private final org.apache.pulsar.shade.client.api.v2.Reader<byte[]> reader;
+
+ public ReaderV1Impl(org.apache.pulsar.shade.client.api.v2.Reader<byte[]> reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return reader.closeAsync();
+ }
+
+ @Override
+ public String getTopic() {
+ return reader.getTopic();
+ }
+
+ @Override
+ public boolean hasMessageAvailable() throws PulsarClientException {
+ return reader.hasMessageAvailable();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+ return reader.hasMessageAvailableAsync();
+ }
+
+ @Override
+ public boolean hasReachedEndOfTopic() {
+ return reader.hasReachedEndOfTopic();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return reader.isConnected();
+ }
+
+ @Override
+ public Message<byte[]> readNext() throws PulsarClientException {
+ return reader.readNext();
+ }
+
+ @Override
+ public Message<byte[]> readNext(int arg0, TimeUnit arg1) throws PulsarClientException {
+ return reader.readNext(arg0, arg1);
+ }
+
+ @Override
+ public CompletableFuture<Message<byte[]>> readNextAsync() {
+ return reader.readNextAsync();
+ }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java b/pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
similarity index 88%
rename from pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
rename to pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
index 56630a4..5e14b7b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
@@ -36,20 +36,20 @@ public class MessageBuilderTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testSetEventTimeNegative() {
- MessageBuilder<?> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setEventTime(-1L);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testSetEventTimeZero() {
- MessageBuilder<?> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setEventTime(0L);
}
@Test
public void testSetEventTimePositive() {
long eventTime = System.currentTimeMillis();
- MessageBuilder<?> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setContent(new byte[0]);
builder.setEventTime(eventTime);
Message<?> msg = builder.build();
@@ -58,7 +58,7 @@ public class MessageBuilderTest {
@Test
public void testBuildMessageWithoutEventTime() {
- MessageBuilder<?> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setContent(new byte[0]);
Message<?> msg = builder.build();
assertEquals(0L, msg.getEventTime());
@@ -66,7 +66,7 @@ public class MessageBuilderTest {
@Test
public void testSetMessageProperties() {
- MessageBuilder<?> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setContent(new byte[0]);
Map<String, String> map = Maps.newHashMap();
map.put("key1", "value1");
diff --git a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
new file mode 100644
index 0000000..6b6b2bc
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
@@ -0,0 +1,91 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-1x-base</artifactId>
+ <version>2.3.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-client-2x-shaded</artifactId>
+ <name>Pulsar Client 2.x Shaded API</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <!-- Shade all the dependencies to avoid conflicts -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <minimizeJar>false</minimizeJar>
+
+ <artifactSet>
+ <includes>
+ <include>org.apache.pulsar:pulsar-client</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>org.apache.pulsar:pulsar-client</artifact>
+ <includes>
+ <include>**</include>
+ </includes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.pulsar.client.api</pattern>
+ <shadedPattern>org.apache.pulsar.shade.client.api.v2</shadedPattern>
+ <includes>
+ <include>org.apache.pulsar.client.api.PulsarClient</include>
+ <include>org.apache.pulsar.client.api.Producer</include>
+ <include>org.apache.pulsar.client.api.Consumer</include>
+ <include>org.apache.pulsar.client.api.Reader</include>
+ </includes>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
index 70416f3..46516f0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
@@ -18,9 +18,11 @@
*/
package org.apache.pulsar.client.impl.schema;
+import static org.testng.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+
import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,10 +32,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-
public class DefaultSchemasTest {
private PulsarClient client;
@@ -75,17 +73,7 @@ public class DefaultSchemasTest {
Assert.assertTrue(stringSchema.decode(testBytes).equals(testString));
assertEquals(stringSchema.encode(testString), testBytes);
- Message<String> msg1 = MessageBuilder.create(stringSchema)
- .setContent(testBytes)
- .build();
- assertEquals(stringSchema.decode(msg1.getData()), testString);
-
- Message<String> msg2 = MessageBuilder.create(stringSchema)
- .setValue(testString)
- .build();
- assertEquals(stringSchema.encode(testString), msg2.getData());
-
- byte[] bytes2 = testString.getBytes(StandardCharsets.UTF_16);
+ byte[] bytes2 = testString.getBytes(StandardCharsets.UTF_16);
StringSchema stringSchemaUtf16 = new StringSchema(StandardCharsets.UTF_16);
Assert.assertTrue(stringSchemaUtf16.decode(bytes2).equals(testString));
assertEquals(stringSchemaUtf16.encode(testString), bytes2);