You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2021/04/20 23:27:03 UTC

[pulsar] branch master updated: PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ef06691  PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184)
ef06691 is described below

commit ef06691531002c5d7cdbbdafc5494914ee8e0765
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Apr 20 16:26:28 2021 -0700

    PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184)
    
    fix api, buffer-access, duplicate code
---
 .../client/api/SimpleProducerConsumerTest.java     |   2 +-
 .../client/impl/BrokerClientIntegrationTest.java   | 107 ++++++++-
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  10 +
 .../java/org/apache/pulsar/client/api/Message.java |  15 ++
 .../java/org/apache/pulsar/client/api/Schema.java  |  17 ++
 .../client/internal/DefaultImplementation.java     |  20 ++
 .../org/apache/pulsar/client/cli/CmdConsume.java   |  34 ++-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  13 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   5 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  25 +-
 .../client/impl/ConsumerStatsRecorderImpl.java     |   2 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java | 256 ++++++++++++++-------
 .../apache/pulsar/client/impl/MessagesImpl.java    |   4 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   7 +-
 .../pulsar/client/impl/TopicMessageImpl.java       |  10 +
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   1 +
 .../impl/conf/ConsumerConfigurationData.java       |   2 +
 .../pulsar/client/impl/schema/AbstractSchema.java  |   3 +-
 .../client/impl/schema/ByteBufferSchema.java       |  13 +-
 .../pulsar/client/impl/schema/BytesSchema.java     |   2 +-
 .../client/impl/schema/LocalDateTimeSchema.java    |   4 +-
 .../pulsar/client/impl/schema/StringSchema.java    |   2 +-
 .../pulsar/testclient/PerformanceConsumer.java     |  25 +-
 23 files changed, 445 insertions(+), 134 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index abcb7ed..1123562 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4029,4 +4029,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         blockedMessageLatch.countDown();
         log.info("-- Exiting {} test --", methodName);
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index c4d03ad..f7ce13b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.UUID.randomUUID;
 import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
 import static org.mockito.Mockito.any;
@@ -30,12 +31,14 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import io.netty.buffer.ByteBuf;
 import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
@@ -55,7 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.Cleanup;
-
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
@@ -132,6 +134,11 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } };
     }
 
+    @DataProvider(name = "booleanFlagProvider")
+    public Object[][] booleanFlagProvider() {
+        return new Object[][] { { true }, { false } };
+    }
+
     /**
      * Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle.
      *
@@ -918,4 +925,98 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
     private static final class TestMessageObject{
         private String value;
     }
-}
+    
+    /**
+     * It validates pooled message consumption for batch and non-batch messages.
+     * 
+     * @throws Exception
+     */
+    @Test(dataProvider = "booleanFlagProvider")
+    public void testConsumerWithPooledMessages(boolean isBatchingEnabled) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        @Cleanup
+        PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+
+        final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled;
+
+        @Cleanup
+        Consumer<ByteBuffer> consumer = newPulsarClient.newConsumer(Schema.BYTEBUFFER).topic(topic)
+                .subscriptionName("my-sub").poolMessages(true).subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();
+
+        final int numMessages = 100;
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value(("value-" + i).getBytes(UTF_8))
+            .eventTime((i + 1) * 100L).sendAsync();
+        }
+        producer.flush();
+
+        // Reuse pre-allocated pooled buffer to process every message
+        byte[] val = null;
+        int size = 0;
+        for (int i = 0; i < numMessages; i++) {
+            Message<ByteBuffer> msg = consumer.receive();
+            ByteBuffer value;
+            try {
+                value = msg.getValue();
+                int capacity = value.remaining();
+                // expand the size of buffer if needed
+                if (capacity > size) {
+                    val = new byte[capacity];
+                    size = capacity;
+                }
+                // read message into pooled buffer
+                value.get(val, 0, capacity);
+                // process the message
+                assertEquals(("value-" + i), new String(val, 0, capacity));
+            } finally {
+                msg.release();
+            }
+        }
+        consumer.close();
+        producer.close();
+    }
+    
+    /**
+     * It verifies that expiry/redelivery of messages releases the messages without a leak.
+     * 
+     * @param isBatchingEnabled
+     * @throws Exception
+     */
+    @Test(dataProvider = "booleanFlagProvider")
+    public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        @Cleanup
+        PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+
+        final String topic = "persistent://my-property/my-ns/testPooledMessageWithAckTimeout" + isBatchingEnabled;
+
+        @Cleanup
+        ConsumerImpl<ByteBuffer> consumer = (ConsumerImpl<ByteBuffer>) newPulsarClient.newConsumer(Schema.BYTEBUFFER)
+                .topic(topic).subscriptionName("my-sub").poolMessages(true).subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled)
+                .create();
+
+        final int numMessages = 100;
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync();
+        }
+        producer.flush();
+
+        retryStrategically((test) -> consumer.incomingMessages.peek() != null, 5, 500);
+        MessageImpl<ByteBuffer> msg = (MessageImpl) consumer.incomingMessages.peek();
+        assertNotNull(msg);
+        ByteBuf payload = ((MessageImpl) msg).getPayload();
+        assertNotEquals(payload.refCnt(), 0);
+        consumer.redeliverUnacknowledgedMessages();
+        assertEquals(payload.refCnt(), 0);
+        consumer.close();
+        producer.close();
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 0b589e1..1038ba9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -731,4 +731,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * @return
      */
     ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);
