You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2021/04/20 23:27:03 UTC
[pulsar] branch master updated: PIP 83 : Pulsar client: Message
consumption with pooled buffer (#10184)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ef06691 PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184)
ef06691 is described below
commit ef06691531002c5d7cdbbdafc5494914ee8e0765
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Apr 20 16:26:28 2021 -0700
PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184)
fix api, buffer-access, duplicate code
---
.../client/api/SimpleProducerConsumerTest.java | 2 +-
.../client/impl/BrokerClientIntegrationTest.java | 107 ++++++++-
.../apache/pulsar/client/api/ConsumerBuilder.java | 10 +
.../java/org/apache/pulsar/client/api/Message.java | 15 ++
.../java/org/apache/pulsar/client/api/Schema.java | 17 ++
.../client/internal/DefaultImplementation.java | 20 ++
.../org/apache/pulsar/client/cli/CmdConsume.java | 34 ++-
.../apache/pulsar/client/impl/ConsumerBase.java | 13 +-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 5 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 25 +-
.../client/impl/ConsumerStatsRecorderImpl.java | 2 +-
.../org/apache/pulsar/client/impl/MessageImpl.java | 256 ++++++++++++++-------
.../apache/pulsar/client/impl/MessagesImpl.java | 4 +-
.../client/impl/MultiTopicsConsumerImpl.java | 7 +-
.../pulsar/client/impl/TopicMessageImpl.java | 10 +
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 1 +
.../impl/conf/ConsumerConfigurationData.java | 2 +
.../pulsar/client/impl/schema/AbstractSchema.java | 3 +-
.../client/impl/schema/ByteBufferSchema.java | 13 +-
.../pulsar/client/impl/schema/BytesSchema.java | 2 +-
.../client/impl/schema/LocalDateTimeSchema.java | 4 +-
.../pulsar/client/impl/schema/StringSchema.java | 2 +-
.../pulsar/testclient/PerformanceConsumer.java | 25 +-
23 files changed, 445 insertions(+), 134 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index abcb7ed..1123562 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4029,4 +4029,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
blockedMessageLatch.countDown();
log.info("-- Exiting {} test --", methodName);
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index c4d03ad..f7ce13b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.UUID.randomUUID;
import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import static org.mockito.Mockito.any;
@@ -30,12 +31,14 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
@@ -55,7 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Cleanup;
-
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
@@ -132,6 +134,11 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } };
}
+ @DataProvider(name = "booleanFlagProvider")
+ public Object[][] booleanFlagProvider() {
+ return new Object[][] { { true }, { false } };
+ }
+
/**
* Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle.
*
@@ -918,4 +925,98 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
private static final class TestMessageObject{
private String value;
}
-}
+
+ /**
+ * It validates pooled message consumption for batch and non-batch messages.
+ *
+ * @throws Exception
+ */
+ @Test(dataProvider = "booleanFlagProvider")
+ public void testConsumerWithPooledMessages(boolean isBatchingEnabled) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ @Cleanup
+ PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+
+ final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled;
+
+ @Cleanup
+ Consumer<ByteBuffer> consumer = newPulsarClient.newConsumer(Schema.BYTEBUFFER).topic(topic)
+ .subscriptionName("my-sub").poolMessages(true).subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();
+
+ final int numMessages = 100;
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().value(("value-" + i).getBytes(UTF_8))
+ .eventTime((i + 1) * 100L).sendAsync();
+ }
+ producer.flush();
+
+ // Reuse a single byte array (grown on demand) to process every pooled message
+ byte[] val = null;
+ int size = 0;
+ for (int i = 0; i < numMessages; i++) {
+ Message<ByteBuffer> msg = consumer.receive();
+ ByteBuffer value;
+ try {
+ value = msg.getValue();
+ int capacity = value.remaining();
+ // expand the size of buffer if needed
+ if (capacity > size) {
+ val = new byte[capacity];
+ size = capacity;
+ }
+ // copy the message payload out of the pooled buffer into the reusable byte array
+ value.get(val, 0, capacity);
+ // process the message
+ assertEquals(("value-" + i), new String(val, 0, capacity));
+ } finally {
+ msg.release();
+ }
+ }
+ consumer.close();
+ producer.close();
+ }
+
+ /**
+ * It verifies that expiry/redelivery of messages releases the messages without a leak.
+ *
+ * @param isBatchingEnabled
+ * @throws Exception
+ */
+ @Test(dataProvider = "booleanFlagProvider")
+ public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ @Cleanup
+ PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+
+ final String topic = "persistent://my-property/my-ns/testPooledMessageWithAckTimeout" + isBatchingEnabled;
+
+ @Cleanup
+ ConsumerImpl<ByteBuffer> consumer = (ConsumerImpl<ByteBuffer>) newPulsarClient.newConsumer(Schema.BYTEBUFFER)
+ .topic(topic).subscriptionName("my-sub").poolMessages(true).subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled)
+ .create();
+
+ final int numMessages = 100;
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync();
+ }
+ producer.flush();
+
+ retryStrategically((test) -> consumer.incomingMessages.peek() != null, 5, 500);
+ MessageImpl<ByteBuffer> msg = (MessageImpl) consumer.incomingMessages.peek();
+ assertNotNull(msg);
+ ByteBuf payload = ((MessageImpl) msg).getPayload();
+ assertNotEquals(payload.refCnt(), 0);
+ consumer.redeliverUnacknowledgedMessages();
+ assertEquals(payload.refCnt(), 0);
+ consumer.close();
+ producer.close();
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 0b589e1..1038ba9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -731,4 +731,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
* @return
*/
ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);
+
+ /**
+ * Enable pooling of messages and the underlying data buffers.
+ * <p/>
+ * When pooling is enabled, the application is responsible for calling Message.release() after the handling of every
+ * received message. If “release()” is not called on a received message, there will be a memory leak. If an
+ * application attempts to use an already “released” message, it might experience undefined behavior (e.g. memory
+ * corruption, deserialization error, etc.).
+ */
+ ConsumerBuilder<T> poolMessages(boolean poolMessages);
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
index 324c2ba..df3a4b9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -67,6 +67,13 @@ public interface Message<T> {
byte[] getData();
/**
+ * Get the uncompressed message payload size in bytes.
+ *
+ * @return size in bytes.
+ */
+ int size();
+
+ /**
* Get the de-serialized value of the message, according the configured {@link Schema}.
*
* @return the deserialized value of the message
@@ -217,4 +224,12 @@ public interface Message<T> {
* @return the name of cluster, from which the message is replicated.
*/
String getReplicatedFrom();
+
+ /**
+ * Release a message back to the pool. This is required only if the consumer was created with the option to pool
+ * messages, otherwise it will have no effect.
+ *
+ * @since 2.8.0
+ */
+ void release();
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index a9df5cc..69fcb00 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
@@ -121,6 +122,22 @@ public interface Schema<T> extends Cloneable{
}
/**
+ * Decode a ByteBuffer into an object using a given version. <br/>
+ *
+ * @param data
+ * the ByteBuffer to decode
+ * @param schemaVersion
+ * the schema version to decode the object. null indicates using latest version.
+ * @return the deserialized object
+ */
+ default T decode(ByteBuffer data, byte[] schemaVersion) {
+ if (data == null) {
+ return null;
+ }
+ return decode(getBytes(data), schemaVersion);
+ }
+
+ /**
* @return an object that represents the Schema associated metadata
*/
SchemaInfo getSchemaInfo();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index 47b8410..dcee6a3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -494,4 +494,24 @@ public class DefaultImplementation {
() -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.KeyBasedBatcherBuilder")
.newInstance());
}
+
+ /**
+ * Retrieves ByteBuffer data into byte[].
+ *
+ * @param byteBuffer
+ * @return
+ */
+ public static byte[] getBytes(ByteBuffer byteBuffer) {
+ if (byteBuffer == null) {
+ return null;
+ }
+ if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0
+ && byteBuffer.array().length == byteBuffer.remaining()) {
+ return byteBuffer.array();
+ }
+ // Direct buffer is not backed by array and it needs to be read from direct memory
+ byte[] array = new byte[byteBuffer.remaining()];
+ byteBuffer.get(array);
+ return array;
+ }
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index f5c9d03..0e4f418 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.client.cli;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
+import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
@@ -31,6 +31,7 @@ import com.google.gson.JsonPrimitive;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
@@ -122,7 +123,9 @@ public class CmdConsume {
@Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
private String schematype = "bytes";
-
+ @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
+ private boolean poolMessages = true;
+
private ClientBuilder clientBuilder;
private Authentication authentication;
private String serviceURL;
@@ -171,6 +174,8 @@ public class CmdConsume {
} else if (value instanceof GenericRecord) {
Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
data = asMap.toString();
+ } else if (value instanceof ByteBuffer) {
+ data = new String(getBytes((ByteBuffer) value));
} else {
data = value.toString();
}
@@ -233,7 +238,7 @@ public class CmdConsume {
try {
ConsumerBuilder<?> builder;
PulsarClient client = clientBuilder.build();
- Schema<?> schema = Schema.BYTES;
+ Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
if ("auto_consume".equals(schematype)) {
schema = Schema.AUTO_CONSUME();
} else if (!"bytes".equals(schematype)) {
@@ -243,7 +248,8 @@ public class CmdConsume {
.subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType)
.subscriptionMode(subscriptionMode)
- .subscriptionInitialPosition(subscriptionInitialPosition);
+ .subscriptionInitialPosition(subscriptionInitialPosition)
+ .poolMessages(poolMessages);
if (isRegex) {
builder.topicsPattern(Pattern.compile(topic));
@@ -275,15 +281,19 @@ public class CmdConsume {
if (msg == null) {
LOG.debug("No message to consume after waiting for 5 seconds.");
} else {
- numMessagesConsumed += 1;
- if (!hideContent) {
- System.out.println(MESSAGE_BOUNDARY);
- String output = this.interpretMessage(msg, displayHex);
- System.out.println(output);
- } else if (numMessagesConsumed % 1000 == 0) {
- System.out.println("Received " + numMessagesConsumed + " messages");
+ try {
+ numMessagesConsumed += 1;
+ if (!hideContent) {
+ System.out.println(MESSAGE_BOUNDARY);
+ String output = this.interpretMessage(msg, displayHex);
+ System.out.println(output);
+ } else if (numMessagesConsumed % 1000 == 0) {
+ System.out.println("Received " + numMessagesConsumed + " messages");
+ }
+ consumer.acknowledge(msg);
+ } finally {
+ msg.release();
}
- consumer.acknowledge(msg);
}
}
client.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 28c248f..dc1c858 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -908,8 +908,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
protected void increaseIncomingMessageSize(final Message<?> message) {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
- this, message.getData() == null ? 0 : message.getData().length);
+ INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
}
protected void resetIncomingMessageSize() {
@@ -917,14 +916,20 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
protected void decreaseIncomingMessageSize(final Message<?> message) {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
- (message.getData() != null) ? -message.getData().length : 0);
+ INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
}
public long getIncomingMessageSize() {
return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
}
+ protected void clearIncomingMessages() {
+ // release messages if they are pooled messages
+ incomingMessages.forEach(Message::release);
+ incomingMessages.clear();
+ resetIncomingMessageSize();
+ }
+
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
private ExecutorService getExecutor(Message<T> msg) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 338fe47..76b53ec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -458,4 +458,9 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return this;
}
+ @Override
+ public ConsumerBuilder<T> poolMessages(boolean poolMessages) {
+ conf.setPoolMessages(poolMessages);
+ return this;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ab1d44e..b5e587f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -184,6 +184,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final BlockingQueue<String> pendingChunkedMessageUuidQueue;
private final boolean createTopicIfDoesNotExist;
+ private final boolean poolMessages;
private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
@@ -252,6 +253,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
+ this.poolMessages = conf.isPoolMessages();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -835,7 +837,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
previousMessage = new BatchMessageIdImpl(nextMessageInQueue.getLedgerId(),
nextMessageInQueue.getEntryId() - 1, nextMessageInQueue.getPartitionIndex(), -1);
}
-
+ // release messages if they are pooled messages
+ currentMessageQueue.forEach(Message::release);
return previousMessage;
} else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
@@ -1044,8 +1047,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return;
}
- final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), msgId, msgMetadata,
- uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
+ final MessageImpl<T> message = MessageImpl.create(topicName.toString(), msgId, msgMetadata,
+ uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount,
+ poolMessages);
uncompressedPayload.release();
// Enqueue the message so that it can be retrieved when application calls receive()
@@ -1264,9 +1268,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), i, batchSize, acker);
- final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), batchMessageIdImpl,
+ final MessageImpl<T> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
msgMetadata, singleMessageMetadata, singleMessagePayload,
- createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
+ createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, poolMessages);
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
@@ -1542,8 +1546,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
int currentSize = 0;
synchronized (this) {
currentSize = incomingMessages.size();
- incomingMessages.clear();
- resetIncomingMessageSize();
+ clearIncomingMessages();
unAckedMessageTracker.clear();
}
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
@@ -1566,8 +1569,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
public int clearIncomingMessagesAndGetMessageNumber() {
int messagesNumber = incomingMessages.size();
- incomingMessages.clear();
- resetIncomingMessageSize();
+ incomingMessages.forEach(Message::release);
+ clearIncomingMessages();
unAckedMessageTracker.clear();
return messagesNumber;
}
@@ -1789,8 +1792,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
duringSeek.set(true);
lastDequeuedMessageId = MessageId.earliest;
- incomingMessages.clear();
- resetIncomingMessageSize();
+ clearIncomingMessages();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
@@ -2113,6 +2115,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
messageIds.add(id);
break;
}
+ message.release();
message = incomingMessages.poll();
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index f87c463..aee60b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -175,7 +175,7 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
public void updateNumMsgsReceived(Message<?> message) {
if (message != null) {
numMsgsReceived.increment();
- numBytesReceived.add(message.getData() == null ? 0 : message.getData().length);
+ numBytesReceived.add(message.size());
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 350210c..8ed18fe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -21,11 +21,13 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
import io.netty.util.Recycler.Handle;
import java.io.IOException;
@@ -59,17 +61,20 @@ public class MessageImpl<T> implements Message<T> {
private final MessageMetadata msgMetadata;
private ClientCnx cnx;
private ByteBuf payload;
+
private Schema<T> schema;
private SchemaState schemaState = SchemaState.None;
private Optional<EncryptionContext> encryptionCtx = Optional.empty();
private String topic; // only set for incoming messages
transient private Map<String, String> properties;
- private final int redeliveryCount;
+ private int redeliveryCount;
private int uncompressedSize;
private BrokerEntryMetadata brokerEntryMetadata;
+ private boolean poolMessage;
+
// Constructor for out-going message
public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, ByteBuffer payload, Schema<T> schema) {
@SuppressWarnings("unchecked")
@@ -94,93 +99,138 @@ public class MessageImpl<T> implements Message<T> {
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
- this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0);
+ this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0, false);
}
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
- Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount) {
- this.msgMetadata = new MessageMetadata().copyFrom(msgMetadata);
- this.messageId = messageId;
- this.topic = topic;
- this.cnx = cnx;
- this.redeliveryCount = redeliveryCount;
-
- // Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could
- // release, since the Message is passed to the user. Also, the passed ByteBuf is coming from network and is
- // backed by a direct buffer which we could not expose as a byte[]
- this.payload = Unpooled.copiedBuffer(payload);
- this.encryptionCtx = encryptionCtx;
-
- if (msgMetadata.getPropertiesCount() > 0) {
- this.properties = Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream()
- .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
- (oldValue,newValue) -> newValue)));
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+ boolean pooledMessage) {
+ this.msgMetadata = new MessageMetadata();
+ init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount, pooledMessage);
+ }
+
+ public static <T> MessageImpl<T> create(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
+ ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
+ int redeliveryCount, boolean pooledMessage) {
+ if (pooledMessage) {
+ @SuppressWarnings("unchecked")
+ MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+ init(msg, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount,
+ pooledMessage);
+ return msg;
} else {
- properties = Collections.emptyMap();
+ return new MessageImpl<>(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema,
+ redeliveryCount, pooledMessage);
}
- this.schema = schema;
}
MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata msgMetadata,
- SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
- Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
- this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, 0);
+ SingleMessageMetadata singleMessageMetadata, ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
+ ClientCnx cnx, Schema<T> schema) {
+ this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, 0,
+ false);
}
MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata batchMetadata,
- SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
- Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount) {
- this.msgMetadata = new MessageMetadata().copyFrom(batchMetadata);
- this.messageId = batchMessageIdImpl;
- this.topic = topic;
- this.cnx = cnx;
- this.redeliveryCount = redeliveryCount;
+ SingleMessageMetadata singleMessageMetadata, ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
+ ClientCnx cnx, Schema<T> schema, int redeliveryCount, boolean keepMessageInDirectMemory) {
+ this.msgMetadata = new MessageMetadata();
+ init(this, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema,
+ redeliveryCount, keepMessageInDirectMemory);
- this.payload = Unpooled.copiedBuffer(payload);
- this.encryptionCtx = encryptionCtx;
+ }
- if (singleMessageMetadata.getPropertiesCount() > 0) {
- Map<String, String> properties = Maps.newTreeMap();
- for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
- properties.put(entry.getKey(), entry.getValue());
- }
- this.properties = Collections.unmodifiableMap(properties);
+ public static <T> MessageImpl<T> create(String topic, BatchMessageIdImpl batchMessageIdImpl,
+ MessageMetadata batchMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+ boolean pooledMessage) {
+ if (pooledMessage) {
+ @SuppressWarnings("unchecked")
+ MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+ init(msg, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx, cnx,
+ schema, redeliveryCount, pooledMessage);
+ return msg;
} else {
- properties = Collections.emptyMap();
- }
- if (singleMessageMetadata.hasPartitionKey()) {
- msgMetadata.setPartitionKeyB64Encoded(singleMessageMetadata.isPartitionKeyB64Encoded())
- .setPartitionKey(singleMessageMetadata.getPartitionKey());
- } else if (msgMetadata.hasPartitionKey()) {
- msgMetadata.clearPartitionKey();
- msgMetadata.clearPartitionKeyB64Encoded();
+ return new MessageImpl<>(topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload,
+ encryptionCtx, cnx, schema, redeliveryCount, pooledMessage);
}
+ }
- if (singleMessageMetadata.hasOrderingKey()) {
- msgMetadata.setOrderingKey(singleMessageMetadata.getOrderingKey());
- } else if (msgMetadata.hasOrderingKey()) {
- msgMetadata.clearOrderingKey();
- }
+ static <T> void init(MessageImpl<T> msg, String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
+ ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
+ int redeliveryCount, boolean poolMessage) {
+ init(msg, topic, null /* batchMessageIdImpl */, msgMetadata, null /* singleMessageMetadata */, payload,
+ encryptionCtx, cnx, schema, redeliveryCount, poolMessage);
+ msg.messageId = messageId;
+ }
+
+ private static <T> void init(MessageImpl<T> msg, String topic, BatchMessageIdImpl batchMessageIdImpl,
+ MessageMetadata msgMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+ boolean poolMessage) {
+ msg.msgMetadata.clear();
+ msg.msgMetadata.copyFrom(msgMetadata);
+ msg.messageId = batchMessageIdImpl;
+ msg.topic = topic;
+ msg.cnx = cnx;
+ msg.redeliveryCount = redeliveryCount;
+ msg.encryptionCtx = encryptionCtx;
+ msg.schema = schema;
- if (singleMessageMetadata.hasEventTime()) {
- msgMetadata.setEventTime(singleMessageMetadata.getEventTime());
- }
+ msg.poolMessage = poolMessage;
+ // If it's not a pooled message, we need to make a copy, since the passed payload
+ // uses a ref-counted buffer and we don't know when it could be released once the
+ // Message is passed to the user. Also, the passed ByteBuf is coming from the network
+ // and is backed by a direct buffer, which we could not expose as a byte[]
+ msg.payload = poolMessage ? payload.retain() : Unpooled.copiedBuffer(payload);
+
+ if (singleMessageMetadata != null) {
+ if (singleMessageMetadata.getPropertiesCount() > 0) {
+ Map<String, String> properties = Maps.newTreeMap();
+ for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ msg.properties = Collections.unmodifiableMap(properties);
+ } else {
+ msg.properties = Collections.emptyMap();
+ }
+ if (singleMessageMetadata.hasPartitionKey()) {
+ msg.msgMetadata.setPartitionKeyB64Encoded(singleMessageMetadata.isPartitionKeyB64Encoded())
+ .setPartitionKey(singleMessageMetadata.getPartitionKey());
+ } else if (msg.msgMetadata.hasPartitionKey()) {
+ msg.msgMetadata.clearPartitionKey();
+ msg.msgMetadata.clearPartitionKeyB64Encoded();
+ }
- if (singleMessageMetadata.hasSequenceId()) {
- msgMetadata.setSequenceId(singleMessageMetadata.getSequenceId());
- }
+ if (singleMessageMetadata.hasOrderingKey()) {
+ msg.msgMetadata.setOrderingKey(singleMessageMetadata.getOrderingKey());
+ } else if (msg.msgMetadata.hasOrderingKey()) {
+ msg.msgMetadata.clearOrderingKey();
+ }
- if (singleMessageMetadata.hasNullValue()) {
- msgMetadata.setNullValue(singleMessageMetadata.isNullValue());
- }
+ if (singleMessageMetadata.hasEventTime()) {
+ msg.msgMetadata.setEventTime(singleMessageMetadata.getEventTime());
+ }
- if (singleMessageMetadata.hasNullPartitionKey()) {
- msgMetadata.setNullPartitionKey(singleMessageMetadata.isNullPartitionKey());
- }
+ if (singleMessageMetadata.hasSequenceId()) {
+ msg.msgMetadata.setSequenceId(singleMessageMetadata.getSequenceId());
+ }
- this.schema = schema;
- }
+ if (singleMessageMetadata.hasNullValue()) {
+ msg.msgMetadata.setNullValue(singleMessageMetadata.isNullValue());
+ }
+ if (singleMessageMetadata.hasNullPartitionKey()) {
+ msg.msgMetadata.setNullPartitionKey(singleMessageMetadata.isNullPartitionKey());
+ }
+ } else if (msgMetadata.getPropertiesCount() > 0) {
+ msg.properties = Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream().collect(
+ Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (oldValue, newValue) -> newValue)));
+ } else {
+ msg.properties = Collections.emptyMap();
+ }
+ }
+
public MessageImpl(String topic, String msgId, Map<String, String> properties,
byte[] payload, Schema<T> schema, MessageMetadata msgMetadata) {
this(topic, msgId, properties, Unpooled.wrappedBuffer(payload), schema, msgMetadata);
@@ -311,6 +361,11 @@ public class MessageImpl<T> implements Message<T> {
if (msgMetadata.isNullValue()) {
return null;
}
+ if (payload.isDirect()) {
+ byte[] data = new byte[payload.readableBytes()];
+ payload.getBytes(payload.readerIndex(), data);
+ return data;
+ }
if (payload.arrayOffset() == 0 && payload.capacity() == payload.array().length) {
return payload.array();
} else {
@@ -321,6 +376,14 @@ public class MessageImpl<T> implements Message<T> {
}
}
+ @Override
+ public int size() { // number of readable bytes in the payload buffer
+ if (msgMetadata.isNullValue()) {
+ return 0; // a message flagged as null-value reports a size of zero
+ }
+ return payload.readableBytes();
+ }
+
public Schema<T> getSchema() {
return this.schema;
}
@@ -356,19 +419,11 @@ public class MessageImpl<T> implements Message<T> {
}
// check if the schema passed in from client supports schema versioning or not
// this is an optimization to only get schema version when necessary
- if (schema.supportSchemaVersioning()) {
- byte[] schemaVersion = getSchemaVersion();
- if (null == schemaVersion) {
- return schema.decode(getData());
- } else {
- return schema.decode(getData(), schemaVersion);
- }
- } else {
- return schema.decode(getData());
- }
+ return decode(schema.supportSchemaVersioning() ? getSchemaVersion() : null);
}
}
+
private KeyValueSchema getKeyValueSchema() {
if (schema instanceof AutoConsumeSchema) {
return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema();
@@ -377,6 +432,19 @@ public class MessageImpl<T> implements Message<T> {
}
}
+
+ private T decode(byte[] schemaVersion) { // schemaVersion may be null; pooled messages try the ByteBuffer decode first
+ T value = poolMessage ? schema.decode(payload.nioBuffer(), schemaVersion) : null;
+ if (value != null) { // the ByteBuffer-based decode produced a value
+ return value;
+ }
+ if (null == schemaVersion) { // not pooled, or the ByteBuffer decode returned null: decode from getData()
+ return schema.decode(getData());
+ } else {
+ return schema.decode(getData(), schemaVersion);
+ }
+ }
+
private T getKeyValueBySchemaVersion() {
KeyValueSchema kvSchema = getKeyValueSchema();
byte[] schemaVersion = getSchemaVersion();
@@ -390,7 +458,7 @@ public class MessageImpl<T> implements Message<T> {
return (T) keyValue;
}
} else {
- return schema.decode(getData(), schemaVersion);
+ return decode(schemaVersion);
}
}
@@ -406,7 +474,7 @@ public class MessageImpl<T> implements Message<T> {
return (T) keyValue;
}
} else {
- return schema.decode(getData());
+ return decode(null);
}
}
@@ -527,24 +595,42 @@ public class MessageImpl<T> implements Message<T> {
}
public void recycle() {
- msgMetadata.clear();
+ if (msgMetadata != null) {
+ msgMetadata.clear();
+ }
+ if (brokerEntryMetadata != null) { // cleared in place instead of being set to null
+ brokerEntryMetadata.clear();
+ }
+ cnx = null;
messageId = null;
topic = null;
payload = null;
+ encryptionCtx = null;
+ redeliveryCount = 0;
+ uncompressedSize = 0;
properties = null;
schema = null;
schemaState = SchemaState.None;
- brokerEntryMetadata = null;
+ poolMessage = false; // release() is a no-op while this flag is false
if (recyclerHandle != null) {
recyclerHandle.recycle(this);
}
}
+ @Override
+ public void release() {
+ if (poolMessage) { // only pooled messages are released; otherwise this call does nothing
+ ReferenceCountUtil.safeRelease(payload);
+ recycle(); // resets the fields (including poolMessage) and hands the instance to its recycler, if any
+ }
+ }
+
private MessageImpl(Handle<MessageImpl<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
this.redeliveryCount = 0;
this.msgMetadata = new MessageMetadata();
+ this.brokerEntryMetadata = new BrokerEntryMetadata(); // allocated here; recycle() clears it rather than dropping it
}
private Handle<MessageImpl<?>> recyclerHandle;
@@ -590,7 +676,15 @@ public class MessageImpl<T> implements Message<T> {
this.schemaState = schemaState;
}
-
+ /**
+ * Returns the payload buffer; used only by unit tests to validate the payload's state and ref-count.
+ *
+ * @return the payload buffer held by this message ({@code null} once {@link #recycle()} has run)
+ */
+ @VisibleForTesting
+ ByteBuf getPayload() {
+ return payload;
+ }
enum SchemaState {
None, Ready, Broken
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index c56694e..4ff23eb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -49,7 +49,7 @@ public class MessagesImpl<T> implements Messages<T> {
return false;
}
- if (maxSizeOfMessages > 0 && currentSizeOfMessages + message.getData().length > maxSizeOfMessages) {
+ if (maxSizeOfMessages > 0 && currentSizeOfMessages + message.size() > maxSizeOfMessages) {
return false;
}
@@ -62,7 +62,7 @@ public class MessagesImpl<T> implements Messages<T> {
}
Preconditions.checkArgument(canAdd(message), "No more space to add messages.");
currentNumberOfMessages ++;
- currentSizeOfMessages += message.getData().length;
+ currentSizeOfMessages += message.size();
messageList.add(message);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8033c18..94cb7d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -602,8 +602,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
- incomingMessages.clear();
- resetIncomingMessageSize();
+ clearIncomingMessages();
unAckedMessageTracker.clear();
resumeReceivingFromPausedConsumersIfNeeded();
@@ -687,8 +686,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId)));
unAckedMessageTracker.clear();
- incomingMessages.clear();
- resetIncomingMessageSize();
+ clearIncomingMessages();
return FutureUtil.waitForAll(futures);
}
@@ -777,6 +775,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
messageIds.add(messageId);
break;
}
+ message.release();
message = incomingMessages.poll();
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index e0c0ef1..8c94eea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -94,6 +94,11 @@ public class TopicMessageImpl<T> implements Message<T> {
}
@Override
+ public int size() { // delegates to the wrapped message
+ return msg.size();
+ }
+
+ @Override
public long getPublishTime() {
return msg.getPublishTime();
}
@@ -184,4 +189,9 @@ public class TopicMessageImpl<T> implements Message<T> {
}
return null;
}
+
+ @Override
+ public void release() { // delegates to the wrapped message
+ msg.release();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index d353d38..98d5b48 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -85,6 +85,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
// Just being cautious
if (incomingMessages.size() > 0) {
log.error("The incoming message queue should never be greater than 0 when Queue size is 0");
+ incomingMessages.forEach(Message::release);
incomingMessages.clear();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 32fa5e8..a39ac21 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -146,6 +146,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean batchIndexAckEnabled = false;
private boolean ackReceiptEnabled = false;
+
+ private boolean poolMessages = false;
public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
checkArgument(interval > 0, "interval needs to be > 0");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index 255a491..3205a64 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -42,6 +42,7 @@ public abstract class AbstractSchema<T> implements Schema<T> {
/**
* Decode a byteBuf into an object using the schema definition and deserializer implementation
+ * <p>Do not modify reader/writer index of ByteBuf so, it can be reused to access correct data.
*
* @param byteBuf
* the byte buffer to decode
@@ -61,7 +62,7 @@ public abstract class AbstractSchema<T> implements Schema<T> {
// ignore version by default (most of the primitive schema implementations ignore schema version)
return decode(byteBuf);
}
-
+
@Override
public Schema<T> clone() {
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index 4a79931..c560f0e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -76,18 +76,23 @@ public class ByteBufferSchema extends AbstractSchema<ByteBuffer> {
}
@Override
- public ByteBuffer decode(ByteBuf byteBuf) {
- if (null == byteBuf) {
+ public ByteBuffer decode(ByteBuf byteBuffer) { // copies the readable bytes into a new heap ByteBuffer; null in, null out
+ if (null == byteBuffer) {
return null;
} else {
- int size = byteBuf.readableBytes();
+ int size = byteBuffer.readableBytes();
byte[] bytes = new byte[size];
- byteBuf.readBytes(bytes);
+ byteBuffer.getBytes(byteBuffer.readerIndex(), bytes); // absolute read keeps readerIndex unchanged
return ByteBuffer.wrap(bytes);
}
}
@Override
+ public ByteBuffer decode(ByteBuffer byteBuffer, byte[] schemaVersion) { // schemaVersion is not used
+ return byteBuffer; // hands back the caller's buffer itself, without copying
+ }
+
+ @Override
public SchemaInfo getSchemaInfo() {
return SCHEMA_INFO;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 9447b00..9c7ec37 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -60,7 +60,7 @@ public class BytesSchema extends AbstractSchema<byte[]> {
int size = byteBuf.readableBytes();
byte[] bytes = new byte[size];
- byteBuf.readBytes(bytes, 0, size);
+ byteBuf.getBytes(byteBuf.readerIndex(), bytes);
return bytes;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
index 8a6c4fb..aa86a19 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
@@ -76,8 +76,8 @@ public class LocalDateTimeSchema extends AbstractSchema<LocalDateTime> {
if (null == byteBuf) {
return null;
}
- long epochDay = byteBuf.readLong();
- long nanoOfDay = byteBuf.readLong();
+ long epochDay = byteBuf.getLong(0);
+ long nanoOfDay = byteBuf.getLong(8);
return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay));
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 7e813ac..7e57f6c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -120,7 +120,7 @@ public class StringSchema extends AbstractSchema<String> {
bytes = new byte[size * 2];
tmpBuffer.set(bytes);
}
- byteBuf.readBytes(bytes, 0, size);
+ byteBuf.getBytes(byteBuf.readerIndex(), bytes, 0, size);
return new String(bytes, 0, size, charset);
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index ca6b07e..034b613 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -26,6 +26,9 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import java.io.FileInputStream;
+import java.lang.management.BufferPoolMXBean;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.List;
@@ -41,9 +44,11 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.JvmMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,6 +173,9 @@ public class PerformanceConsumer {
@Parameter(names = {"--batch-index-ack" }, description = "Enable or disable the batch index acknowledgment")
public boolean batchIndexAck = false;
+
+ @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
+ private boolean poolMessages = true;
}
public static void main(String[] args) throws Exception {
@@ -271,7 +279,7 @@ public class PerformanceConsumer {
final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (arguments.testTime * 1e9);
- MessageListener<byte[]> listener = (consumer, msg) -> {
+ MessageListener<ByteBuffer> listener = (consumer, msg) -> {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
log.info("------------------- DONE -----------------------");
@@ -280,10 +288,10 @@ public class PerformanceConsumer {
}
}
messagesReceived.increment();
- bytesReceived.add(msg.getData().length);
+ bytesReceived.add(msg.size());
totalMessagesReceived.increment();
- totalBytesReceived.add(msg.getData().length);
+ totalBytesReceived.add(msg.size());
if (limiter != null) {
limiter.acquire();
@@ -296,6 +304,10 @@ public class PerformanceConsumer {
}
consumer.acknowledgeAsync(msg);
+
+ if(arguments.poolMessages) {
+ msg.release();
+ }
};
ClientBuilder clientBuilder = PulsarClient.builder() //
@@ -318,8 +330,8 @@ public class PerformanceConsumer {
PulsarClient pulsarClient = clientBuilder.build();
- List<Future<Consumer<byte[]>>> futures = Lists.newArrayList();
- ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() //
+ List<Future<Consumer<ByteBuffer>>> futures = Lists.newArrayList();
+ ConsumerBuilder<ByteBuffer> consumerBuilder = pulsarClient.newConsumer(Schema.BYTEBUFFER) //
.messageListener(listener) //
.receiverQueueSize(arguments.receiverQueueSize) //
.maxTotalReceiverQueueSizeAcrossPartitions(arguments.maxTotalReceiverQueueSizeAcrossPartitions)
@@ -328,6 +340,7 @@ public class PerformanceConsumer {
.subscriptionInitialPosition(arguments.subscriptionInitialPosition)
.autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull)
.enableBatchIndexAcknowledgment(arguments.batchIndexAck)
+ .poolMessages(arguments.poolMessages)
.replicateSubscriptionState(arguments.replicatedSubscription);
if (arguments.maxPendingChunkedMessage > 0) {
consumerBuilder.maxPendingChunkedMessage(arguments.maxPendingChunkedMessage);
@@ -355,7 +368,7 @@ public class PerformanceConsumer {
}
}
- for (Future<Consumer<byte[]>> future : futures) {
+ for (Future<Consumer<ByteBuffer>> future : futures) {
future.get();
}