You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2021/08/21 07:50:55 UTC
[pulsar] branch master updated: PIP 83 : Pulsar Reader: Message
consumption with pooled buffer (#11725)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d8e6b7 PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725)
8d8e6b7 is described below
commit 8d8e6b751ee0dc99306a1c61c22a8d75b5927811
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat Aug 21 00:50:17 2021 -0700
PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725)
* PIP 83 : Pulsar Reader: Message consumption with pooled buffer
---
.../client/impl/BrokerClientIntegrationTest.java | 56 ++++++++++++++++++++++
.../apache/pulsar/client/api/ReaderBuilder.java | 10 ++++
.../pulsar/client/impl/MultiTopicsReaderImpl.java | 1 +
.../pulsar/client/impl/ReaderBuilderImpl.java | 7 ++-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 1 +
.../client/impl/conf/ReaderConfigurationData.java | 2 +
6 files changed, 76 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index fb3c30b..a111dd8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -953,4 +954,59 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
consumer.close();
producer.close();
}
+
+ /**
+ * It validates pooled message consumption for batch and non-batch messages.
+ *
+ * @throws Exception
+ */
+ @Test(dataProvider = "booleanFlagProvider")
+ public void testConsumerWithPooledMessagesWithReader(boolean isBatchingEnabled) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ @Cleanup
+ PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+
+ final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled;
+
+ @Cleanup
+ Reader<ByteBuffer> reader = newPulsarClient.newReader(Schema.BYTEBUFFER).topic(topic).poolMessages(true)
+ .startMessageId(MessageId.latest).create();
+
+ @Cleanup
+ Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();
+
+ final int numMessages = 100;
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().value(("value-" + i).getBytes(UTF_8))
+ .eventTime((i + 1) * 100L).sendAsync();
+ }
+ producer.flush();
+
+ // Reuse pre-allocated pooled buffer to process every message
+ byte[] val = null;
+ int size = 0;
+ for (int i = 0; i < numMessages; i++) {
+ Message<ByteBuffer> msg = reader.readNext();
+ ByteBuffer value;
+ try {
+ value = msg.getValue();
+ int capacity = value.remaining();
+ // expand the size of buffer if needed
+ if (capacity > size) {
+ val = new byte[capacity];
+ size = capacity;
+ }
+ // read message into pooled buffer
+ value.get(val, 0, capacity);
+ // process the message
+ assertEquals(("value-" + i), new String(val, 0, capacity));
+ assertTrue(value.isDirect());
+ } finally {
+ msg.release();
+ }
+ }
+ reader.close();
+ producer.close();
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index a84208b..4186df7 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -280,4 +280,14 @@ public interface ReaderBuilder<T> extends Cloneable {
* @return the reader builder instance
*/
ReaderBuilder<T> keyHashRange(Range... ranges);
+
+ /**
+ * Enable pooling of messages and the underlying data buffers.
+ * <p/>
+ * When pooling is enabled, the application is responsible for calling Message.release() after the handling of every
+ * received message. If “release()” is not called on a received message, there will be a memory leak. If an
+ * application attempts to use and already “released” message, it might experience undefined behavior (for example, memory
+ * corruption, deserialization error, etc.).
+ */
+ ReaderBuilder<T> poolMessages(boolean poolMessages);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 32c9869..fab61b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -64,6 +64,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
+ consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
if (readerConfiguration.getReaderListener() != null) {
ReaderListener<T> readerListener = readerConfiguration.getReaderListener();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index b8f017f..4305bb6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -40,7 +40,6 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
@@ -223,4 +222,10 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
conf.setKeyHashRanges(Arrays.asList(ranges));
return this;
}
+
+ @Override
+ public ReaderBuilder<T> poolMessages(boolean poolMessages) {
+ conf.setPoolMessages(poolMessages);
+ return this;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 5b86864..37d346e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -69,6 +69,7 @@ public class ReaderImpl<T> implements Reader<T> {
consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
+ consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
// Reader doesn't need any batch receiving behaviours
// disable the batch receive timer for the ConsumerImpl instance wrapped by the ReaderImpl
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 14770ad..47efa86 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -61,6 +61,8 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
private transient List<Range> keyHashRanges;
+ private boolean poolMessages = false;
+
@JsonIgnore
public String getTopicName() {
if (topicNames.size() > 1) {