+
+    /**
+     * Enable pooling of messages and the underlying data buffers.
+     * <p/>
+     * When pooling is enabled, the application is responsible for calling Message.release() after the handling of every
+     * received message. If “release()” is not called on a received message, there will be a memory leak. If an
+     * application attempts to use an already “released” message, it might experience undefined behavior (eg: memory
+     * corruption, deserialization error, etc.).
+     */
+    ConsumerBuilder<T> poolMessages(boolean poolMessages);
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
index 324c2ba..df3a4b9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -67,6 +67,13 @@ public interface Message<T> {
     byte[] getData();
 
     /**
+     * Get the uncompressed message payload size in bytes.
+     * 
+     * @return size in bytes. 
+     */
+    int size();
+
+    /**
      * Get the de-serialized value of the message, according the configured {@link Schema}.
      *
      * @return the deserialized value of the message
@@ -217,4 +224,12 @@ public interface Message<T> {
      * @return the name of cluster, from which the message is replicated.
      */
     String getReplicatedFrom();
+
+    /**
+     * Release a message back to the pool. This is required only if the consumer was created with the option to pool
+     * messages, otherwise it will have no effect.
+     * 
+     * @since 2.8.0
+     */
+    void release();
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index a9df5cc..69fcb00 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
 import java.nio.ByteBuffer;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -121,6 +122,22 @@ public interface Schema<T> extends Cloneable{
     }
 
     /**
+     * Decode a ByteBuffer into an object using a given version. <br/>
+     *
+     * @param data
+     *            the ByteBuffer to decode
+     * @param schemaVersion
+     *            the schema version to decode the object. null indicates using latest version.
+     * @return the deserialized object
+     */
+    default T decode(ByteBuffer data, byte[] schemaVersion) {
+        if (data == null) {
+            return null;
+        }
+        return decode(getBytes(data), schemaVersion);
+    }
+
+    /**
      * @return an object that represents the Schema associated metadata
      */
     SchemaInfo getSchemaInfo();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index 47b8410..dcee6a3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -494,4 +494,24 @@ public class DefaultImplementation {
                 () -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.KeyBasedBatcherBuilder")
                         .newInstance());
     }
