You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/02 08:13:54 UTC
[incubator-pulsar] branch master updated: Converted main part of code to use builder APIs with typed interface (#1311)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a64b383 Converted main part of code to use builder APIs with typed interface (#1311)
a64b383 is described below
commit a64b383de7fee866756782c6743823bf90d69395
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Mar 2 00:13:51 2018 -0800
Converted main part of code to use builder APIs with typed interface (#1311)
---
.../pulsar/broker/service/BrokerService.java | 37 +++++-----
.../broker/auth/MockedPulsarServiceBaseTest.java | 8 +--
.../client/impl/ConsumeBaseExceptionTest.java | 2 +-
.../clients/consumer/PulsarKafkaConsumer.java | 51 +++++++-------
.../clients/producer/PulsarKafkaProducer.java | 42 ++++++------
.../kafka/compat/PulsarClientKafkaConfig.java | 37 ++++++----
.../kafka/compat/PulsarConsumerKafkaConfig.java | 15 ++--
.../kafka/compat/PulsarProducerKafkaConfig.java | 21 +++---
.../apache/pulsar/client/api/ConsumerBuilder.java | 6 +-
.../pulsar/client/api/ConsumerConfiguration.java | 4 +-
.../pulsar/client/api/ConsumerEventListener.java | 4 +-
.../apache/pulsar/client/api/MessageRouter.java | 4 +-
.../apache/pulsar/client/api/ProducerBuilder.java | 2 +-
.../apache/pulsar/client/api/ReaderBuilder.java | 2 +-
.../pulsar/client/impl/BatchMessageContainer.java | 6 +-
.../pulsar/client/impl/ClientBuilderImpl.java | 4 ++
.../org/apache/pulsar/client/impl/ClientCnx.java | 18 ++---
.../apache/pulsar/client/impl/ConsumerBase.java | 10 +--
.../pulsar/client/impl/ConsumerBuilderImpl.java | 3 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 11 +--
.../apache/pulsar/client/impl/ConsumerStats.java | 8 +--
.../pulsar/client/impl/ConsumerStatsDisabled.java | 2 +-
.../org/apache/pulsar/client/impl/MessageImpl.java | 21 +++---
.../client/impl/PartitionedConsumerImpl.java | 24 +++----
.../client/impl/PartitionedProducerImpl.java | 10 +--
.../apache/pulsar/client/impl/ProducerImpl.java | 21 +++---
.../apache/pulsar/client/impl/ProducerStats.java | 4 +-
.../pulsar/client/impl/PulsarClientImpl.java | 13 ++--
.../pulsar/client/impl/ReaderBuilderImpl.java | 7 +-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 2 +-
.../impl/RoundRobinPartitionMessageRouterImpl.java | 2 +-
.../apache/pulsar/proxy/server/ProxyService.java | 20 +++---
.../proxy/server/ProxyForwardAuthDataTest.java | 7 +-
.../apache/pulsar/storm/MessageToValuesMapper.java | 2 +-
.../java/org/apache/pulsar/storm/PulsarBolt.java | 55 +++++++++++----
.../java/org/apache/pulsar/storm/PulsarSpout.java | 79 +++++++++++++++-------
.../apache/pulsar/storm/SharedPulsarClient.java | 55 ++++++++-------
.../pulsar/testclient/PerformanceConsumer.java | 62 ++++++++---------
.../pulsar/testclient/PerformanceProducer.java | 51 +++++++-------
.../pulsar/testclient/PerformanceReader.java | 36 +++++-----
.../apache/pulsar/websocket/WebSocketService.java | 29 ++++----
41 files changed, 436 insertions(+), 361 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1910680..bbdc696 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -78,11 +78,12 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -499,27 +500,29 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
String path = PulsarWebResource.path("clusters", cluster);
ClusterData data = this.pulsar.getConfigurationCache().clustersCache().get(path)
.orElseThrow(() -> new KeeperException.NoNodeException(path));
- ClientConfiguration configuration = new ClientConfiguration();
- configuration.setUseTcpNoDelay(false);
- configuration.setConnectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker());
- configuration.setStatsInterval(0, TimeUnit.SECONDS);
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .enableTcpNoDelay(false)
+ .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
+ .statsInterval(0, TimeUnit.SECONDS);
if (pulsar.getConfiguration().isAuthenticationEnabled()) {
- configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
+ clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
- String clusterUrl = null;
if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
- clusterUrl = isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
- : data.getServiceUrlTls();
- configuration.setUseTls(true);
- configuration.setTlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath());
- configuration
- .setTlsAllowInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
+ clientBuilder
+ .serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
+ : data.getServiceUrlTls())
+ .enableTls(true)
+ .tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
+ .allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
} else {
- clusterUrl = isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl()
- : data.getServiceUrl();
+ clientBuilder.serviceUrl(
+ isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
}
- return new PulsarClientImpl(clusterUrl, configuration, this.workerGroup);
+
+ // Share all the IO threads across broker and client connections
+ ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
+ return new PulsarClientImpl(conf, workerGroup);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c0e3839..02055ac 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -104,24 +104,20 @@ public abstract class MockedPulsarServiceBaseTest {
protected final void internalSetup() throws Exception {
init();
- org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
- clientConf.setStatsInterval(0, TimeUnit.SECONDS);
lookupUrl = new URI(brokerUrl.toString());
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT);
}
- pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf);
+ pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();
}
protected final void internalSetupForStatsTest() throws Exception {
init();
- org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
- clientConf.setStatsInterval(1, TimeUnit.SECONDS);
String lookupUrl = brokerUrl.toString();
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
}
- pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+ pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(1, TimeUnit.SECONDS).build();
}
protected final void init() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
index fafc541..c3096ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
@@ -64,7 +64,7 @@ public class ConsumeBaseExceptionTest extends ProducerConsumerBase {
public void testListener() throws PulsarClientException {
Consumer consumer = null;
ConsumerConfiguration conf = new ConsumerConfiguration();
- conf.setMessageListener((Consumer c, Message msg) -> {
+ conf.setMessageListener((Consumer<byte[]> c, Message<byte[]> msg) -> {
});
consumer = pulsarClient.subscribe("persistent://prop/cluster/ns/topicName", "my-subscription", conf);
Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally());
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 31c9921..886da5f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -45,8 +45,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
@@ -62,7 +62,7 @@ import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
-public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
+public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
private static final long serialVersionUID = 1L;
@@ -74,7 +74,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final String groupId;
private final boolean isAutoCommit;
- private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer> consumers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<>();
private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<>();
@@ -84,10 +84,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final Properties properties;
private static class QueueItem {
- final org.apache.pulsar.client.api.Consumer consumer;
- final Message message;
+ final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
+ final Message<byte[]> message;
- QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message message) {
+ QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
this.consumer = consumer;
this.message = message;
}
@@ -146,19 +146,19 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
- ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties);
+ ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
- clientConf.setUseTcpNoDelay(false);
+ clientBuilder.enableTcpNoDelay(false);
try {
- client = PulsarClient.create(serviceUrl, clientConf);
+ client = clientBuilder.serviceUrl(serviceUrl).build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
@Override
- public void received(org.apache.pulsar.client.api.Consumer consumer, Message msg) {
+ public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
// Block listener thread if the application is slowing down
try {
receivedMessages.put(new QueueItem(consumer, msg));
@@ -204,16 +204,17 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
// acknowledgeCumulative()
int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
- ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties);
- conf.setSubscriptionType(SubscriptionType.Failover);
- conf.setMessageListener(this);
+ ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
+ consumerBuilder.subscriptionType(SubscriptionType.Failover);
+ consumerBuilder.messageListener(this);
+ consumerBuilder.subscriptionName(groupId);
if (numberOfPartitions > 1) {
// Subscribe to each partition
- conf.setConsumerName(ConsumerName.generateRandomName());
+ consumerBuilder.consumerName(ConsumerName.generateRandomName());
for (int i = 0; i < numberOfPartitions; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
- CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client
- .subscribeAsync(partitionName, groupId, conf);
+ CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
+ .topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(topic, partitionIndex);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
@@ -222,8 +223,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
} else {
// Topic has a single partition
- CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client.subscribeAsync(topic,
- groupId, conf);
+ CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
+ .subscribeAsync();
TopicPartition tp = new TopicPartition(topic, 0);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
futures.add(future);
@@ -293,7 +294,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
TopicName topicName = TopicName.get(item.consumer.getTopic());
String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
- Message msg = item.message;
+ Message<byte[]> msg = item.message;
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
long offset = MessageIdUtils.getOffset(msgId);
@@ -335,7 +336,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@SuppressWarnings("unchecked")
- private K getKey(String topic, Message msg) {
+ private K getKey(String topic, Message<byte[]> msg) {
if (!msg.hasKey()) {
return null;
}
@@ -393,7 +394,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
List<CompletableFuture<Void>> futures = new ArrayList<>();
offsets.forEach((topicPartition, offsetAndMetadata) -> {
- org.apache.pulsar.client.api.Consumer consumer = consumers.get(topicPartition);
+ org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
lastCommittedOffset.put(topicPartition, offsetAndMetadata);
futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
@@ -415,7 +416,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
- org.apache.pulsar.client.api.Consumer c = consumers.get(partition);
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
if (c == null) {
throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
}
@@ -436,7 +437,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
for (TopicPartition tp : partitions) {
- org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
@@ -457,7 +458,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
for (TopicPartition tp : partitions) {
- org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 7b8bf9a..ae69c85 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -38,12 +38,11 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -54,9 +53,9 @@ import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
private final PulsarClient client;
- private final ProducerConfiguration pulsarProducerConf;
+ private final ProducerBuilder<byte[]> pulsarProducerBuilder;
- private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer> producers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
@@ -107,30 +106,29 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
}
String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
- ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties);
try {
- client = PulsarClient.create(serviceUrl, clientConf);
+ client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
- pulsarProducerConf = PulsarProducerKafkaConfig.getProducerConfiguration(properties);
+ pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(client, properties);
// To mimic the same batching mode as Kafka, we need to wait a very little amount of
// time to batch if the client is trying to send messages fast enough
long lingerMs = Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1"));
- pulsarProducerConf.setBatchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
+ pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
String compressionType = properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG);
if ("gzip".equals(compressionType)) {
- pulsarProducerConf.setCompressionType(CompressionType.ZLIB);
+ pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
} else if ("lz4".equals(compressionType)) {
- pulsarProducerConf.setCompressionType(CompressionType.LZ4);
+ pulsarProducerBuilder.compressionType(CompressionType.LZ4);
}
- pulsarProducerConf.setSendTimeout(
- Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")),
- TimeUnit.MILLISECONDS);
+
+ int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"));
+ pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
boolean blockOnBufferFull = Boolean
.parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"));
@@ -138,8 +136,8 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
// Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
// Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
// Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
- boolean shouldBlockPulsarProducer = pulsarProducerConf.getSendTimeoutMs() > 0 || blockOnBufferFull;
- pulsarProducerConf.setBlockIfQueueFull(shouldBlockPulsarProducer);
+ boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull;
+ pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
}
@Override
@@ -149,7 +147,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- org.apache.pulsar.client.api.Producer producer;
+ org.apache.pulsar.client.api.Producer<byte[]> producer;
try {
producer = producers.computeIfAbsent(record.topic(), topic -> createNewProducer(topic));
@@ -162,7 +160,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
return future;
}
- Message msg = getMessage(record);
+ Message<byte[]> msg = getMessage(record);
int messageSize = msg.getData().length;
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
@@ -225,20 +223,20 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
}
}
- private org.apache.pulsar.client.api.Producer createNewProducer(String topic) {
+ private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
try {
- return client.createProducer(topic, pulsarProducerConf);
+ return pulsarProducerBuilder.clone().topic(topic).create();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
- private Message getMessage(ProducerRecord<K, V> record) {
+ private Message<byte[]> getMessage(ProducerRecord<K, V> record) {
if (record.partition() != null) {
throw new UnsupportedOperationException("");
}
- MessageBuilder builder = MessageBuilder.create();
+ MessageBuilder<byte[]> builder = MessageBuilder.create();
if (record.key() != null) {
builder.setKey(getKey(record.topic(), record.key()));
}
@@ -259,7 +257,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
}
}
- private RecordMetadata getRecordMetadata(String topic, Message msg, MessageId messageId, int size) {
+ private RecordMetadata getRecordMetadata(String topic, Message<byte[]> msg, MessageId messageId, int size) {
MessageIdImpl msgId = (MessageIdImpl) messageId;
// Combine ledger id and entry id to form offset
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
index d9ce75e..ca57f5b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -22,7 +22,8 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
public class PulsarClientKafkaConfig {
@@ -31,6 +32,7 @@ public class PulsarClientKafkaConfig {
public static final String USE_TLS = "pulsar.use.tls";
public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";
+ public static final String TLS_HOSTNAME_VERIFICATION = "pulsar.tls.hostname.verification";
public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms";
public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds";
@@ -43,8 +45,8 @@ public class PulsarClientKafkaConfig {
public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
- public static ClientConfiguration getClientConfiguration(Properties properties) {
- ClientConfiguration conf = new ClientConfiguration();
+ public static ClientBuilder getClientBuilder(Properties properties) {
+ ClientBuilder clientBuilder = PulsarClient.builder();
if (properties.containsKey(AUTHENTICATION_CLASS)) {
String className = properties.getProperty(AUTHENTICATION_CLASS);
@@ -52,48 +54,53 @@ public class PulsarClientKafkaConfig {
@SuppressWarnings("unchecked")
Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
Authentication auth = clazz.newInstance();
- conf.setAuthentication(auth);
+ clientBuilder.authentication(auth);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- conf.setUseTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
- conf.setUseTls(Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+ clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
+ clientBuilder.allowTlsInsecureConnection(
+ Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+ clientBuilder.enableTlsHostnameVerification(
+ Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
- conf.setTlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+ clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
}
if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
- conf.setOperationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+ clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
TimeUnit.MILLISECONDS);
}
if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
- conf.setStatsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)), TimeUnit.SECONDS);
+ clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
+ TimeUnit.SECONDS);
}
if (properties.containsKey(NUM_IO_THREADS)) {
- conf.setIoThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+ clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
}
if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
- conf.setConnectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+ clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
}
if (properties.containsKey(USE_TCP_NODELAY)) {
- conf.setUseTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+ clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
}
if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
- conf.setConcurrentLookupRequest(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+ clientBuilder.maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
}
if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
- conf.setMaxNumberOfRejectedRequestPerConnection(
+ clientBuilder.maxNumberOfRejectedRequestPerConnection(
Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
}
- return conf;
+ return clientBuilder;
}
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index f91c484..4addfb7 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -20,7 +20,8 @@ package org.apache.pulsar.client.kafka.compat;
import java.util.Properties;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
public class PulsarConsumerKafkaConfig {
@@ -29,22 +30,22 @@ public class PulsarConsumerKafkaConfig {
public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size";
public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
- public static ConsumerConfiguration getConsumerConfiguration(Properties properties) {
- ConsumerConfiguration conf = new ConsumerConfiguration();
+ public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
+ ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
if (properties.containsKey(CONSUMER_NAME)) {
- conf.setConsumerName(properties.getProperty(CONSUMER_NAME));
+ consumerBuilder.consumerName(properties.getProperty(CONSUMER_NAME));
}
if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
- conf.setReceiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
+ consumerBuilder.receiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
}
if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
- conf.setMaxTotalReceiverQueueSizeAcrossPartitions(
+ consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(
Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
}
- return conf;
+ return consumerBuilder;
}
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index c2e4886..5a9a651 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -20,7 +20,8 @@ package org.apache.pulsar.client.kafka.compat;
import java.util.Properties;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
public class PulsarProducerKafkaConfig {
@@ -33,32 +34,32 @@ public class PulsarProducerKafkaConfig {
public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
- public static ProducerConfiguration getProducerConfiguration(Properties properties) {
- ProducerConfiguration conf = new ProducerConfiguration();
+ public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
+ ProducerBuilder<byte[]> producerBuilder = client.newProducer();
if (properties.containsKey(PRODUCER_NAME)) {
- conf.setProducerName(properties.getProperty(PRODUCER_NAME));
+ producerBuilder.producerName(properties.getProperty(PRODUCER_NAME));
}
if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
- conf.setInitialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
+ producerBuilder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
}
if (properties.containsKey(MAX_PENDING_MESSAGES)) {
- conf.setMaxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
+ producerBuilder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
}
if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
- conf.setMaxPendingMessagesAcrossPartitions(
+ producerBuilder.maxPendingMessagesAcrossPartitions(
Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
}
- conf.setBatchingEnabled(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));
+ producerBuilder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));
if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
- conf.setBatchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
+ producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
}
- return conf;
+ return producerBuilder;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index d9dcb1c..e4457df 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -100,7 +100,7 @@ public interface ConsumerBuilder<T> extends Serializable, Cloneable {
*
* @param topicsPattern
*/
- ConsumerBuilder topicsPattern(Pattern topicsPattern);
+ ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
/**
* Specify a pattern for topics that this consumer will subscribe on.
@@ -111,7 +111,7 @@ public interface ConsumerBuilder<T> extends Serializable, Cloneable {
* @param topicsPattern
* given regular expression for topics pattern
*/
- ConsumerBuilder topicsPattern(String topicsPattern);
+ ConsumerBuilder<T> topicsPattern(String topicsPattern);
/**
* Specify the subscription name for this consumer.
@@ -238,7 +238,7 @@ public interface ConsumerBuilder<T> extends Serializable, Cloneable {
* @param periodInMinutes
* whether to read from the compacted topic
*/
- ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes);
+ ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes);
/**
* Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 7761b9d..c526af5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -95,7 +95,7 @@ public class ConsumerConfiguration implements Serializable {
/**
* @return the configured {@link MessageListener} for the consumer
*/
- public MessageListener getMessageListener() {
+ public MessageListener<byte[]> getMessageListener() {
return conf.getMessageListener();
}
@@ -108,7 +108,7 @@ public class ConsumerConfiguration implements Serializable {
* @param messageListener
* the listener object
*/
- public ConsumerConfiguration setMessageListener(MessageListener messageListener) {
+ public ConsumerConfiguration setMessageListener(MessageListener<byte[]> messageListener) {
checkNotNull(messageListener);
conf.setMessageListener(messageListener);
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
index aeb8bbb..e2e6274 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
@@ -26,11 +26,11 @@ public interface ConsumerEventListener {
/**
* Notified when the consumer group is changed, and the consumer becomes the active consumer.
*/
- void becameActive(Consumer consumer, int partitionId);
+ void becameActive(Consumer<?> consumer, int partitionId);
/**
* Notified when the consumer group is changed, and the consumer is still inactive or becomes inactive.
*/
- void becameInactive(Consumer consumer, int partitionId);
+ void becameInactive(Consumer<?> consumer, int partitionId);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
index a9cef6f..bc2b915 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
@@ -30,7 +30,7 @@ public interface MessageRouter extends Serializable {
* @deprecated since 1.22.0. Please use {@link #choosePartition(Message, TopicMetadata)} instead.
*/
@Deprecated
- default int choosePartition(Message msg) {
+ default int choosePartition(Message<?> msg) {
throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");
}
@@ -42,7 +42,7 @@ public interface MessageRouter extends Serializable {
* @return the partition to route the message.
* @since 1.22.0
*/
- default int choosePartition(Message msg, TopicMetadata metadata) {
+ default int choosePartition(Message<?> msg, TopicMetadata metadata) {
return choosePartition(msg);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index a037cfe..16dcd00 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -262,7 +262,7 @@ public interface ProducerBuilder<T> extends Serializable, Cloneable {
* maximum number of messages in a batch
* @return
*/
- ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);
+ ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);
/**
* Set the baseline for the sequence ids for messages published by the producer.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index c81b2eb..f40a8a4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -99,7 +99,7 @@ public interface ReaderBuilder<T> extends Serializable, Cloneable {
* @param readerListener
* the listener object
*/
- ReaderBuilder<T> readerListener(ReaderListener readerListener);
+ ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);
/**
* Sets a {@link CryptoKeyReader}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
index d329d09..a97e524 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
@@ -53,7 +53,7 @@ class BatchMessageContainer {
// sequence id for this batch which will be persisted as a single entry by broker
long sequenceId = -1;
ByteBuf batchedMessageMetadataAndPayload;
- List<MessageImpl> messages = Lists.newArrayList();
+ List<MessageImpl<?>> messages = Lists.newArrayList();
// keep track of callbacks for individual messages being published in a batch
SendCallback firstCallback;
@@ -73,13 +73,13 @@ class BatchMessageContainer {
this.producerName = producerName;
}
- boolean hasSpaceInBatch(MessageImpl msg) {
+ boolean hasSpaceInBatch(MessageImpl<?> msg) {
int messageSize = msg.getDataBuffer().readableBytes();
return ((messageSize + currentBatchSizeBytes) <= MAX_MESSAGE_BATCH_SIZE_BYTES
&& numMessagesInBatch < maxNumMessagesInBatch);
}
- void add(MessageImpl msg, SendCallback callback) {
+ void add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 4cffb7f..3effc7f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -154,4 +154,8 @@ public class ClientBuilderImpl implements ClientBuilder {
conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
return this;
}
+
+ public ClientConfigurationData getClientConfigurationData() {
+ return conf;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6c2d758..37904fd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -86,8 +86,8 @@ public class ClientCnx extends PulsarHandler {
private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests = new ConcurrentLongHashMap<>(
16, 1);
- private final ConcurrentLongHashMap<ProducerImpl> producers = new ConcurrentLongHashMap<>(16, 1);
- private final ConcurrentLongHashMap<ConsumerImpl> consumers = new ConcurrentLongHashMap<>(16, 1);
+ private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
+ private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
private final Semaphore pendingLookupRequestSemaphore;
@@ -253,7 +253,7 @@ public class ClientCnx extends PulsarHandler {
if (log.isDebugEnabled()) {
log.debug("{} Received a message from the server: {}", ctx.channel(), cmdMessage);
}
- ConsumerImpl consumer = consumers.get(cmdMessage.getConsumerId());
+ ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
if (consumer != null) {
consumer.messageReceived(cmdMessage.getMessageId(), headersAndPayload, this);
}
@@ -266,7 +266,7 @@ public class ClientCnx extends PulsarHandler {
if (log.isDebugEnabled()) {
log.debug("{} Received a consumer group change message from the server : {}", ctx.channel(), change);
}
- ConsumerImpl consumer = consumers.get(change.getConsumerId());
+ ConsumerImpl<?> consumer = consumers.get(change.getConsumerId());
if (consumer != null) {
consumer.activeConsumerChanged(change.getIsActive());
}
@@ -398,7 +398,7 @@ public class ClientCnx extends PulsarHandler {
log.info("[{}] Broker notification reached the end of topic: {}", remoteAddress, consumerId);
- ConsumerImpl consumer = consumers.get(consumerId);
+ ConsumerImpl<?> consumer = consumers.get(consumerId);
if (consumer != null) {
consumer.setTerminated();
}
@@ -472,7 +472,7 @@ public class ClientCnx extends PulsarHandler {
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
final long producerId = closeProducer.getProducerId();
- ProducerImpl producer = producers.get(producerId);
+ ProducerImpl<?> producer = producers.get(producerId);
if (producer != null) {
producer.connectionClosed(this);
} else {
@@ -484,7 +484,7 @@ public class ClientCnx extends PulsarHandler {
protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
final long consumerId = closeConsumer.getConsumerId();
- ConsumerImpl consumer = consumers.get(consumerId);
+ ConsumerImpl<?> consumer = consumers.get(consumerId);
if (consumer != null) {
consumer.connectionClosed(this);
} else {
@@ -666,11 +666,11 @@ public class ClientCnx extends PulsarHandler {
return false;
}
- void registerConsumer(final long consumerId, final ConsumerImpl consumer) {
+ void registerConsumer(final long consumerId, final ConsumerImpl<?> consumer) {
consumers.put(consumerId, consumer);
}
- void registerProducer(final long producerId, final ProducerImpl producer) {
+ void registerProducer(final long producerId, final ProducerImpl<?> producer) {
producers.put(producerId, producer);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index af6b732..f51b1b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -50,7 +50,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
}
protected final String subscription;
- protected final ConsumerConfigurationData conf;
+ protected final ConsumerConfigurationData<T> conf;
protected final String consumerName;
protected final CompletableFuture<Consumer<T>> subscribeFuture;
protected final MessageListener<T> listener;
@@ -168,7 +168,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
abstract protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException;
@Override
- public void acknowledge(Message message) throws PulsarClientException {
+ public void acknowledge(Message<?> message) throws PulsarClientException {
try {
acknowledge(message.getMessageId());
} catch (NullPointerException npe) {
@@ -194,7 +194,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
}
@Override
- public void acknowledgeCumulative(Message message) throws PulsarClientException {
+ public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
try {
acknowledgeCumulative(message.getMessageId());
} catch (NullPointerException npe) {
@@ -220,7 +220,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
}
@Override
- public CompletableFuture<Void> acknowledgeAsync(Message message) {
+ public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
try {
return acknowledgeAsync(message.getMessageId());
} catch (NullPointerException npe) {
@@ -229,7 +229,7 @@ public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T>
}
@Override
- public CompletableFuture<Void> acknowledgeCumulativeAsync(Message message) {
+ public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
try {
return acknowledgeCumulativeAsync(message.getMessageId());
} catch (NullPointerException npe) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index e7163c9..d8507bb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -206,9 +206,8 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
}
@Override
- public ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes) {
+ public ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes) {
conf.setPatternAutoDiscoveryPeriod(periodInMinutes);
return this;
}
-
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8499565..183651a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -82,6 +82,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
// Number of messages that have delivered to the application. Every once in a while, this number will be sent to the
// broker to notify that we are ready to get (and store in the incoming messages queue) more messages
+ @SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(ConsumerImpl.class, "availablePermits");
@SuppressWarnings("unused")
@@ -288,7 +289,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
do {
message = incomingMessages.take();
lastDequeuedMessage = message.getMessageId();
- ClientCnx msgCnx = ((MessageImpl) message).getCnx();
+ ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx();
// synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()"
synchronized (ConsumerImpl.this) {
// if message received due to an old flow - discard it and wait for the message from the
@@ -631,7 +632,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
* not seen by the application
*/
private BatchMessageIdImpl clearReceiverQueue() {
- List<Message> currentMessageQueue = new ArrayList<>(incomingMessages.size());
+ List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
if (!currentMessageQueue.isEmpty()) {
MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId();
@@ -984,9 +985,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
*
* Periodically, it sends a Flow command to notify the broker that it can push more messages
*/
- protected synchronized void messageProcessed(Message msg) {
+ protected synchronized void messageProcessed(Message<?> msg) {
ClientCnx currentCnx = cnx();
- ClientCnx msgCnx = ((MessageImpl) msg).getCnx();
+ ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
lastDequeuedMessage = msg.getMessageId();
if (msgCnx != currentCnx) {
@@ -1371,7 +1372,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
}
}
- private MessageIdImpl getMessageIdImpl(Message msg) {
+ private MessageIdImpl getMessageIdImpl(Message<?> msg) {
MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId();
if (messageId instanceof BatchMessageIdImpl) {
// messageIds contain MessageIdImpl, not BatchMessageIdImpl
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
index 3ef26c8..21ff3c4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
@@ -41,7 +41,7 @@ public class ConsumerStats implements Serializable {
private static final long serialVersionUID = 1L;
private TimerTask stat;
private Timeout statTimeout;
- private ConsumerImpl consumer;
+ private ConsumerImpl<?> consumer;
private PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
@@ -74,7 +74,7 @@ public class ConsumerStats implements Serializable {
throughputFormat = null;
}
- public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData conf, ConsumerImpl consumer) {
+ public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf, ConsumerImpl<?> consumer) {
this.pulsarClient = pulsarClient;
this.consumer = consumer;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -92,7 +92,7 @@ public class ConsumerStats implements Serializable {
init(conf);
}
- private void init(ConsumerConfigurationData conf) {
+ private void init(ConsumerConfigurationData<?> conf) {
ObjectMapper m = new ObjectMapper();
m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
@@ -149,7 +149,7 @@ public class ConsumerStats implements Serializable {
statTimeout = pulsarClient.timer().newTimeout(stat, statsIntervalSeconds, TimeUnit.SECONDS);
}
- void updateNumMsgsReceived(Message message) {
+ void updateNumMsgsReceived(Message<?> message) {
if (message != null) {
numMsgsReceived.increment();
numBytesReceived.add(message.getData().length);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index fdcaf63..08b189c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -24,7 +24,7 @@ public class ConsumerStatsDisabled extends ConsumerStats {
private static final long serialVersionUID = 1L;
@Override
- void updateNumMsgsReceived(Message message) {
+ void updateNumMsgsReceived(Message<?> message) {
// Do nothing
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index f4585ce..8fcc40a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -56,7 +56,8 @@ public class MessageImpl<T> implements Message<T> {
// Constructor for out-going message
static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload, Schema<T> schema) {
- MessageImpl<T> msg = RECYCLER.get();
+ @SuppressWarnings("unchecked")
+ MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
msg.msgMetadataBuilder = msgMetadataBuilder;
msg.messageId = null;
msg.cnx = null;
@@ -67,7 +68,8 @@ public class MessageImpl<T> implements Message<T> {
}
static MessageImpl<byte[]> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload) {
- MessageImpl<byte[]> msg = RECYCLER.get();
+ @SuppressWarnings("unchecked")
+ MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
msg.msgMetadataBuilder = msgMetadataBuilder;
msg.messageId = null;
msg.cnx = null;
@@ -141,8 +143,9 @@ public class MessageImpl<T> implements Message<T> {
this.properties = Collections.unmodifiableMap(properties);
}
- public static MessageImpl deserialize(ByteBuf headersAndPayload) throws IOException {
- MessageImpl msg = RECYCLER.get();
+ public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) throws IOException {
+ @SuppressWarnings("unchecked")
+ MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
msg.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
@@ -287,16 +290,16 @@ public class MessageImpl<T> implements Message<T> {
}
}
- private MessageImpl(Handle<MessageImpl> recyclerHandle) {
+ private MessageImpl(Handle<MessageImpl<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
- private Handle<MessageImpl> recyclerHandle;
+ private Handle<MessageImpl<?>> recyclerHandle;
- private final static Recycler<MessageImpl> RECYCLER = new Recycler<MessageImpl>() {
+ private final static Recycler<MessageImpl<?>> RECYCLER = new Recycler<MessageImpl<?>>() {
@Override
- protected MessageImpl newObject(Handle<MessageImpl> handle) {
- return new MessageImpl(handle);
+ protected MessageImpl<?> newObject(Handle<MessageImpl<?>> handle) {
+ return new MessageImpl<>(handle);
}
};
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index f662ccf..df6894c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -47,13 +47,15 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+
public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
private final List<ConsumerImpl<T>> consumers;
// Queue of partition consumers on which we have stopped calling receiveAsync() because the
// shared incoming queue was full
- private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
+ private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
// resume receiving from the paused consumer partitions
@@ -165,7 +167,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
try {
if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
while (true) {
- ConsumerImpl consumer = pausedConsumers.poll();
+ ConsumerImpl<T> consumer = pausedConsumers.poll();
if (consumer == null) {
break;
}
@@ -360,12 +362,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
@Override
public boolean isConnected() {
- for (ConsumerImpl consumer : consumers) {
- if (!consumer.isConnected()) {
- return false;
- }
- }
- return true;
+ return consumers.stream().allMatch(ConsumerImpl::isConnected);
}
@Override
@@ -442,6 +439,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
if (null != conf.getConsumerEventListener()) {
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
}
+
int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
@@ -460,7 +458,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
@Override
public void redeliverUnacknowledgedMessages() {
synchronized (this) {
- for (ConsumerImpl c : consumers) {
+ for (ConsumerImpl<T> c : consumers) {
c.redeliverUnacknowledgedMessages();
}
incomingMessages.clear();
@@ -509,11 +507,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
* @return true if all batch messages have been acknowledged
*/
public boolean isBatchingAckTrackerEmpty() {
- boolean state = true;
- for (Consumer consumer : consumers) {
- state &= ((ConsumerImpl) consumer).isBatchingAckTrackerEmpty();
- }
- return state;
+ return consumers.stream().allMatch(ConsumerImpl::isBatchingAckTrackerEmpty);
}
List<ConsumerImpl<T>> getConsumers() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index e70ce4b..5281d51 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -161,14 +161,8 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
@Override
public boolean isConnected() {
- for (ProducerImpl producer : producers) {
- // returns false if any of the partition is not connected
- if (!producer.isConnected()) {
- return false;
- }
- }
-
- return true;
+ // returns false if any of the partition is not connected
+ return producers.stream().allMatch(ProducerImpl::isConnected);
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b55f279..15779cd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -104,6 +104,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
private final Map<String, String> metadata;
+ @SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -184,7 +185,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
}
@Override
- public CompletableFuture<MessageId> sendAsync(Message message) {
+ public CompletableFuture<MessageId> sendAsync(Message<T> message) {
CompletableFuture<MessageId> future = new CompletableFuture<>();
sendAsync(message, new SendCallback() {
@@ -232,7 +233,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
return future;
}
- public void sendAsync(Message message, SendCallback callback) {
+ public void sendAsync(Message<T> message, SendCallback callback) {
checkArgument(message instanceof MessageImpl);
if (!isValidProducerState(callback)) {
@@ -243,7 +244,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
return;
}
- MessageImpl msg = (MessageImpl) message;
+ MessageImpl<T> msg = (MessageImpl<T>) message;
MessageMetadata.Builder msgMetadata = msg.getMessageBuilder();
ByteBuf payload = msg.getDataBuffer();
@@ -387,7 +388,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
return Commands.newSend(producerId, sequenceId, numMessages, checksumType, msgMetadata, compressedPayload);
}
- private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf payload) {
+ private void doBatchSendAndAdd(MessageImpl<T> msg, SendCallback callback, ByteBuf payload) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Closing out batch to accomodate large message with size {}", topic, producerName,
msg.getDataBuffer().readableBytes());
@@ -440,12 +441,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
}
private static final class WriteInEventLoopCallback implements Runnable {
- private ProducerImpl producer;
+ private ProducerImpl<?> producer;
private ByteBufPair cmd;
private long sequenceId;
private ClientCnx cnx;
- static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) {
+ static WriteInEventLoopCallback create(ProducerImpl<?> producer, ClientCnx cnx, OpSendMsg op) {
WriteInEventLoopCallback c = RECYCLER.get();
c.producer = producer;
c.cnx = cnx;
@@ -735,8 +736,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
}
protected static final class OpSendMsg {
- MessageImpl msg;
- List<MessageImpl> msgs;
+ MessageImpl<?> msg;
+ List<MessageImpl<?>> msgs;
ByteBufPair cmd;
SendCallback callback;
long sequenceId;
@@ -744,7 +745,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
long batchSizeByte = 0;
int numMessagesInBatch = 1;
- static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long sequenceId, SendCallback callback) {
+ static OpSendMsg create(MessageImpl<?> msg, ByteBufPair cmd, long sequenceId, SendCallback callback) {
OpSendMsg op = RECYCLER.get();
op.msg = msg;
op.cmd = cmd;
@@ -754,7 +755,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
return op;
}
- static OpSendMsg create(List<MessageImpl> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback) {
+ static OpSendMsg create(List<MessageImpl<?>> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback) {
OpSendMsg op = RECYCLER.get();
op.msgs = msgs;
op.cmd = cmd;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
index 6a5e321..2628003 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
@@ -41,7 +41,7 @@ public class ProducerStats implements Serializable {
private static final long serialVersionUID = 1L;
private TimerTask stat;
private Timeout statTimeout;
- private ProducerImpl producer;
+ private ProducerImpl<?> producer;
private PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
@@ -74,7 +74,7 @@ public class ProducerStats implements Serializable {
ds = null;
}
- public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl producer) {
+ public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl<?> producer) {
this.pulsarClient = pulsarClient;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
this.producer = producer;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1741eeb..47e953b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
-import static com.google.common.base.Preconditions.checkState;
import static org.apache.commons.lang3.StringUtils.isBlank;
import com.google.common.collect.Lists;
@@ -256,7 +255,7 @@ public class PulsarClientImpl implements PulsarClient {
log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
}
- ProducerBase producer;
+ ProducerBase<T> producer;
if (metadata.partitions > 1) {
producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions,
producerCreatedFuture, schema);
@@ -437,7 +436,7 @@ public class PulsarClientImpl implements PulsarClient {
List<String> topicsList = topicsPatternFilter(topics, conf.getTopicsPattern());
conf.getTopicNames().addAll(topicsList);
- ConsumerBase consumer = new PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
+ ConsumerBase<T> consumer = new PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
PulsarClientImpl.this,
conf,
externalExecutorProvider.getExecutor(),
@@ -587,12 +586,12 @@ public class PulsarClientImpl implements PulsarClient {
synchronized (producers) {
// Copy to a new list, because the closing will trigger a removal from the map
// and invalidate the iterator
- List<ProducerBase> producersToClose = Lists.newArrayList(producers.keySet());
+ List<ProducerBase<?>> producersToClose = Lists.newArrayList(producers.keySet());
producersToClose.forEach(p -> futures.add(p.closeAsync()));
}
synchronized (consumers) {
- List<ConsumerBase> consumersToClose = Lists.newArrayList(consumers.keySet());
+ List<ConsumerBase<?>> consumersToClose = Lists.newArrayList(consumers.keySet());
consumersToClose.forEach(c -> futures.add(c.closeAsync()));
}
@@ -688,13 +687,13 @@ public class PulsarClientImpl implements PulsarClient {
return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
}
- void cleanupProducer(ProducerBase producer) {
+ void cleanupProducer(ProducerBase<?> producer) {
synchronized (producers) {
producers.remove(producer);
}
}
- void cleanupConsumer(ConsumerBase consumer) {
+ void cleanupConsumer(ConsumerBase<?> consumer) {
synchronized (consumers) {
consumers.remove(consumer);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index bef909d..4ab5f42 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -43,16 +43,17 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
private final Schema<T> schema;
ReaderBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
- this(client, new ReaderConfigurationData(), schema);
+ this(client, new ReaderConfigurationData<T>(), schema);
}
- private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData conf, Schema<T> schema) {
+ private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData<T> conf, Schema<T> schema) {
this.client = client;
this.conf = conf;
this.schema = schema;
}
@Override
+ @SuppressWarnings("unchecked")
public ReaderBuilder<T> clone() {
try {
return (ReaderBuilder<T>) super.clone();
@@ -106,7 +107,7 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
}
@Override
- public ReaderBuilder<T> readerListener(ReaderListener readerListener) {
+ public ReaderBuilder<T> readerListener(ReaderListener<T> readerListener) {
conf.setReaderListener(readerListener);
return this;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index daf4799..ed374f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -90,7 +90,7 @@ public class ReaderImpl<T> implements Reader<T> {
return consumer.getTopic();
}
- public ConsumerImpl getConsumer() {
+ public ConsumerImpl<T> getConsumer() {
return consumer;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index 6b3f937..ce67f5b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -39,7 +39,7 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
}
@Override
- public int choosePartition(Message msg, TopicMetadata topicMetadata) {
+ public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
// If the message has a key, it supersedes the round robin routing policy
if (msg.hasKey()) {
return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index d95d83c..4337442 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -31,9 +31,10 @@ import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
@@ -95,19 +96,20 @@ public class ProxyService implements Closeable {
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
- ClientConfiguration clientConfiguration = new ClientConfiguration();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(serviceUrl);
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
- clientConfiguration.setAuthentication(proxyConfig.getBrokerClientAuthenticationPlugin(),
- proxyConfig.getBrokerClientAuthenticationParameters());
+ clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters()));
}
if (proxyConfig.isTlsEnabledWithBroker()) {
- clientConfiguration.setUseTls(true);
- clientConfiguration.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
- clientConfiguration.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
+ clientConf.setUseTls(true);
+ clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
+ clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
}
- this.client = new PulsarClientImpl(serviceUrl, clientConfiguration, workerGroup);
- this.clientAuthentication = clientConfiguration.getAuthentication();
+ this.client = new PulsarClientImpl(clientConf, workerGroup);
+ this.clientAuthentication = clientConf.getAuthentication();
}
public void start() throws Exception {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index d4b8e4d..f98c48d 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -25,11 +25,9 @@ import java.util.Set;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.proxy.server.ProxyRolesEnforcementTest.BasicAuthentication;
@@ -142,8 +140,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
}
private PulsarClient createPulsarClient(String proxyServiceUrl, String authParams) throws PulsarClientException {
- org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
- clientConf.setAuthentication(BasicAuthentication.class.getName(), authParams);
- return PulsarClient.create(proxyServiceUrl, clientConf);
+ return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+ .authentication(BasicAuthentication.class.getName(), authParams).build();
}
}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
index 3c43611..4291fa8 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
@@ -33,7 +33,7 @@ public interface MessageToValuesMapper extends Serializable {
* @param msg
* @return
*/
- public Values toValues(Message msg);
+ public Values toValues(Message<byte[]> msg);
/**
* Declare the output schema for the spout.
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index a293fd6..0aa1ee3 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -18,29 +18,34 @@
*/
package org.apache.pulsar.storm;
+import static java.lang.String.format;
+
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
-
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
-import static java.lang.String.format;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("deprecation")
public class PulsarBolt extends BaseRichBolt implements IMetric {
/**
*
@@ -52,8 +57,8 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
public static final String PRODUCER_RATE = "producerRate";
public static final String PRODUCER_THROUGHPUT_BYTES = "producerThroughput";
- private final ClientConfiguration clientConf;
- private final ProducerConfiguration producerConf;
+ private final ClientConfigurationData clientConf;
+ private final ProducerConfigurationData producerConf;
private final PulsarBoltConfiguration pulsarBoltConf;
private final ConcurrentMap<String, Object> metricsMap = Maps.newConcurrentMap();
@@ -65,17 +70,39 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
private volatile long messagesSent = 0;
private volatile long messageSizeSent = 0;
+ public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) {
+ this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
+ this.producerConf = new ProducerConfigurationData();
+ Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
+ Preconditions.checkNotNull(pulsarBoltConf.getTopic());
+ Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+
+ this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
+ this.producerConf.setTopicName(pulsarBoltConf.getTopic());
+ this.pulsarBoltConf = pulsarBoltConf;
+ }
+
+ /**
+ * @deprecated Use {@link #PulsarBolt(PulsarBoltConfiguration, ClientBuilder)}
+ */
+ @Deprecated
public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfiguration clientConf) {
this(pulsarBoltConf, clientConf, new ProducerConfiguration());
}
+ /**
+ * @deprecated Use {@link #PulsarBolt(PulsarBoltConfiguration, ClientBuilder)}
+ */
+ @Deprecated
public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfiguration clientConf,
ProducerConfiguration producerConf) {
- this.clientConf = clientConf;
- this.producerConf = producerConf;
+ this.clientConf = clientConf.getConfigurationData().clone();
+ this.producerConf = producerConf.getProducerConfigurationData().clone();
Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
Preconditions.checkNotNull(pulsarBoltConf.getTopic());
Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+ this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
+ this.producerConf.setTopicName(pulsarBoltConf.getTopic());
this.pulsarBoltConf = pulsarBoltConf;
}
@@ -86,8 +113,8 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
this.boltId = String.format("%s-%s", componentId, context.getThisTaskId());
this.collector = collector;
try {
- sharedPulsarClient = SharedPulsarClient.get(componentId, pulsarBoltConf.getServiceUrl(), clientConf);
- producer = sharedPulsarClient.getSharedProducer(pulsarBoltConf.getTopic(), producerConf);
+ sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
+ producer = sharedPulsarClient.getSharedProducer(producerConf);
LOG.info("[{}] Created a pulsar producer on topic {} to send messages", boltId, pulsarBoltConf.getTopic());
} catch (PulsarClientException e) {
LOG.error("[{}] Error initializing pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e);
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 639adb9..af26035 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -22,15 +22,11 @@ import static java.lang.String.format;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
@@ -38,15 +34,25 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
-
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+@SuppressWarnings("deprecation")
public class PulsarSpout extends BaseRichSpout implements IMetric {
private static final long serialVersionUID = 1L;
@@ -59,37 +65,61 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
public static final String CONSUMER_RATE = "consumerRate";
public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
- private final ClientConfiguration clientConf;
- private final ConsumerConfiguration consumerConf;
+ private final ClientConfigurationData clientConf;
+ private final ConsumerConfigurationData<byte[]> consumerConf;
private final PulsarSpoutConfiguration pulsarSpoutConf;
private final long failedRetriesTimeoutNano;
private final int maxFailedRetries;
private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = Maps.newConcurrentMap();
- private final Queue<Message> failedMessages = Queues.newConcurrentLinkedQueue();
+ private final Queue<Message<byte[]>> failedMessages = Queues.newConcurrentLinkedQueue();
private final ConcurrentMap<String, Object> metricsMap = Maps.newConcurrentMap();
private SharedPulsarClient sharedPulsarClient;
private String componentId;
private String spoutId;
private SpoutOutputCollector collector;
- private Consumer consumer;
+ private Consumer<byte[]> consumer;
private volatile long messagesReceived = 0;
private volatile long messagesEmitted = 0;
private volatile long pendingAcks = 0;
private volatile long messageSizeReceived = 0;
+ public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) {
+ Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
+ Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
+ Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
+ Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+
+ this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
+ this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+ this.consumerConf = new ConsumerConfigurationData<>();
+ this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+ this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+
+ this.pulsarSpoutConf = pulsarSpoutConf;
+ this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
+ this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
+ }
+
+ @Deprecated
public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfiguration clientConf) {
this(pulsarSpoutConf, clientConf, new ConsumerConfiguration());
}
+ @Deprecated
public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfiguration clientConf,
ConsumerConfiguration consumerConf) {
- this.clientConf = clientConf;
- this.consumerConf = consumerConf;
+ this.clientConf = clientConf.getConfigurationData().clone();
+ this.consumerConf = consumerConf.getConfigurationData().clone();
Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+
+ this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+ this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+ this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+
this.pulsarSpoutConf = pulsarSpoutConf;
this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
@@ -115,7 +145,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
@Override
public void ack(Object msgId) {
if (msgId instanceof Message) {
- Message msg = (Message) msgId;
+ Message<?> msg = (Message<?>) msgId;
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Received ack for message {}", spoutId, msg.getMessageId());
}
@@ -128,7 +158,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
@Override
public void fail(Object msgId) {
if (msgId instanceof Message) {
- Message msg = (Message) msgId;
+ @SuppressWarnings("unchecked")
+ Message<byte[]> msg = (Message<byte[]>) msgId;
MessageId id = msg.getMessageId();
LOG.warn("[{}] Error processing message {}", spoutId, id);
@@ -160,7 +191,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
public void nextTuple() {
emitNextAvailableTuple();
}
-
+
/**
* It makes sure that it emits next available non-tuple to topology unless consumer queue doesn't have any message
* available. It receives message from consumer queue and converts it to tuple and emits to topology. if the
@@ -168,7 +199,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
* emit.
*/
public void emitNextAvailableTuple() {
- Message msg;
+ Message<byte[]> msg;
// check if there are any failed messages to re-emit in the topology
msg = failedMessages.peek();
@@ -219,13 +250,15 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
pendingMessageRetries.clear();
failedMessages.clear();
try {
- sharedPulsarClient = SharedPulsarClient.get(componentId, pulsarSpoutConf.getServiceUrl(), clientConf);
+ sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
if (pulsarSpoutConf.isSharedConsumerEnabled()) {
- consumer = sharedPulsarClient.getSharedConsumer(pulsarSpoutConf.getTopic(),
- pulsarSpoutConf.getSubscriptionName(), consumerConf);
+ consumer = sharedPulsarClient.getSharedConsumer(consumerConf);
} else {
- consumer = sharedPulsarClient.getClient().subscribe(pulsarSpoutConf.getTopic(),
- pulsarSpoutConf.getSubscriptionName(), consumerConf);
+ try {
+ consumer = sharedPulsarClient.getClient().subscribeAsync(consumerConf).join();
+ } catch (CompletionException e) {
+ throw (PulsarClientException) e.getCause();
+ }
}
LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId,
pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName());
@@ -244,7 +277,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
}
- private boolean mapToValueAndEmit(Message msg) {
+ private boolean mapToValueAndEmit(Message<byte[]> msg) {
if (msg != null) {
Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
++pendingAcks;
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
index 8674077..4506e11 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -18,36 +18,37 @@
*/
package org.apache.pulsar.storm;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
public class SharedPulsarClient {
private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class);
private static final ConcurrentMap<String, SharedPulsarClient> instances = Maps.newConcurrentMap();
private final String componentId;
- private final PulsarClient client;
+ private final PulsarClientImpl client;
private final AtomicInteger counter = new AtomicInteger();
- private Consumer consumer;
- private Producer producer;
+ private Consumer<byte[]> consumer;
+ private Producer<byte[]> producer;
- private SharedPulsarClient(String componentId, String serviceUrl, ClientConfiguration clientConf)
+ private SharedPulsarClient(String componentId, ClientConfigurationData clientConf)
throws PulsarClientException {
- this.client = PulsarClient.create(serviceUrl, clientConf);
+ this.client = new PulsarClientImpl(clientConf);
this.componentId = componentId;
}
@@ -62,13 +63,13 @@ public class SharedPulsarClient {
* @return
* @throws PulsarClientException
*/
- public static SharedPulsarClient get(String componentId, String serviceUrl, ClientConfiguration clientConf)
+ public static SharedPulsarClient get(String componentId, ClientConfigurationData clientConf)
throws PulsarClientException {
AtomicReference<PulsarClientException> exception = new AtomicReference<PulsarClientException>();
instances.computeIfAbsent(componentId, pulsarClient -> {
SharedPulsarClient sharedPulsarClient = null;
try {
- sharedPulsarClient = new SharedPulsarClient(componentId, serviceUrl, clientConf);
+ sharedPulsarClient = new SharedPulsarClient(componentId, clientConf);
LOG.info("[{}] Created a new Pulsar Client.", componentId);
} catch (PulsarClientException e) {
exception.set(e);
@@ -81,33 +82,41 @@ public class SharedPulsarClient {
return instances.get(componentId);
}
- public PulsarClient getClient() {
+ public PulsarClientImpl getClient() {
counter.incrementAndGet();
return client;
}
- public Consumer getSharedConsumer(String topic, String subscription, ConsumerConfiguration consumerConf)
+ public Consumer<byte[]> getSharedConsumer(ConsumerConfigurationData<byte[]> consumerConf)
throws PulsarClientException {
counter.incrementAndGet();
synchronized (this) {
if (consumer == null) {
- consumer = client.subscribe(topic, subscription, consumerConf);
- LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, topic);
+ try {
+ consumer = client.subscribeAsync(consumerConf).join();
+ } catch (CompletionException e) {
+ throw (PulsarClientException) e.getCause();
+ }
+ LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, consumerConf.getSingleTopic());
} else {
- LOG.info("[{}] Using a shared consumer on {}", componentId, topic);
+ LOG.info("[{}] Using a shared consumer on {}", componentId, consumerConf.getSingleTopic());
}
}
return consumer;
}
- public Producer getSharedProducer(String topic, ProducerConfiguration producerConf) throws PulsarClientException {
+ public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException {
counter.incrementAndGet();
synchronized (this) {
if (producer == null) {
- producer = client.createProducer(topic, producerConf);
- LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, topic);
+ try {
+ producer = client.createProducerAsync(producerConf).join();
+ } catch (CompletionException e) {
+ throw (PulsarClientException) e.getCause();
+ }
+ LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, producerConf.getTopicName());
} else {
- LOG.info("[{}] Using a shared producer on {}", componentId, topic);
+ LOG.info("[{}] Using a shared producer on {}", componentId, producerConf.getTopicName());
}
}
return producer;
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index f07eeba..e508412 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.testclient;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -36,16 +34,14 @@ import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,33 +193,33 @@ public class PerformanceConsumer {
final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
- MessageListener listener = new MessageListener() {
- public void received(Consumer consumer, Message msg) {
- messagesReceived.increment();
- bytesReceived.add(msg.getData().length);
+ MessageListener<byte[]> listener = (consumer, msg) -> {
+ messagesReceived.increment();
+ bytesReceived.add(msg.getData().length);
- if (limiter != null) {
- limiter.acquire();
- }
+ if (limiter != null) {
+ limiter.acquire();
+ }
- long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
- recorder.recordValue(latencyMillis);
- cumulativeRecorder.recordValue(latencyMillis);
+ long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
+ recorder.recordValue(latencyMillis);
+ cumulativeRecorder.recordValue(latencyMillis);
- consumer.acknowledgeAsync(msg);
- }
+ consumer.acknowledgeAsync(msg);
};
- ClientConfiguration clientConf = new ClientConfiguration();
- clientConf.setConnectionsPerBroker(arguments.maxConnections);
- clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS);
- clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
+ ClientBuilder clientBuilder = PulsarClient.builder() //
+ .serviceUrl(arguments.serviceURL) //
+ .connectionsPerBroker(arguments.maxConnections) //
+ .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
+ .ioThreads(Runtime.getRuntime().availableProcessors()) //
+ .enableTls(arguments.useTls) //
+ .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
if (isNotBlank(arguments.authPluginClassName)) {
- clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
+ clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
}
- clientConf.setUseTls(arguments.useTls);
- clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
- PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf);
+
+ PulsarClient pulsarClient = clientBuilder.build();
class EncKeyReader implements CryptoKeyReader {
@@ -246,16 +242,17 @@ public class PerformanceConsumer {
return null;
}
}
+
List<Future<Consumer<byte[]>>> futures = Lists.newArrayList();
- ConsumerConfiguration consumerConfig = new ConsumerConfiguration();
- consumerConfig.setMessageListener(listener);
- consumerConfig.setReceiverQueueSize(arguments.receiverQueueSize);
- consumerConfig.setSubscriptionType(arguments.subscriptionType);
+ ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() //
+ .messageListener(listener) //
+ .receiverQueueSize(arguments.receiverQueueSize) //
+ .subscriptionType(arguments.subscriptionType);
if (arguments.encKeyName != null) {
byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));
EncKeyReader keyReader = new EncKeyReader(pKey);
- consumerConfig.setCryptoKeyReader(keyReader);
+ consumerBuilder.cryptoKeyReader(keyReader);
}
for (int i = 0; i < arguments.numTopics; i++) {
@@ -271,7 +268,8 @@ public class PerformanceConsumer {
subscriberName = arguments.subscriberName;
}
- futures.add(pulsarClient.subscribeAsync(topicName.toString(), subscriberName, consumerConfig));
+ futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
+ .subscribeAsync());
}
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 681eb3d..41dfa98 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -42,16 +42,14 @@ import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter;
import org.HdrHistogram.Recorder;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -233,15 +231,17 @@ public class PerformanceProducer {
String prefixTopicName = arguments.topics.get(0);
List<Future<Producer<byte[]>>> futures = Lists.newArrayList();
- ClientConfiguration clientConf = new ClientConfiguration();
- clientConf.setConnectionsPerBroker(arguments.maxConnections);
- clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
- clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS);
+ ClientBuilder clientBuilder = PulsarClient.builder() //
+ .serviceUrl(arguments.serviceURL) //
+ .connectionsPerBroker(arguments.maxConnections) //
+ .ioThreads(Runtime.getRuntime().availableProcessors()) //
+ .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
+ .enableTls(arguments.useTls) //
+ .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
if (isNotBlank(arguments.authPluginClassName)) {
- clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
+ clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
}
- clientConf.setUseTls(arguments.useTls);
- clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
class EncKeyReader implements CryptoKeyReader {
@@ -264,27 +264,26 @@ public class PerformanceProducer {
return null;
}
}
- PulsarClient client = new PulsarClientImpl(arguments.serviceURL, clientConf);
-
- ProducerConfiguration producerConf = new ProducerConfiguration();
- producerConf.setSendTimeout(0, TimeUnit.SECONDS);
- producerConf.setCompressionType(arguments.compression);
- producerConf.setMaxPendingMessages(arguments.maxOutstanding);
- // enable round robin message routing if it is a partitioned topic
- producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+ PulsarClient client = clientBuilder.build();
+ ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
+ .sendTimeout(0, TimeUnit.SECONDS) //
+ .compressionType(arguments.compression) //
+ .maxPendingMessages(arguments.maxOutstanding) //
+ // enable round robin message routing if it is a partitioned topic
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
if (arguments.batchTime > 0) {
- producerConf.setBatchingMaxPublishDelay(arguments.batchTime, TimeUnit.MILLISECONDS);
- producerConf.setBatchingEnabled(true);
+ producerBuilder.batchingMaxPublishDelay(arguments.batchTime, TimeUnit.MILLISECONDS).enableBatching(true);
}
// Block if queue is full else we will start seeing errors in sendAsync
- producerConf.setBlockIfQueueFull(true);
+ producerBuilder.blockIfQueueFull(true);
if (arguments.encKeyName != null) {
- producerConf.addEncryptionKey(arguments.encKeyName);
+ producerBuilder.addEncryptionKey(arguments.encKeyName);
byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));
EncKeyReader keyReader = new EncKeyReader(pKey);
- producerConf.setCryptoKeyReader(keyReader);
+ producerBuilder.cryptoKeyReader(keyReader);
}
for (int i = 0; i < arguments.numTopics; i++) {
@@ -292,7 +291,7 @@ public class PerformanceProducer {
log.info("Adding {} publishers on topic {}", arguments.numProducers, topic);
for (int j = 0; j < arguments.numProducers; j++) {
- futures.add(client.createProducerAsync(topic, producerConf));
+ futures.add(producerBuilder.clone().topic(topic).createAsync());
}
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index c5e66b2..7a4fdf9 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -29,14 +29,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
@@ -173,7 +172,7 @@ public class PerformanceReader {
final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
- ReaderListener listener = (reader, msg) -> {
+ ReaderListener<byte[]> listener = (reader, msg) -> {
messagesReceived.increment();
bytesReceived.add(msg.getData().length);
@@ -182,21 +181,21 @@ public class PerformanceReader {
}
};
- ClientConfiguration clientConf = new ClientConfiguration();
- clientConf.setConnectionsPerBroker(arguments.maxConnections);
- clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS);
- clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
+ ClientBuilder clientBuilder = PulsarClient.builder() //
+ .serviceUrl(arguments.serviceURL) //
+ .connectionsPerBroker(arguments.maxConnections) //
+ .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
+ .ioThreads(Runtime.getRuntime().availableProcessors()) //
+ .enableTls(arguments.useTls) //
+ .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
if (isNotBlank(arguments.authPluginClassName)) {
- clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
+ clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
}
- clientConf.setUseTls(arguments.useTls);
- clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
- PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf);
+
+ PulsarClient pulsarClient = clientBuilder.build();
List<CompletableFuture<Reader<byte[]>>> futures = Lists.newArrayList();
- ReaderConfiguration readerConfig = new ReaderConfiguration();
- readerConfig.setReaderListener(listener);
- readerConfig.setReceiverQueueSize(arguments.receiverQueueSize);
MessageId startMessageId;
if ("earliest".equals(arguments.startMessageId)) {
@@ -208,11 +207,16 @@ public class PerformanceReader {
startMessageId = new MessageIdImpl(Long.parseLong(parts[0]), Long.parseLong(parts[1]), -1);
}
+ ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader() //
+ .readerListener(listener) //
+ .receiverQueueSize(arguments.receiverQueueSize) //
+ .startMessageId(startMessageId);
+
for (int i = 0; i < arguments.numTopics; i++) {
final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName
: TopicName.get(String.format("%s-%d", prefixTopicName, i));
- futures.add(pulsarClient.createReaderAsync(topicName.toString(), startMessageId, readerConfig));
+ futures.add(readerBuilder.clone().topic(topicName.toString()).createAsync());
}
FutureUtil.waitForAll(futures).get();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 6573477..b917dc2 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -36,7 +36,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -176,30 +176,33 @@ public class WebSocketService implements Closeable {
}
private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
- ClientConfiguration clientConf = new ClientConfiguration();
- clientConf.setStatsInterval(0, TimeUnit.SECONDS);
- clientConf.setUseTls(config.isTlsEnabled());
- clientConf.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
- clientConf.setTlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath());
- clientConf.setIoThreads(config.getWebSocketNumIoThreads());
- clientConf.setConnectionsPerBroker(config.getWebSocketConnectionsPerBroker());
+ ClientBuilder clientBuilder = PulsarClient.builder() //
+ .statsInterval(0, TimeUnit.SECONDS) //
+ .enableTls(config.isTlsEnabled()) //
+ .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //
+ .tlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath()) //
+ .ioThreads(config.getWebSocketNumIoThreads()) //
+ .connectionsPerBroker(config.getWebSocketConnectionsPerBroker());
if (isNotBlank(config.getBrokerClientAuthenticationPlugin())
&& isNotBlank(config.getBrokerClientAuthenticationParameters())) {
- clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
+ clientBuilder.authentication(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
}
if (config.isTlsEnabled()) {
if (isNotBlank(clusterData.getBrokerServiceUrlTls())) {
- return PulsarClient.create(clusterData.getBrokerServiceUrlTls(), clientConf);
+ clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls());
} else if (isNotBlank(clusterData.getServiceUrlTls())) {
- return PulsarClient.create(clusterData.getServiceUrlTls(), clientConf);
+ clientBuilder.serviceUrl(clusterData.getServiceUrlTls());
}
} else if (isNotBlank(clusterData.getBrokerServiceUrl())) {
- return PulsarClient.create(clusterData.getBrokerServiceUrl(), clientConf);
+ clientBuilder.serviceUrl(clusterData.getBrokerServiceUrl());
+ } else {
+ clientBuilder.serviceUrl(clusterData.getServiceUrl());
}
- return PulsarClient.create(clusterData.getServiceUrl(), clientConf);
+
+ return clientBuilder.build();
}
private static ClusterData createClusterData(WebSocketProxyConfiguration config) {
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.