+
+    /**
+     * Retrieves ByteBuffer data into byte[].
+     * 
+     * @param byteBuffer
+     * @return
+     */
+    public static byte[] getBytes(ByteBuffer byteBuffer) {
+        if (byteBuffer == null) {
+            return null;
+        }
+        if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0
+                && byteBuffer.array().length == byteBuffer.remaining()) {
+            return byteBuffer.array();
+        }
+        // Direct buffer is not backed by array and it needs to be read from direct memory
+        byte[] array = new byte[byteBuffer.remaining()];
+        byteBuffer.get(array);
+        return array;
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index f5c9d03..0e4f418 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.client.cli;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
+import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
@@ -31,6 +31,7 @@ import com.google.gson.JsonPrimitive;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
@@ -122,7 +123,9 @@ public class CmdConsume {
     @Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
     private String schematype = "bytes";
 
-    
+    @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
+    private boolean poolMessages = true;
+
     private ClientBuilder clientBuilder;
     private Authentication authentication;
     private String serviceURL;
@@ -171,6 +174,8 @@ public class CmdConsume {
         } else if (value instanceof GenericRecord) {
             Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
             data = asMap.toString();
+        } else if (value instanceof ByteBuffer) {
+            data = new String(getBytes((ByteBuffer) value));
         } else {
             data = value.toString();
         }
@@ -233,7 +238,7 @@ public class CmdConsume {
         try {
             ConsumerBuilder<?> builder;
             PulsarClient client = clientBuilder.build();
-            Schema<?> schema = Schema.BYTES;
+            Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
             if ("auto_consume".equals(schematype)) {
                 schema = Schema.AUTO_CONSUME();
             } else if (!"bytes".equals(schematype)) {
@@ -243,7 +248,8 @@ public class CmdConsume {
                     .subscriptionName(this.subscriptionName)
                     .subscriptionType(subscriptionType)
                     .subscriptionMode(subscriptionMode)
-                    .subscriptionInitialPosition(subscriptionInitialPosition);
+                    .subscriptionInitialPosition(subscriptionInitialPosition)
+                    .poolMessages(poolMessages);
 
             if (isRegex) {
                 builder.topicsPattern(Pattern.compile(topic));
@@ -275,15 +281,19 @@ public class CmdConsume {
                 if (msg == null) {
                     LOG.debug("No message to consume after waiting for 5 seconds.");
                 } else {
-                    numMessagesConsumed += 1;
-                    if (!hideContent) {
-                        System.out.println(MESSAGE_BOUNDARY);
-                        String output = this.interpretMessage(msg, displayHex);
-                        System.out.println(output);
-                    } else if (numMessagesConsumed % 1000 == 0) {
-                        System.out.println("Received " + numMessagesConsumed + " messages");
+                    try {
+                        numMessagesConsumed += 1;
+                        if (!hideContent) {
+                            System.out.println(MESSAGE_BOUNDARY);
+                            String output = this.interpretMessage(msg, displayHex);
+                            System.out.println(output);
+                        } else if (numMessagesConsumed % 1000 == 0) {
+                            System.out.println("Received " + numMessagesConsumed + " messages");
+                        }  
+                        consumer.acknowledge(msg);
+                    } finally {
+                        msg.release();
                     }
-                    consumer.acknowledge(msg);
                 }
             }
             client.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 28c248f..dc1c858 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -908,8 +908,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected void increaseIncomingMessageSize(final Message<?> message) {
-        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
-                this, message.getData() == null ? 0 : message.getData().length);
+        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
     }
 
     protected void resetIncomingMessageSize() {
@@ -917,14 +916,20 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected void decreaseIncomingMessageSize(final Message<?> message) {
-        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
-                (message.getData() != null) ? -message.getData().length : 0);
+        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
     }
 
     public long getIncomingMessageSize() {
         return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
     }
 
+    protected void clearIncomingMessages() {
+        // release messages if they are pooled messages
+        incomingMessages.forEach(Message::release);
+        incomingMessages.clear();
+        resetIncomingMessageSize();
+    }
+
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
 
     private ExecutorService getExecutor(Message<T> msg) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 338fe47..76b53ec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -458,4 +458,9 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         return this;
     }
 
+    @Override
+    public ConsumerBuilder<T> poolMessages(boolean poolMessages) {
+        conf.setPoolMessages(poolMessages);
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ab1d44e..b5e587f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -184,6 +184,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private final BlockingQueue<String> pendingChunkedMessageUuidQueue;
 
     private final boolean createTopicIfDoesNotExist;
+    private final boolean poolMessages;
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
 
@@ -252,6 +253,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
         this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
         this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
+        this.poolMessages = conf.isPoolMessages();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -835,7 +837,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 previousMessage = new BatchMessageIdImpl(nextMessageInQueue.getLedgerId(),
                         nextMessageInQueue.getEntryId() - 1, nextMessageInQueue.getPartitionIndex(), -1);
             }
-
+            // release messages if they are pooled messages
+            currentMessageQueue.forEach(Message::release);
             return previousMessage;
         } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
             // If the queue was empty we need to restart from the message just after the last one that has been dequeued
@@ -1044,8 +1047,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return;
             }
 
-            final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), msgId, msgMetadata,
-                    uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
+            final MessageImpl<T> message =  MessageImpl.create(topicName.toString(), msgId, msgMetadata,
+                    uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount,
+                    poolMessages);
             uncompressedPayload.release();
 
             // Enqueue the message so that it can be retrieved when application calls receive()
@@ -1264,9 +1268,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
                 BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
                         messageId.getEntryId(), getPartitionIndex(), i, batchSize, acker);
-                final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), batchMessageIdImpl,
+                final MessageImpl<T> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
                         msgMetadata, singleMessageMetadata, singleMessagePayload,
-                        createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
+                        createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, poolMessages);
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
@@ -1542,8 +1546,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             int currentSize = 0;
             synchronized (this) {
                 currentSize = incomingMessages.size();
-                incomingMessages.clear();
-                resetIncomingMessageSize();
+                clearIncomingMessages();
                 unAckedMessageTracker.clear();
             }
             cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
@@ -1566,8 +1569,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     public int clearIncomingMessagesAndGetMessageNumber() {
         int messagesNumber = incomingMessages.size();
-        incomingMessages.clear();
-        resetIncomingMessageSize();
+        incomingMessages.forEach(Message::release);
+        clearIncomingMessages();
         unAckedMessageTracker.clear();
         return messagesNumber;
     }
@@ -1789,8 +1792,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             duringSeek.set(true);
             lastDequeuedMessageId = MessageId.earliest;
 
-            incomingMessages.clear();
-            resetIncomingMessageSize();
+            clearIncomingMessages();
             seekFuture.complete(null);
         }).exceptionally(e -> {
             log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
@@ -2113,6 +2115,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     messageIds.add(id);
                     break;
                 }
+                message.release();
                 message = incomingMessages.poll();
             }
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index f87c463..aee60b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -175,7 +175,7 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
     public void updateNumMsgsReceived(Message<?> message) {
         if (message != null) {
             numMsgsReceived.increment();
-            numBytesReceived.add(message.getData() == null ? 0 : message.getData().length);
+            numBytesReceived.add(message.size());
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 350210c..8ed18fe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -21,11 +21,13 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Recycler.Handle;
 
 import java.io.IOException;
@@ -59,17 +61,20 @@ public class MessageImpl<T> implements Message<T> {
     private final MessageMetadata msgMetadata;
     private ClientCnx cnx;
     private ByteBuf payload;
+
     private Schema<T> schema;
     private SchemaState schemaState = SchemaState.None;
     private Optional<EncryptionContext> encryptionCtx = Optional.empty();
 
     private String topic; // only set for incoming messages
     transient private Map<String, String> properties;
-    private final int redeliveryCount;
+    private int redeliveryCount;
     private int uncompressedSize;
 
     private BrokerEntryMetadata brokerEntryMetadata;
 
+    private boolean poolMessage;
+    
     // Constructor for out-going message
     public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, ByteBuffer payload, Schema<T> schema) {
         @SuppressWarnings("unchecked")
@@ -94,93 +99,138 @@ public class MessageImpl<T> implements Message<T> {
 
     MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
                 Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
-        this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0);
+        this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0, false);
     }
 
     MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
-                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount) {
-        this.msgMetadata = new MessageMetadata().copyFrom(msgMetadata);
-        this.messageId = messageId;
-        this.topic = topic;
-        this.cnx = cnx;
-        this.redeliveryCount = redeliveryCount;
-
-        // Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could
-        // release, since the Message is passed to the user. Also, the passed ByteBuf is coming from network and is
-        // backed by a direct buffer which we could not expose as a byte[]
-        this.payload = Unpooled.copiedBuffer(payload);
-        this.encryptionCtx = encryptionCtx;
-
-        if (msgMetadata.getPropertiesCount() > 0) {
-            this.properties = Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream()
-                    .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
-                            (oldValue,newValue) -> newValue)));
+                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+                boolean pooledMessage) {
+        this.msgMetadata = new MessageMetadata();
+        init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount, pooledMessage);
+    }
+
+    public static <T> MessageImpl<T> create(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
+            ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
+            int redeliveryCount, boolean pooledMessage) {
+        if (pooledMessage) {
+            @SuppressWarnings("unchecked")
+            MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+            init(msg, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount,
+                    pooledMessage);
+            return msg;
         } else {
-            properties = Collections.emptyMap();
+            return new MessageImpl<>(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema,
+                    redeliveryCount, pooledMessage);
         }
-        this.schema = schema;
     }
 
     MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata msgMetadata,
-                SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
-                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
-        this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, 0);
+            SingleMessageMetadata singleMessageMetadata, ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
+            ClientCnx cnx, Schema<T> schema) {
+        this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, 0,
+                false);
     }
 
     MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata batchMetadata,
-                SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
-                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount) {
-        this.msgMetadata = new MessageMetadata().copyFrom(batchMetadata);
-        this.messageId = batchMessageIdImpl;
-        this.topic = topic;
-        this.cnx = cnx;
-        this.redeliveryCount = redeliveryCount;
+            SingleMessageMetadata singleMessageMetadata, ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
+            ClientCnx cnx, Schema<T> schema, int redeliveryCount, boolean keepMessageInDirectMemory) {
+        this.msgMetadata = new MessageMetadata();
+        init(this, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema,
+                redeliveryCount, keepMessageInDirectMemory);
 
-        this.payload = Unpooled.copiedBuffer(payload);
-        this.encryptionCtx = encryptionCtx;
+    }
 
-        if (singleMessageMetadata.getPropertiesCount() > 0) {
-            Map<String, String> properties = Maps.newTreeMap();
-            for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
-                properties.put(entry.getKey(), entry.getValue());
-            }
-            this.properties = Collections.unmodifiableMap(properties);
+    public static <T> MessageImpl<T> create(String topic, BatchMessageIdImpl batchMessageIdImpl,
+            MessageMetadata batchMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
+            Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+            boolean pooledMessage) {
+        if (pooledMessage) {
+            @SuppressWarnings("unchecked")
+            MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+            init(msg, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx, cnx,
+                    schema, redeliveryCount, pooledMessage);
+            return msg;
         } else {
-            properties = Collections.emptyMap();
-        }
-        if (singleMessageMetadata.hasPartitionKey()) {
-            msgMetadata.setPartitionKeyB64Encoded(singleMessageMetadata.isPartitionKeyB64Encoded())
-                    .setPartitionKey(singleMessageMetadata.getPartitionKey());
-        } else if (msgMetadata.hasPartitionKey()) {
-            msgMetadata.clearPartitionKey();
-            msgMetadata.clearPartitionKeyB64Encoded();
+            return new MessageImpl<>(topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload,
+                    encryptionCtx, cnx, schema, redeliveryCount, pooledMessage);
         }
+    }
 
-        if (singleMessageMetadata.hasOrderingKey()) {
-            msgMetadata.setOrderingKey(singleMessageMetadata.getOrderingKey());
-        } else if (msgMetadata.hasOrderingKey()) {
-            msgMetadata.clearOrderingKey();
-        }
+    static <T> void init(MessageImpl<T> msg, String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
+            ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
+            int redeliveryCount, boolean poolMessage) { // variant without batch id / single-message metadata
+        init(msg, topic, null /* batchMessageIdImpl */, msgMetadata, null /* singleMessageMetadata */, payload,
+                encryptionCtx, cnx, schema, redeliveryCount, poolMessage);
+        msg.messageId = messageId; // the shared init stored the null batch id; overwrite it with the real id
+    }
+    
+    private static <T> void init(MessageImpl<T> msg, String topic, BatchMessageIdImpl batchMessageIdImpl,
+            MessageMetadata msgMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
+            Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+            boolean poolMessage) {
+        msg.msgMetadata.clear();
+        msg.msgMetadata.copyFrom(msgMetadata);
+        msg.messageId = batchMessageIdImpl;
+        msg.topic = topic;
+        msg.cnx = cnx;
+        msg.redeliveryCount = redeliveryCount;
+        msg.encryptionCtx = encryptionCtx;
+        msg.schema = schema;
 
-        if (singleMessageMetadata.hasEventTime()) {
-            msgMetadata.setEventTime(singleMessageMetadata.getEventTime());
-        }
+        msg.poolMessage = poolMessage;
+        // A non-pooled message needs its own copy of the payload: the passed buffer is
+        // ref-counted and we cannot know when it will be released once the Message is
+        // handed to the user. Also, the passed ByteBuf comes from the network and is
+        // backed by a direct buffer, which cannot be exposed as a byte[].
+        msg.payload = poolMessage ? payload.retain() : Unpooled.copiedBuffer(payload);
+        
+        if (singleMessageMetadata != null) {
+            if (singleMessageMetadata.getPropertiesCount() > 0) {
+                Map<String, String> properties = Maps.newTreeMap();
+                for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
+                    properties.put(entry.getKey(), entry.getValue());
+                }
+                msg.properties = Collections.unmodifiableMap(properties);
+            } else {
+                msg.properties = Collections.emptyMap();
+            }
+            if (singleMessageMetadata.hasPartitionKey()) {
+                msg.msgMetadata.setPartitionKeyB64Encoded(singleMessageMetadata.isPartitionKeyB64Encoded())
+                        .setPartitionKey(singleMessageMetadata.getPartitionKey());
+            } else if (msg.msgMetadata.hasPartitionKey()) {
+                msg.msgMetadata.clearPartitionKey();
+                msg.msgMetadata.clearPartitionKeyB64Encoded();
+            }
 
-        if (singleMessageMetadata.hasSequenceId()) {
-            msgMetadata.setSequenceId(singleMessageMetadata.getSequenceId());
-        }
+            if (singleMessageMetadata.hasOrderingKey()) {
+                msg.msgMetadata.setOrderingKey(singleMessageMetadata.getOrderingKey());
+            } else if (msg.msgMetadata.hasOrderingKey()) {
+                msg.msgMetadata.clearOrderingKey();
+            }
 
-        if (singleMessageMetadata.hasNullValue()) {
-            msgMetadata.setNullValue(singleMessageMetadata.isNullValue());
-        }
+            if (singleMessageMetadata.hasEventTime()) {
+                msg.msgMetadata.setEventTime(singleMessageMetadata.getEventTime());
+            }
 
-        if (singleMessageMetadata.hasNullPartitionKey()) {
-            msgMetadata.setNullPartitionKey(singleMessageMetadata.isNullPartitionKey());
-        }
+            if (singleMessageMetadata.hasSequenceId()) {
+                msg.msgMetadata.setSequenceId(singleMessageMetadata.getSequenceId());
+            }
 
-        this.schema = schema;
-    }
+            if (singleMessageMetadata.hasNullValue()) {
+                msg.msgMetadata.setNullValue(singleMessageMetadata.isNullValue());
+            }
 
+            if (singleMessageMetadata.hasNullPartitionKey()) {
+                msg.msgMetadata.setNullPartitionKey(singleMessageMetadata.isNullPartitionKey());
+            }
+        } else if (msgMetadata.getPropertiesCount() > 0) {
+            msg.properties = Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream().collect(
+                    Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (oldValue, newValue) -> newValue)));
+        } else {
+            msg.properties = Collections.emptyMap();
+        }
+    }
+    
     public MessageImpl(String topic, String msgId, Map<String, String> properties,
             byte[] payload, Schema<T> schema, MessageMetadata msgMetadata) {
         this(topic, msgId, properties, Unpooled.wrappedBuffer(payload), schema, msgMetadata);
@@ -311,6 +361,11 @@ public class MessageImpl<T> implements Message<T> {
         if (msgMetadata.isNullValue()) {
             return null;
         }
+        if (payload.isDirect()) {
+            byte[] data = new byte[payload.readableBytes()];
+            payload.getBytes(payload.readerIndex(), data);
+            return data;
+        }
         if (payload.arrayOffset() == 0 && payload.capacity() == payload.array().length) {
             return payload.array();
         } else {
@@ -321,6 +376,14 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    /** Returns the readable payload size in bytes, or 0 when the metadata marks the value as null. */
+    @Override
+    public int size() {
+        // The null-value flag is consulted first, so the payload buffer is not read for such messages.
+        final boolean nullValued = msgMetadata.isNullValue();
+        return nullValued ? 0 : payload.readableBytes();
+    }
+
     public Schema<T> getSchema() {
         return this.schema;
     }
@@ -356,19 +419,11 @@ public class MessageImpl<T> implements Message<T> {
             }
             // check if the schema passed in from client supports schema versioning or not
             // this is an optimization to only get schema version when necessary
-            if (schema.supportSchemaVersioning()) {
-                byte[] schemaVersion = getSchemaVersion();
-                if (null == schemaVersion) {
-                    return schema.decode(getData());
-                } else {
-                    return schema.decode(getData(), schemaVersion);
-                }
-            } else {
-                return schema.decode(getData());
-            }
+            return decode(schema.supportSchemaVersioning() ? getSchemaVersion() : null);
         }
     }
 
+
     private KeyValueSchema getKeyValueSchema() {
         if (schema instanceof AutoConsumeSchema) {
             return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema();
@@ -377,6 +432,19 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+
+    /** Decodes the payload; pooled messages first try the {@code ByteBuffer}-based decoder. */
+    private T decode(byte[] schemaVersion) {
+        if (poolMessage) {
+            T pooledValue = schema.decode(payload.nioBuffer(), schemaVersion);
+            if (pooledValue != null) {
+                return pooledValue;
+            }
+        }
+        // Fallback: decode from the byte[] form, passing the schema version only when one is known.
+        return schemaVersion == null ? schema.decode(getData()) : schema.decode(getData(), schemaVersion);
+    }
+    
     private T getKeyValueBySchemaVersion() {
         KeyValueSchema kvSchema = getKeyValueSchema();
         byte[] schemaVersion = getSchemaVersion();
@@ -390,7 +458,7 @@ public class MessageImpl<T> implements Message<T> {
                 return (T) keyValue;
             }
         } else {
-            return schema.decode(getData(), schemaVersion);
+            return decode(schemaVersion);
         }
     }
 
@@ -406,7 +474,7 @@ public class MessageImpl<T> implements Message<T> {
                 return (T) keyValue;
             }
         } else {
-            return schema.decode(getData());
+            return decode(null);
         }
     }
 
@@ -527,24 +595,42 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     public void recycle() {
-        msgMetadata.clear();
+        if (msgMetadata != null) {
+            msgMetadata.clear();
+        }
+        if (brokerEntryMetadata != null) { // cleared in place; the previous code nulled the field instead
+            brokerEntryMetadata.clear();
+        }
+        cnx = null;
         messageId = null;
         topic = null;
         payload = null;
+        encryptionCtx = Optional.empty(); // keep the Optional field non-null, matching its initializer
+        redeliveryCount = 0;
+        uncompressedSize = 0;
         properties = null;
         schema = null;
         schemaState = SchemaState.None;
-        brokerEntryMetadata = null;
+        poolMessage = false; // makes a later release() a no-op until init() marks the instance pooled again
 
         if (recyclerHandle != null) {
             recyclerHandle.recycle(this);
         }
     }
 
+    @Override
+    public void release() {
+        if (poolMessage) { // messages created without pooling are left untouched
+            ReferenceCountUtil.safeRelease(payload); // drop the reference init() retained on the payload
+            recycle(); // reset the fields and hand the instance back to its recycler handle, if any
+        }
+    }
+
     private MessageImpl(Handle<MessageImpl<?>> recyclerHandle) {
         this.recyclerHandle = recyclerHandle;
         this.redeliveryCount = 0;
         this.msgMetadata = new MessageMetadata();
+        this.brokerEntryMetadata = new BrokerEntryMetadata();
     }
 
     private Handle<MessageImpl<?>> recyclerHandle;
@@ -590,7 +676,15 @@ public class MessageImpl<T> implements Message<T> {
         this.schemaState = schemaState;
     }
 
-
+    /**
+     * Exposes the payload buffer; intended only for unit tests that validate its state and reference count.
+     *
+     * @return the payload {@link ByteBuf} held by this message (the same instance, not a copy)
+     */
+    @VisibleForTesting
+    ByteBuf getPayload() {
+        return payload;
+    }
 
     enum SchemaState {
         None, Ready, Broken
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index c56694e..4ff23eb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -49,7 +49,7 @@ public class MessagesImpl<T> implements Messages<T> {
             return false;
         }
 
-        if (maxSizeOfMessages > 0 && currentSizeOfMessages + message.getData().length > maxSizeOfMessages) {
+        if (maxSizeOfMessages > 0 && currentSizeOfMessages + message.size() > maxSizeOfMessages) {
             return false;
         }
 
@@ -62,7 +62,7 @@ public class MessagesImpl<T> implements Messages<T> {
         }
         Preconditions.checkArgument(canAdd(message), "No more space to add messages.");
         currentNumberOfMessages ++;
-        currentSizeOfMessages += message.getData().length;
+        currentSizeOfMessages += message.size();
         messageList.add(message);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8033c18..94cb7d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -602,8 +602,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             consumer.redeliverUnacknowledgedMessages();
             consumer.unAckedChunkedMessageIdSequenceMap.clear();
         });
-        incomingMessages.clear();
-        resetIncomingMessageSize();
+        clearIncomingMessages();
         unAckedMessageTracker.clear();
 
         resumeReceivingFromPausedConsumersIfNeeded();
@@ -687,8 +686,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId)));
 
         unAckedMessageTracker.clear();
-        incomingMessages.clear();
-        resetIncomingMessageSize();
+        clearIncomingMessages();
 
         return FutureUtil.waitForAll(futures);
     }
@@ -777,6 +775,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     messageIds.add(messageId);
                     break;
                 }
+                message.release();
                 message = incomingMessages.poll();
             }
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index e0c0ef1..8c94eea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -94,6 +94,11 @@ public class TopicMessageImpl<T> implements Message<T> {
     }
 
     @Override
+    public int size() {
+        return msg.size();
+    }
+
+    @Override
     public long getPublishTime() {
         return msg.getPublishTime();
     }
@@ -184,4 +189,9 @@ public class TopicMessageImpl<T> implements Message<T> {
         }
         return null;
     }
+
+    @Override
+    public void release() {
+        msg.release();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index d353d38..98d5b48 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -85,6 +85,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
         // Just being cautious
         if (incomingMessages.size() > 0) {
             log.error("The incoming message queue should never be greater than 0 when Queue size is 0");
+            incomingMessages.forEach(Message::release);
             incomingMessages.clear();
         }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 32fa5e8..a39ac21 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -146,6 +146,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
     private boolean batchIndexAckEnabled = false;
 
     private boolean ackReceiptEnabled = false;
+    
+    private boolean poolMessages = false;
 
     public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
         checkArgument(interval > 0, "interval needs to be > 0");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index 255a491..3205a64 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -42,6 +42,7 @@ public abstract class AbstractSchema<T> implements Schema<T> {
 
     /**
      * Decode a byteBuf into an object using the schema definition and deserializer implementation
+     * <p>Must not modify the ByteBuf's reader/writer index, so the buffer can be reused to access correct data.
      *
      * @param byteBuf
      *            the byte buffer to decode
@@ -61,7 +62,7 @@ public abstract class AbstractSchema<T> implements Schema<T> {
         // ignore version by default (most of the primitive schema implementations ignore schema version)
         return decode(byteBuf);
     }
-
+    
     @Override
     public Schema<T> clone() {
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index 4a79931..c560f0e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -76,18 +76,23 @@ public class ByteBufferSchema extends AbstractSchema<ByteBuffer> {
     }
 
     @Override
-    public ByteBuffer decode(ByteBuf byteBuf) {
-        if (null == byteBuf) {
+    public ByteBuffer decode(ByteBuf byteBuffer) {
+        if (null == byteBuffer) {
             return null;
         } else {
-            int size = byteBuf.readableBytes();
+            int size = byteBuffer.readableBytes();
             byte[] bytes = new byte[size];
-            byteBuf.readBytes(bytes);
+            byteBuffer.getBytes(byteBuffer.readerIndex(), bytes); // absolute read: leaves readerIndex untouched
             return ByteBuffer.wrap(bytes);
         }
     }
 
     @Override
+    public ByteBuffer decode(ByteBuffer byteBuffer, byte[] schemaVersion) {
+        return byteBuffer;
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 9447b00..9c7ec37 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -60,7 +60,7 @@ public class BytesSchema extends AbstractSchema<byte[]> {
         int size = byteBuf.readableBytes();
         byte[] bytes = new byte[size];
 
-        byteBuf.readBytes(bytes, 0, size);
+        byteBuf.getBytes(byteBuf.readerIndex(), bytes);
         return bytes;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
index 8a6c4fb..aa86a19 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
@@ -76,8 +76,8 @@ public class LocalDateTimeSchema extends AbstractSchema<LocalDateTime> {
       if (null == byteBuf) {
          return null;
       }
-      long epochDay = byteBuf.readLong();
-      long nanoOfDay = byteBuf.readLong();
+      long epochDay = byteBuf.getLong(byteBuf.readerIndex()); // absolute reads keep readerIndex unchanged
+      long nanoOfDay = byteBuf.getLong(byteBuf.readerIndex() + Long.BYTES); // second long follows the first
       return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay));
    }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 7e813ac..7e57f6c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -120,7 +120,7 @@ public class StringSchema extends AbstractSchema<String> {
                 bytes = new byte[size * 2];
                 tmpBuffer.set(bytes);
             }
-            byteBuf.readBytes(bytes, 0, size);
+            byteBuf.getBytes(byteBuf.readerIndex(), bytes, 0, size);
 
             return new String(bytes, 0, size, charset);
         }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index ca6b07e..034b613 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -26,6 +26,9 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import java.io.FileInputStream;
+import java.lang.management.BufferPoolMXBean;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
 import java.text.DecimalFormat;
 import java.util.Collections;
 import java.util.List;
@@ -41,9 +44,11 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.JvmMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -168,6 +173,9 @@ public class PerformanceConsumer {
     
         @Parameter(names = {"--batch-index-ack" }, description = "Enable or disable the batch index acknowledgment")
         public boolean batchIndexAck = false;
+
+        @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
+        private boolean poolMessages = true;
     }
 
     public static void main(String[] args) throws Exception {
@@ -271,7 +279,7 @@ public class PerformanceConsumer {
         final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
         long startTime = System.nanoTime();
         long testEndTime = startTime + (long) (arguments.testTime * 1e9);
-        MessageListener<byte[]> listener = (consumer, msg) -> {
+        MessageListener<ByteBuffer> listener = (consumer, msg) -> {
             if (arguments.testTime > 0) {
                 if (System.nanoTime() > testEndTime) {
                     log.info("------------------- DONE -----------------------");
@@ -280,10 +288,10 @@ public class PerformanceConsumer {
                 }
             }
             messagesReceived.increment();
-            bytesReceived.add(msg.getData().length);
+            bytesReceived.add(msg.size());
 
             totalMessagesReceived.increment();
-            totalBytesReceived.add(msg.getData().length);
+            totalBytesReceived.add(msg.size());
 
             if (limiter != null) {
                 limiter.acquire();
@@ -296,6 +304,10 @@ public class PerformanceConsumer {
             }
 
             consumer.acknowledgeAsync(msg);
+
+            if(arguments.poolMessages) {
+                msg.release();
+            }
         };
 
         ClientBuilder clientBuilder = PulsarClient.builder() //
@@ -318,8 +330,8 @@ public class PerformanceConsumer {
 
         PulsarClient pulsarClient = clientBuilder.build();
 
-        List<Future<Consumer<byte[]>>> futures = Lists.newArrayList();
-        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() //
+        List<Future<Consumer<ByteBuffer>>> futures = Lists.newArrayList();
+        ConsumerBuilder<ByteBuffer> consumerBuilder = pulsarClient.newConsumer(Schema.BYTEBUFFER) //
                 .messageListener(listener) //
                 .receiverQueueSize(arguments.receiverQueueSize) //
                 .maxTotalReceiverQueueSizeAcrossPartitions(arguments.maxTotalReceiverQueueSizeAcrossPartitions)
@@ -328,6 +340,7 @@ public class PerformanceConsumer {
                 .subscriptionInitialPosition(arguments.subscriptionInitialPosition)
                 .autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull)
                 .enableBatchIndexAcknowledgment(arguments.batchIndexAck)
+                .poolMessages(arguments.poolMessages)
                 .replicateSubscriptionState(arguments.replicatedSubscription);
         if (arguments.maxPendingChunkedMessage > 0) {
             consumerBuilder.maxPendingChunkedMessage(arguments.maxPendingChunkedMessage);
@@ -355,7 +368,7 @@ public class PerformanceConsumer {
             }
         }
 
-        for (Future<Consumer<byte[]>> future : futures) {
+        for (Future<Consumer<ByteBuffer>> future : futures) {
             future.get();
         }