You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/10/06 08:40:12 UTC

[pulsar] branch master updated: [PIP 96] Add message payload processor for Pulsar client (#12088)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b2a21  [PIP 96] Add message payload processor for Pulsar client (#12088)
86b2a21 is described below

commit 86b2a212d4bf38d536814517d6e91ea8d6be7a58
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Oct 6 16:39:28 2021 +0800

    [PIP 96] Add message payload processor for Pulsar client (#12088)
---
 .../pulsar/client/processor/CustomBatchFormat.java |  74 +++++++
 .../processor/CustomBatchPayloadProcessor.java     |  61 ++++++
 .../client/processor/CustomBatchProducer.java      |  70 ++++++
 .../processor/DefaultProcessorWithRefCnt.java      |  47 ++++
 .../processor/MessagePayloadProcessorTest.java     | 220 +++++++++++++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |   8 +
 .../apache/pulsar/client/api/MessagePayload.java   |  42 ++++
 .../pulsar/client/api/MessagePayloadContext.java   |  80 +++++++
 .../pulsar/client/api/MessagePayloadFactory.java   |  46 ++++
 .../pulsar/client/api/MessagePayloadProcessor.java |  70 ++++++
 .../PulsarClientImplementationBinding.java         |   3 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   7 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 236 +++++++++++++++------
 .../client/impl/MessagePayloadContextImpl.java     | 148 +++++++++++++
 .../client/impl/MessagePayloadFactoryImpl.java     |  37 ++++
 .../pulsar/client/impl/MessagePayloadImpl.java     |  72 +++++++
 .../pulsar/client/impl/MessagePayloadUtils.java    |  34 +++
 .../PulsarClientImplementationBindingImpl.java     |   4 +
 .../impl/conf/ConsumerConfigurationData.java       |   4 +
 .../pulsar/client/api/MessagePayloadTest.java      | 104 +++++++++
 20 files changed, 1297 insertions(+), 70 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java
new file mode 100644
index 0000000..571d292
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import io.netty.buffer.ByteBuf;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+/**
+ * A batch message whose format is customized.
+ *
+ * 1. The first 2 bytes represent the number of messages, as an unsigned 16-bit integer (0 to 65535).
+ * 2. Each message is a string, whose format is
+ *   1. The first 2 bytes represent the length `N`, as an unsigned 16-bit integer (0 to 65535).
+ *   2. The following N bytes are the bytes of the string, as encoded by {@link Schema#STRING}.
+ */
+public class CustomBatchFormat {
+
+    public static final String KEY = "entry.format";
+    public static final String VALUE = "custom";
+
+    // The largest value that fits in the 2-byte count and length fields
+    private static final int MAX_UNSIGNED_SHORT = 0xFFFF;
+
+    /**
+     * The batch header, i.e. the information that is stored before the first message.
+     */
+    @AllArgsConstructor
+    @Getter
+    public static class Metadata {
+        private final int numMessages;
+    }
+
+    /**
+     * Serialize the strings into a newly allocated buffer, using the format described in the class comment.
+     *
+     * @param strings the messages of the batch
+     * @return the buffer that holds the whole batch, the caller is responsible for releasing it
+     * @throws IllegalArgumentException if there are more than 65535 messages or if the encoded bytes of a message
+     *   are longer than 65535 bytes, because such values cannot be represented by the 2-byte fields
+     */
+    public static ByteBuf serialize(Iterable<String> strings) {
+        final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024);
+        try {
+            // Reserve the 2 bytes of the message count, the real value is only known after the iteration
+            buf.writeShort(0);
+            int numMessages = 0;
+            for (String s : strings) {
+                if (numMessages == MAX_UNSIGNED_SHORT) {
+                    throw new IllegalArgumentException(
+                            "A batch cannot contain more than " + MAX_UNSIGNED_SHORT + " messages");
+                }
+                writeString(buf, s);
+                numMessages++;
+            }
+            buf.setShort(0, numMessages);
+            return buf;
+        } catch (RuntimeException | Error e) {
+            // Don't leak the buffer if the iteration or the encoding fails
+            buf.release();
+            throw e;
+        }
+    }
+
+    /**
+     * Append a single message, i.e. the 2-byte length followed by the encoded bytes of the string.
+     */
+    private static void writeString(final ByteBuf buf, final String s) {
+        final byte[] bytes = Schema.STRING.encode(s);
+        if (bytes.length > MAX_UNSIGNED_SHORT) {
+            // writeShort() only keeps the low 16 bits, so a longer message would corrupt the batch silently
+            throw new IllegalArgumentException("The message has " + bytes.length
+                    + " bytes, which exceeds the max length " + MAX_UNSIGNED_SHORT);
+        }
+        buf.writeShort(bytes.length);
+        buf.writeBytes(bytes);
+    }
+
+    /**
+     * Read the batch header from the current reader index of the buffer.
+     *
+     * @param buf the buffer whose reader index points to the beginning of a batch
+     * @return the metadata that contains the number of messages
+     */
+    public static Metadata readMetadata(final ByteBuf buf) {
+        // Read the count as unsigned, readShort() would turn the values above 32767 into negative numbers
+        return new Metadata(buf.readUnsignedShort());
+    }
+
+    /**
+     * Read the next message from the current reader index of the buffer.
+     *
+     * @param buf the buffer whose reader index points to the beginning of a message
+     * @return the encoded bytes of the string, without the 2-byte length
+     */
+    public static byte[] readMessage(final ByteBuf buf) {
+        // Read the length as unsigned, a negative length would fail the array allocation below
+        final int length = buf.readUnsignedShort();
+        final byte[] bytes = new byte[length];
+        buf.readBytes(bytes);
+        return bytes;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchPayloadProcessor.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchPayloadProcessor.java
new file mode 100644
index 0000000..c83b13b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchPayloadProcessor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import io.netty.buffer.ByteBuf;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.MessagePayloadContext;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.MessagePayloadFactory;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessagePayloadUtils;
+
+@Slf4j
+public class CustomBatchPayloadProcessor implements MessagePayloadProcessor {
+
+    /**
+     * Decode the entries that are written in {@link CustomBatchFormat} and delegate the others to the default
+     * processor.
+     *
+     * An entry is treated as a custom batch only if the value of its {@link CustomBatchFormat#KEY} property equals
+     * {@link CustomBatchFormat#VALUE}.
+     */
+    @Override
+    public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema,
+                            Consumer<Message<T>> messageConsumer) throws Exception {
+        final boolean customFormat = CustomBatchFormat.VALUE.equals(context.getProperty(CustomBatchFormat.KEY));
+        if (!customFormat) {
+            DEFAULT.process(payload, context, schema, messageConsumer);
+            return;
+        }
+
+        final ByteBuf batchBuf = MessagePayloadUtils.convertToByteBuf(payload);
+        try {
+            final int total = CustomBatchFormat.readMetadata(batchBuf).getNumMessages();
+            int index = 0;
+            while (index < total) {
+                final byte[] messageBytes = CustomBatchFormat.readMessage(batchBuf);
+                final MessagePayload single = MessagePayloadFactory.DEFAULT.wrap(messageBytes);
+                try {
+                    // The single payload has no single message metadata, so containMetadata is false
+                    final Message<T> message = context.getMessageAt(index, total, single, false, schema);
+                    messageConsumer.accept(message);
+                } finally {
+                    single.release();
+                }
+                index++;
+            }
+        } finally {
+            batchBuf.release();
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchProducer.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchProducer.java
new file mode 100644
index 0000000..bc07206
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchProducer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+
+/**
+ * A producer that buffers strings and publishes them to a {@link PersistentTopic} directly as entries written in
+ * {@link CustomBatchFormat}.
+ *
+ * The buffered messages are held in a plain list without any synchronization.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class CustomBatchProducer {
+
+    private final List<String> messages = new ArrayList<>();
+    private final PersistentTopic persistentTopic;
+    private final int batchingMaxMessages;
+
+    /**
+     * Buffer the value and flush the buffered messages once their number reaches `batchingMaxMessages`.
+     *
+     * @param value the message to send
+     */
+    public void sendAsync(final String value) {
+        messages.add(value);
+        if (messages.size() >= batchingMaxMessages) {
+            flush();
+        }
+    }
+
+    /**
+     * Publish all buffered messages as a single entry and clear the buffer. It does nothing if no message is
+     * buffered, so that an entry without any message is never published.
+     */
+    public void flush() {
+        if (messages.isEmpty()) {
+            return;
+        }
+        final ByteBuf buf = CustomBatchFormat.serialize(messages);
+        final ByteBuf headerAndPayload;
+        try {
+            headerAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.None,
+                    createCustomMetadata(), buf);
+        } finally {
+            // Release the payload even if the serialization fails
+            buf.release();
+        }
+        // NOTE(review): headerAndPayload is not released here, confirm whether publishMessage takes the ownership
+        persistentTopic.publishMessage(headerAndPayload, (e, ledgerId, entryId) -> {
+            if (e == null) {
+                log.info("Send successfully to {} ({}, {})", persistentTopic.getName(), ledgerId, entryId);
+            } else {
+                // Pass the exception as the last argument so that the stack trace is logged as well
+                log.error("Failed to send: {}", e.getMessage(), e);
+            }
+        });
+        messages.clear();
+    }
+
+    /**
+     * Create the metadata that carries the property used to identify the custom format.
+     */
+    private static MessageMetadata createCustomMetadata() {
+        final MessageMetadata messageMetadata = new MessageMetadata();
+        // Here are required fields
+        messageMetadata.setProducerName("");
+        messageMetadata.setSequenceId(0L);
+        messageMetadata.setPublishTime(0L);
+        // Add the property to identify the message format
+        messageMetadata.addProperty().setKey(CustomBatchFormat.KEY).setValue(CustomBatchFormat.VALUE);
+        return messageMetadata;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/DefaultProcessorWithRefCnt.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/DefaultProcessorWithRefCnt.java
new file mode 100644
index 0000000..63e295f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/DefaultProcessorWithRefCnt.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import java.util.function.Consumer;
+import lombok.Getter;
+import org.apache.pulsar.client.api.MessagePayloadContext;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessagePayloadImpl;
+
+/**
+ * A processor that delegates to {@link MessagePayloadProcessor#DEFAULT} and accumulates the reference count that the
+ * buffer of each payload has after the processing.
+ *
+ * It's used to verify that {@link MessagePayloadContext#getMessageAt} and
+ * {@link MessagePayloadContext#asSingleMessage} have released the ByteBuf successfully.
+ */
+public class DefaultProcessorWithRefCnt implements MessagePayloadProcessor {
+
+    @Getter
+    int totalRefCnt = 0;
+
+    @Override
+    public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema,
+                            Consumer<Message<T>> messageConsumer) throws Exception {
+        MessagePayloadProcessor.DEFAULT.process(payload, context, schema, messageConsumer);
+        // Sample the reference count only after the default processing is done
+        final MessagePayloadImpl payloadImpl = (MessagePayloadImpl) payload;
+        final int refCntAfterProcessing = payloadImpl.getByteBuf().refCnt();
+        totalRefCnt = totalRefCnt + refCntAfterProcessing;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
new file mode 100644
index 0000000..6a12101
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for {@link MessagePayloadProcessor}.
+ *
+ * It covers the default processor on topics that are written by a Pulsar producer, the serialization of
+ * {@link CustomBatchFormat}, and a custom processor on topics whose entries are written by
+ * {@link CustomBatchProducer}.
+ */
+@Slf4j
+@Test(groups = "broker-impl")
+public class MessagePayloadProcessorTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        admin.clusters().createCluster("test",
+                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("public/default", Sets.newHashSet("test"));
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider
+    public static Object[][] config() {
+        return new Object[][] {
+                // Each row is { numPartitions, enableBatching, batchingMaxMessages }
+                { 1, true, 1 },
+                { 1, true, 4 },
+                { 1, false, 1 },
+                { 3, false, 1 }
+        };
+    }
+
+    @DataProvider
+    public static Object[][] customBatchConfig() {
+        return new Object[][] {
+                // Each row is { numMessages, batchingMaxMessages }
+                { 10, 1 },
+                { 10, 4 }
+        };
+    }
+
+    @Test(dataProvider = "config")
+    public void testDefaultProcessor(int numPartitions, boolean enableBatching, int batchingMaxMessages)
+            throws Exception {
+        final String topic = "testDefaultProcessor-" + numPartitions + "-" + enableBatching + "-" + batchingMaxMessages;
+        final int numMessages = 10;
+        final String messagePrefix = "msg-";
+
+        if (numPartitions > 1) {
+            admin.topics().createPartitionedTopic(topic, numPartitions);
+        }
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(enableBatching)
+                .batchingMaxMessages(batchingMaxMessages)
+                .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+                .messageRouter(new MessageRouter() {
+                    int i = 0;
+
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+                        return i++ % metadata.numPartitions();
+                    }
+                })
+                .create();
+        for (int i = 0; i < numMessages; i++) {
+            final String value = messagePrefix + i;
+            producer.sendAsync(value).whenComplete((id, e) -> {
+                if (e == null) {
+                    log.info("Send {} to {} {}", value, topic, id);
+                } else {
+                    log.error("Failed to send {}: {}", value, e.getMessage());
+                }
+            });
+        }
+
+        final DefaultProcessorWithRefCnt processor = new DefaultProcessorWithRefCnt();
+        @Cleanup
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .messagePayloadProcessor(processor)
+                .subscribe();
+        final List<String> values = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            final Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+            Assert.assertNotNull(message);
+            values.add(message.getValue());
+            consumer.acknowledge(message.getMessageId());
+        }
+
+        if (numPartitions > 1) {
+            // Messages are out of order across multiple partitions, so sort them before the comparison below
+            Collections.sort(values);
+        }
+        for (int i = 0; i < numMessages; i++) {
+            Assert.assertEquals(values.get(i), messagePrefix + i);
+        }
+
+        // Each buffer's refCnt is 2 because after retrieving the refCnt, there will be two releases of the ByteBuf:
+        // 1. in ConsumerImpl#processPayloadByProcessor
+        // 2. in PulsarDecoder#channelRead
+        if (enableBatching) {
+            int numBatches = numMessages / batchingMaxMessages;
+            numBatches += (numMessages % batchingMaxMessages == 0) ? 0 : 1;
+            Assert.assertEquals(processor.getTotalRefCnt(), 2 * numBatches);
+        } else {
+            Assert.assertEquals(processor.getTotalRefCnt(), 2 * numMessages);
+        }
+    }
+
+    @Test
+    public void testCustomBatchFormat() {
+        final List<List<String>> inputs = new ArrayList<>();
+        inputs.add(Collections.emptyList());
+        inputs.add(Collections.singletonList("java"));
+        inputs.add(Arrays.asList("hello", "world", "java"));
+
+        for (List<String> input : inputs) {
+            final ByteBuf buf = CustomBatchFormat.serialize(input);
+
+            final CustomBatchFormat.Metadata metadata = CustomBatchFormat.readMetadata(buf);
+            final List<String> parsedTokens = new ArrayList<>();
+            for (int i = 0; i < metadata.getNumMessages(); i++) {
+                parsedTokens.add(Schema.STRING.decode(CustomBatchFormat.readMessage(buf)));
+            }
+
+            Assert.assertEquals(parsedTokens, input);
+            Assert.assertEquals(parsedTokens.size(), input.size());
+
+            Assert.assertEquals(buf.refCnt(), 1);
+            buf.release();
+        }
+    }
+
+    @Test(dataProvider = "customBatchConfig")
+    public void testCustomProcessor(final int numMessages, final int batchingMaxMessages) throws Exception {
+        final String topic = "persistent://public/default/testCustomProcessor-"
+                + numMessages + "-" + batchingMaxMessages;
+
+        @Cleanup
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .messagePayloadProcessor(new CustomBatchPayloadProcessor())
+                .subscribe();
+
+        final PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().orElse(null);
+        Assert.assertNotNull(persistentTopic);
+
+        final String messagePrefix = "msg-";
+        final CustomBatchProducer producer = new CustomBatchProducer(persistentTopic, batchingMaxMessages);
+        for (int i = 0; i < numMessages; i++) {
+            producer.sendAsync(messagePrefix + i);
+        }
+        producer.flush();
+
+        for (int i = 0; i < numMessages; i++) {
+            final Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+            Assert.assertNotNull(message);
+            Assert.assertEquals(message.getValue(), messagePrefix + i);
+            consumer.acknowledge(message.getMessageId());
+        }
+    }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 1038ba9..3c3ce17 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -741,4 +741,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * corruption, deserialization error, etc.).
      */
     ConsumerBuilder<T> poolMessages(boolean poolMessages);
+
+    /**
+     * If it's configured with a non-null value, the consumer will use the processor to process the payload, including
+     * decoding it to messages and triggering the listener.
+     *
+     * Default: null
+     *
+     * @param payloadProcessor the processor that is used to process each payload, or null to not use a processor
+     * @return the consumer builder, presumably this instance so that calls can be chained (not verifiable from here)
+     */
+    ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor);
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayload.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayload.java
new file mode 100644
index 0000000..5d2ff63
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayload.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * The abstraction of a message's payload.
+ */
+public interface MessagePayload {
+
+    /**
+     * Copy the bytes of the payload into a byte array.
+     *
+     * @return the byte array that is filled with the readable bytes of the payload; it must not be null
+     */
+    byte[] copiedBuffer();
+
+    /**
+     * Release the resources if necessary. The default implementation does nothing.
+     *
+     * NOTE: For a MessagePayload object that is created from {@link MessagePayloadFactory#DEFAULT}, this method must be
+     * called to avoid a memory leak.
+     */
+    default void release() {
+        // No-op by default
+    }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java
new file mode 100644
index 0000000..a444de7
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * The context of the message payload, which usually represents a batched message (batch) or a single message.
+ */
+public interface MessagePayloadContext {
+
+    /**
+     * Get a value associated with the given key.
+     *
+     * When the message payload is not produced by a Pulsar producer, a specific property is usually added to indicate
+     * the format. So this method is useful to determine whether the payload is produced by a Pulsar producer.
+     *
+     * @param key the key of the property
+     * @return the value associated with the key or null if the key or value doesn't exist
+     */
+    String getProperty(String key);
+
+    /**
+     * Get the number of messages when the payload is produced by a Pulsar producer.
+     *
+     * @return the number of messages
+     */
+    int getNumMessages();
+
+    /**
+     * Check whether the payload is a batch when the payload is produced by a Pulsar producer.
+     *
+     * @return true if the payload is a batch
+     */
+    boolean isBatch();
+
+    /**
+     * Get the internal single message with a specific index from a payload if the payload is a batch.
+     *
+     * @param index the batch index
+     * @param numMessages the number of messages in the batch
+     * @param payload the message payload
+     * @param containMetadata whether the payload contains the single message metadata
+     * @param schema the schema of the batch
+     * @param <T> the type parameter that is shared by the schema and the created message
+     * @return the created message
+     * @implNote The `index` and `numMessages` parameters are used to create the message id with batch index.
+     *   If `containMetadata` is true, parse the single message metadata from the payload first. The fields of single
+     *   message metadata will overwrite the same fields of the entry's metadata.
+     */
+    <T> Message<T> getMessageAt(int index,
+                                int numMessages,
+                                MessagePayload payload,
+                                boolean containMetadata,
+                                Schema<T> schema);
+
+    /**
+     * Convert the given payload to a single message if the entry is not a batch.
+     *
+     * @param payload the message payload
+     * @param schema the schema of the message
+     * @param <T> the type parameter that is shared by the schema and the created message
+     * @return the created message
+     */
+    <T> Message<T> asSingleMessage(MessagePayload payload, Schema<T> schema);
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadFactory.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadFactory.java
new file mode 100644
index 0000000..0181d6d
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.nio.ByteBuffer;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+
+/**
+ * The factory of {@link MessagePayload}.
+ *
+ * The {@link #DEFAULT} factory is obtained from the default implementation binding, see
+ * {@link DefaultImplementation#getDefaultImplementation()}.
+ */
+public interface MessagePayloadFactory {
+
+    MessagePayloadFactory DEFAULT = DefaultImplementation.getDefaultImplementation().newDefaultMessagePayloadFactory();
+
+    /**
+     * Create a payload whose underlying buffer refers to a byte array.
+     *
+     * @param bytes the byte array that the payload refers to
+     * @return the created MessagePayload object
+     */
+    MessagePayload wrap(byte[] bytes);
+
+    /**
+     * Create a payload whose underlying buffer refers to a NIO buffer.
+     *
+     * @param buffer the NIO buffer that the payload refers to
+     * @return the created MessagePayload object
+     */
+    MessagePayload wrap(ByteBuffer buffer);
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadProcessor.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadProcessor.java
new file mode 100644
index 0000000..1d50a50
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadProcessor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.function.Consumer;
+
+/**
+ * The processor that turns a message payload into the messages delivered to a consumer.
+ *
+ * An implementation decodes the raw buffer and passes every resulting message to the supplied callback, so that the
+ * consumer can consume these messages and handle the exception if one is thrown.
+ *
+ * Messages are constructed with {@link MessagePayloadContext#getMessageAt} or
+ * {@link MessagePayloadContext#asSingleMessage}. Both methods take a {@link MessagePayload}, which can be either the
+ * payload argument itself or a new payload created by {@link MessagePayloadFactory#DEFAULT}.
+ */
+public interface MessagePayloadProcessor {
+
+    /**
+     * Process the message payload.
+     *
+     * @param payload the payload whose underlying buffer is a Netty ByteBuf
+     * @param context the message context that contains the message format information and methods to create a message
+     * @param schema the message's schema
+     * @param messageConsumer the callback to consume each message
+     * @param <T> the value type of the schema and of the created messages
+     * @throws Exception any failure raised while decoding the payload or creating the messages
+     */
+    <T> void process(MessagePayload payload,
+                     MessagePayloadContext context,
+                     Schema<T> schema,
+                     Consumer<Message<T>> messageConsumer) throws Exception;
+
+    /**
+     * The default processor for Pulsar format payload.
+     *
+     * It should be noted that the getNumMessages() and isBatch() methods of {@link MessagePayloadContext} only work
+     * for Pulsar format. For other formats, the message metadata might be stored in the payload.
+     */
+    MessagePayloadProcessor DEFAULT = new MessagePayloadProcessor() {
+
+        @Override
+        public <T> void process(MessagePayload payload,
+                                MessagePayloadContext context,
+                                Schema<T> schema,
+                                Consumer<Message<T>> messageConsumer) {
+            // A non-batched entry holds exactly one message: the whole payload.
+            if (!context.isBatch()) {
+                messageConsumer.accept(context.asSingleMessage(payload, schema));
+                return;
+            }
+
+            // A batched entry: build and deliver the messages one by one, in index order.
+            final int batchSize = context.getNumMessages();
+            for (int index = 0; index < batchSize; index++) {
+                final Message<T> single = context.getMessageAt(index, batchSize, payload, true, schema);
+                messageConsumer.accept(single);
+            }
+        }
+    };
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index 98b4d5c..f7bcf05 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -21,6 +21,7 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessagePayloadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -229,6 +230,8 @@ public interface PulsarClientImplementationBinding {
 
     BatcherBuilder newKeyBasedBatcherBuilder();
 
+    MessagePayloadFactory newDefaultMessagePayloadFactory();
+
     /**
      * Retrieves ByteBuffer data into byte[].
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index dba18d9..cbfc27d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -479,4 +480,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         conf.setPoolMessages(poolMessages);
         return this;
     }
+
+    /**
+     * Record the payload processor in the consumer configuration.
+     *
+     * @param payloadProcessor the processor stored through {@code conf.setPayloadProcessor}
+     * @return this builder, so that calls can be chained
+     */
+    @Override
+    public ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor) {
+        conf.setPayloadProcessor(payloadProcessor);
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 98767d1..97fb13f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,9 +48,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -996,6 +996,135 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         });
     }
 
+    /**
+     * Tell whether the entry described by the given metadata has to be handled as a batch.
+     *
+     * @param messageMetadata the metadata of the received entry
+     * @return false for an undecryptable message; otherwise true when the metadata carries the number of messages in
+     *         the batch or when that number is different from 1
+     */
+    protected boolean isBatch(MessageMetadata messageMetadata) {
+        // A message that is not decryptable can't be parsed as a batch message, so it is never reported as a batch.
+        if (isMessageUndecryptable(messageMetadata)) {
+            return false;
+        }
+        return messageMetadata.hasNumMessagesInBatch() || messageMetadata.getNumMessagesInBatch() != 1;
+    }
+
+    /**
+     * Create the message at the given index of a batch, or return null when that message has to be skipped.
+     *
+     * A message is skipped (null is returned) when it belongs to the start entry and its batch index is prior to the
+     * startMessageId, when its single message metadata is marked as compacted out, or when ackBitSet is not null and
+     * the bit at the index is not set.
+     *
+     * @param index the index of the message in the batch
+     * @param numMessages the number of messages in the batch
+     * @param brokerEntryMetadata the broker entry metadata attached to the created message
+     * @param msgMetadata the metadata of the whole entry
+     * @param singleMessageMetadata the metadata object passed to the deserialization and to the created message
+     * @param payload the buffer to read the message from
+     * @param messageId the id of the entry, its ledger id and entry id are used to build the batch message id
+     * @param schema the schema of the created message
+     * @param containMetadata whether the single message has to be deserialized from the payload first; when false,
+     *                        the payload is used as the message payload directly
+     * @param ackBitSet the bit set checked at the index, may be null
+     * @param acker the acker passed to the batch message id
+     * @param redeliveryCount the redelivery count of the created message
+     * @param <U> the value type of the schema
+     * @return the created message, or null if the message is skipped
+     * @throws IllegalStateException wrapping any IOException or IllegalStateException thrown while the message is
+     *                               deserialized or created
+     */
+    protected <U> MessageImpl<U> newSingleMessage(final int index,
+                                                  final int numMessages,
+                                                  final BrokerEntryMetadata brokerEntryMetadata,
+                                                  final MessageMetadata msgMetadata,
+                                                  final SingleMessageMetadata singleMessageMetadata,
+                                                  final ByteBuf payload,
+                                                  final MessageIdImpl messageId,
+                                                  final Schema<U> schema,
+                                                  final boolean containMetadata,
+                                                  final BitSetRecyclable ackBitSet,
+                                                  final BatchMessageAcker acker,
+                                                  final int redeliveryCount) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, index);
+        }
+
+        ByteBuf singleMessagePayload = null;
+        try {
+            // The deserialization runs before any skip check, so it also happens for messages that end up skipped.
+            // NOTE(review): presumably it advances the reader index of payload so that the next call reads the next
+            // message - confirm against Commands.deSerializeSingleMessageInBatch.
+            if (containMetadata) {
+                singleMessagePayload =
+                        Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, index, numMessages);
+            }
+
+            if (isSameEntry(messageId) && isPriorBatchIndex(index)) {
+                // If we are receiving a batch message, we need to discard messages that were prior
+                // to the startMessageId
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
+                            consumerName, startMessageId);
+                }
+                return null;
+            }
+
+            if (singleMessageMetadata != null && singleMessageMetadata.isCompactedOut()) {
+                // message has been compacted out, so don't send to the user
+                return null;
+            }
+
+            // NOTE(review): a cleared bit presumably means that this message was already acknowledged - confirm.
+            if (ackBitSet != null && !ackBitSet.get(index)) {
+                return null;
+            }
+
+            BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
+                    messageId.getEntryId(), getPartitionIndex(), index, numMessages, acker);
+
+            // Without a deserialized single message, the given payload itself is the message payload.
+            final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload;
+            final MessageImpl<U> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
+                    msgMetadata, singleMessageMetadata, payloadBuffer,
+                    createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages
+            );
+            message.setBrokerEntryMetadata(brokerEntryMetadata);
+            return message;
+        } catch (IOException | IllegalStateException e) {
+            throw new IllegalStateException(e);
+        } finally {
+            // The deserialized buffer is released on every path, including the one that returns a message.
+            // NOTE(review): this assumes MessageImpl.create keeps its own reference or copy of the payload - confirm.
+            if (singleMessagePayload != null) {
+                singleMessagePayload.release();
+            }
+        }
+    }
+
+    /**
+     * Create a message for a non-batched entry and attach the broker entry metadata to it.
+     *
+     * @param messageId the id of the message
+     * @param brokerEntryMetadata the broker entry metadata set on the created message
+     * @param messageMetadata the metadata of the message, also used to create the encryption context
+     * @param payload the buffer that holds the message payload
+     * @param schema the schema of the created message
+     * @param redeliveryCount the redelivery count of the created message
+     * @param <U> the value type of the schema
+     * @return the created message
+     */
+    protected <U> MessageImpl<U> newMessage(final MessageIdImpl messageId,
+                                            final BrokerEntryMetadata brokerEntryMetadata,
+                                            final MessageMetadata messageMetadata,
+                                            final ByteBuf payload,
+                                            final Schema<U> schema,
+                                            final int redeliveryCount) {
+        final String fullTopicName = topicName.toString();
+        final MessageImpl<U> created = MessageImpl.create(fullTopicName, messageId, messageMetadata, payload,
+                createEncryptionContext(messageMetadata), cnx(), schema, redeliveryCount, poolMessages);
+        created.setBrokerEntryMetadata(brokerEntryMetadata);
+        return created;
+    }
+
+    /**
+     * Hand the message over to the application on the internal pinned executor.
+     *
+     * If a receive is pending, its callback is notified with the message without adding the message to the incoming
+     * queue. Otherwise the message is enqueued so that it can be retrieved when the application calls receive(), and
+     * a pending batch receive is notified when the enqueue result says so.
+     *
+     * @param message the message to deliver
+     */
+    private void executeNotifyCallback(final MessageImpl<T> message) {
+        internalPinnedExecutor.execute(() -> {
+            if (hasNextPendingReceive()) {
+                notifyPendingReceivedCallback(message, null);
+                return;
+            }
+            final boolean batchReceiveReady = enqueueMessageAndCheckBatchReceive(message);
+            if (batchReceiveReady && hasPendingBatchReceive()) {
+                notifyPendingBatchReceivedCallBack();
+            }
+        });
+    }
+
+    /**
+     * Process a received entry with the payload processor from the consumer configuration.
+     *
+     * Each non-null message passed by the processor to the callback is delivered through executeNotifyCallback, each
+     * null message is counted as skipped. If the processor throws, the failure is logged and the entry is discarded
+     * as corrupted with the BatchDeSerializeError validation error.
+     *
+     * @param brokerEntryMetadata the broker entry metadata of the entry
+     * @param messageMetadata the metadata of the entry
+     * @param byteBuf the payload buffer, it is released in this method
+     * @param messageId the id of the entry
+     * @param schema the schema passed to the processor
+     * @param redeliveryCount the redelivery count of the entry
+     * @param ackSet the ack set passed to the payload context
+     */
+    private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMetadata,
+                                           final MessageMetadata messageMetadata,
+                                           final ByteBuf byteBuf,
+                                           final MessageIdImpl messageId,
+                                           final Schema<T> schema,
+                                           final int redeliveryCount,
+                                           final List<Long> ackSet) {
+        final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
+        final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get(
+                brokerEntryMetadata, messageMetadata, messageId, this, redeliveryCount, ackSet);
+        // The counter is updated inside the lambda, hence an AtomicInteger instead of a local int.
+        final AtomicInteger skippedMessages = new AtomicInteger(0);
+        try {
+            conf.getPayloadProcessor().process(payload, entryContext, schema, message -> {
+                if (message != null) {
+                    // NOTE(review): the cast assumes the processor only passes messages created by the context,
+                    // i.e. MessageImpl instances - confirm.
+                    executeNotifyCallback((MessageImpl<T>) message);
+                } else {
+                    skippedMessages.incrementAndGet();
+                }
+            });
+        } catch (Throwable throwable) {
+            log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, throwable);
+            discardCorruptedMessage(messageId, cnx(), ValidationError.BatchDeSerializeError);
+        } finally {
+            // Both objects are handed back whether the processor succeeded or failed.
+            entryContext.recycle();
+            payload.release(); // byteBuf.release() is called in this method
+        }
+
+        // Skipped messages are never delivered to the application, so their permits are given back here.
+        if (skippedMessages.get() > 0) {
+            increaseAvailablePermits(cnx(), skippedMessages.get());
+        }
+
+        internalPinnedExecutor.execute(()
+                -> tryTriggerListener());
+    }
+
     void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
@@ -1051,6 +1180,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             return;
         }
 
+        if (conf.getPayloadProcessor() != null) {
+            // uncompressedPayload is released in this method so we don't need to call release() again
+            processPayloadByProcessor(
+                    brokerEntryMetadata, msgMetadata, uncompressedPayload, msgId, schema, redeliveryCount, ackSet);
+            return;
+        }
+
         // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
         // and return undecrypted payload
         if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
@@ -1063,7 +1199,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 }
             }
 
-            if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
+            if (isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) {
                 // We need to discard entries that were prior to startMessageId
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
@@ -1074,27 +1210,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return;
             }
 
-            final MessageImpl<T> message = MessageImpl.create(topicName.toString(), msgId, msgMetadata,
-                    uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount,
-                    poolMessages);
+            final MessageImpl<T> message =
+                    newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload, schema, redeliveryCount);
             uncompressedPayload.release();
-            message.setBrokerEntryMetadata(brokerEntryMetadata);
 
-            // Enqueue the message so that it can be retrieved when application calls receive()
-            // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
-            internalPinnedExecutor.execute(() -> {
-                if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
-                        redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
-                    possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
-                            Collections.singletonList(message));
-                }
-                if (hasNextPendingReceive()) {
-                    notifyPendingReceivedCallback(message, null);
-                } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
-                    notifyPendingBatchReceivedCallBack();
-                }
-            });
+            if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
+                    redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+                possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
+                        Collections.singletonList(message));
+            }
+            executeNotifyCallback(message);
         } else {
             // handle batch message enqueuing; uncompressed payload has all messages in batch
             receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx);
@@ -1268,63 +1393,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         int skippedMessages = 0;
         try {
             for (int i = 0; i < batchSize; ++i) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, i);
-                }
-
-                ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
-                        singleMessageMetadata, i, batchSize);
-
-                if (isSameEntry(messageId) && isPriorBatchIndex(i)) {
-                    // If we are receiving a batch message, we need to discard messages that were prior
-                    // to the startMessageId
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
-                                consumerName, startMessageId);
-                    }
-                    singleMessagePayload.release();
-
-                    ++skippedMessages;
+                final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
+                        singleMessageMetadata, uncompressedPayload, batchMessage, schema, true, ackBitSet, acker,
+                        redeliveryCount);
+                if (message == null) {
+                    skippedMessages++;
                     continue;
                 }
-
-                if (singleMessageMetadata.isCompactedOut()) {
-                    // message has been compacted out, so don't send to the user
-                    singleMessagePayload.release();
-
-                    ++skippedMessages;
-                    continue;
-                }
-
-                if (ackBitSet != null && !ackBitSet.get(i)) {
-                    singleMessagePayload.release();
-                    ++skippedMessages;
-                    continue;
-                }
-
-                BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
-                        messageId.getEntryId(), getPartitionIndex(), i, batchSize, acker);
-                final MessageImpl<T> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
-                        msgMetadata, singleMessageMetadata, singleMessagePayload,
-                        createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, poolMessages);
-                message.setBrokerEntryMetadata(brokerEntryMetadata);
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                internalPinnedExecutor.execute(() -> {
-                    if (hasNextPendingReceive()) {
-                        notifyPendingReceivedCallback(message, null);
-                    } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
-                        notifyPendingBatchReceivedCallBack();
-                    }
-                    singleMessagePayload.release();
-                });
+                executeNotifyCallback(message);
             }
             if (ackBitSet != null) {
                 ackBitSet.recycle();
             }
-        } catch (IOException e) {
-            log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
+        } catch (IllegalStateException e) {
+            log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e);
             discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
         }
 
@@ -1350,7 +1435,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
     }
 
-    private boolean isSameEntry(MessageIdData messageId) {
+    private boolean isSameEntry(MessageIdImpl messageId) {
         return startMessageId != null
                 && messageId.getLedgerId() == startMessageId.getLedgerId()
                 && messageId.getEntryId() == startMessageId.getEntryId();
@@ -1544,6 +1629,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return true;
     }
 
+    /**
+     * Discard a corrupted entry identified by a {@link MessageIdImpl}.
+     *
+     * The entry is logged, an individual ack that carries the validation error is written to the given connection,
+     * the available permits are increased and the failed receive counter of the stats is incremented.
+     *
+     * @param messageId the id of the corrupted entry
+     * @param currentCnx the connection used to send the ack
+     * @param validationError the validation error reported in the ack
+     */
+    private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentCnx,
+                                         ValidationError validationError) {
+        final long ledgerId = messageId.getLedgerId();
+        final long entryId = messageId.getEntryId();
+        log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, ledgerId, entryId);
+
+        final ByteBuf ackCommand = Commands.newAck(consumerId, ledgerId, entryId, null, AckType.Individual,
+                validationError, Collections.emptyMap(), -1);
+        currentCnx.ctx().writeAndFlush(ackCommand, currentCnx.ctx().voidPromise());
+
+        increaseAvailablePermits(currentCnx);
+        stats.incrementNumReceiveFailed();
+    }
+
     private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx,
             ValidationError validationError) {
         log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
new file mode 100644
index 0000000..aa6cab8
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import java.util.List;
+import lombok.NonNull;
+import org.apache.pulsar.client.api.MessagePayloadContext;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+
+/**
+ * The pooled implementation of {@link MessagePayloadContext} that creates messages through a {@link ConsumerImpl}.
+ *
+ * Instances are obtained with {@link #get} and must be given back with {@link #recycle()} once the payload has been
+ * processed.
+ */
+public class MessagePayloadContextImpl implements MessagePayloadContext {
+
+    private static final Recycler<MessagePayloadContextImpl> RECYCLER = new Recycler<MessagePayloadContextImpl>() {
+        @Override
+        protected MessagePayloadContextImpl newObject(Handle<MessagePayloadContextImpl> handle) {
+            return new MessagePayloadContextImpl(handle);
+        }
+    };
+
+    private final Recycler.Handle<MessagePayloadContextImpl> recyclerHandle;
+    private BrokerEntryMetadata brokerEntryMetadata;
+    private MessageMetadata messageMetadata;
+    private SingleMessageMetadata singleMessageMetadata;
+    private MessageIdImpl messageId;
+    private ConsumerImpl<?> consumer;
+    private int redeliveryCount;
+    private BatchMessageAcker acker;
+    private BitSetRecyclable ackBitSet;
+
+    private MessagePayloadContextImpl(final Recycler.Handle<MessagePayloadContextImpl> handle) {
+        this.recyclerHandle = handle;
+    }
+
+    /**
+     * Get a context from the recycler and initialize it for one received entry.
+     *
+     * @param brokerEntryMetadata the broker entry metadata of the entry, may be null
+     * @param messageMetadata the metadata of the entry
+     * @param messageId the id of the entry
+     * @param consumer the consumer that creates the messages
+     * @param redeliveryCount the redelivery count of the entry
+     * @param ackSet the ack set of the entry; a null or empty list means that no ack bit set is used
+     * @return the initialized context, the caller must call {@link #recycle()} when it is no longer needed
+     */
+    public static MessagePayloadContextImpl get(final BrokerEntryMetadata brokerEntryMetadata,
+                                                @NonNull final MessageMetadata messageMetadata,
+                                                @NonNull final MessageIdImpl messageId,
+                                                @NonNull final ConsumerImpl<?> consumer,
+                                                final int redeliveryCount,
+                                                final List<Long> ackSet) {
+        final MessagePayloadContextImpl context = RECYCLER.get();
+        context.brokerEntryMetadata = brokerEntryMetadata;
+        context.messageMetadata = messageMetadata;
+        context.singleMessageMetadata = new SingleMessageMetadata();
+        context.messageId = messageId;
+        context.consumer = consumer;
+        context.redeliveryCount = redeliveryCount;
+        // One acker is shared by all the messages created from this entry.
+        context.acker = BatchMessageAcker.newAcker(context.getNumMessages());
+        context.ackBitSet = (ackSet != null && ackSet.size() > 0)
+                ? BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet))
+                : null;
+        return context;
+    }
+
+    /**
+     * Clear all the references held by this context and give the instance back to the recycler, so that a later
+     * {@link #get} call can reuse it. The context must not be used after this method is called.
+     */
+    public void recycle() {
+        brokerEntryMetadata = null;
+        messageMetadata = null;
+        singleMessageMetadata = null;
+        messageId = null;
+        consumer = null;
+        redeliveryCount = 0;
+        acker = null;
+        if (ackBitSet != null) {
+            ackBitSet.recycle();
+            ackBitSet = null;
+        }
+        // Without this call the instance never goes back to the pool and RECYCLER.get() has to create a new object
+        // for every entry, which defeats the purpose of the recycler.
+        recyclerHandle.recycle(this);
+    }
+
+    @Override
+    public String getProperty(String key) {
+        // The first property whose key matches wins.
+        for (KeyValue keyValue : messageMetadata.getPropertiesList()) {
+            if (keyValue.hasKey() && keyValue.getKey().equals(key)) {
+                return keyValue.getValue();
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public int getNumMessages() {
+        return messageMetadata.getNumMessagesInBatch();
+    }
+
+    @Override
+    public boolean isBatch() {
+        return consumer.isBatch(messageMetadata);
+    }
+
+    @Override
+    public <T> Message<T> getMessageAt(int index,
+                                       int numMessages,
+                                       MessagePayload payload,
+                                       boolean containMetadata,
+                                       Schema<T> schema) {
+        final ByteBuf payloadBuffer = MessagePayloadUtils.convertToByteBuf(payload);
+        try {
+            return consumer.newSingleMessage(index,
+                    numMessages,
+                    brokerEntryMetadata,
+                    messageMetadata,
+                    singleMessageMetadata,
+                    payloadBuffer,
+                    messageId,
+                    schema,
+                    containMetadata,
+                    ackBitSet,
+                    acker,
+                    redeliveryCount);
+        } finally {
+            // NOTE(review): this assumes convertToByteBuf returns a buffer whose reference is owned by the caller,
+            // so that this release does not free the buffer held by the payload - confirm.
+            payloadBuffer.release();
+        }
+    }
+
+    @Override
+    public <T> Message<T> asSingleMessage(MessagePayload payload, Schema<T> schema) {
+        final ByteBuf payloadBuffer = MessagePayloadUtils.convertToByteBuf(payload);
+        try {
+            return consumer.newMessage(
+                    messageId, brokerEntryMetadata, messageMetadata, payloadBuffer, schema, redeliveryCount);
+        } finally {
+            // NOTE(review): same ownership assumption as in getMessageAt - confirm.
+            payloadBuffer.release();
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadFactoryImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadFactoryImpl.java
new file mode 100644
index 0000000..70a5b56
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadFactoryImpl.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.MessagePayloadFactory;
+
+/**
+ * The {@link MessagePayloadFactory} implementation that wraps the given data with {@code Unpooled.wrappedBuffer} and
+ * passes the resulting buffer to {@code MessagePayloadImpl.create}.
+ */
+public class MessagePayloadFactoryImpl implements MessagePayloadFactory {
+
+    @Override
+    public MessagePayload wrap(byte[] bytes) {
+        // NOTE(review): wrappedBuffer presumably shares the array instead of copying it, so later changes of the
+        // array would be visible through the payload - confirm against the Netty documentation.
+        return MessagePayloadImpl.create(Unpooled.wrappedBuffer(bytes));
+    }
+
+    @Override
+    public MessagePayload wrap(ByteBuffer buffer) {
+        // NOTE(review): same sharing assumption as for the byte array overload - confirm.
+        return MessagePayloadImpl.create(Unpooled.wrappedBuffer(buffer));
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java
new file mode 100644
index 0000000..d99cd83
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
+import lombok.Getter;
+import lombok.NonNull;
+import org.apache.pulsar.client.api.MessagePayload;
+
+/**
+ * A wrapper of {@link ByteBuf} that implements {@link MessagePayload}.
+ */
+/**
+ * A wrapper of {@link ByteBuf} that implements {@link MessagePayload}.
+ *
+ * <p>Instances are pooled through a Netty {@link Recycler}: {@link #create(ByteBuf)} obtains one from the pool and
+ * {@link #release()} hands it back. Once released, an instance must not be used any more, because the pool may
+ * give the same object to a later {@link #create(ByteBuf)} call.
+ */
+public class MessagePayloadImpl implements MessagePayload {
+
+    private static final Recycler<MessagePayloadImpl> RECYCLER = new Recycler<MessagePayloadImpl>() {
+        @Override
+        protected MessagePayloadImpl newObject(Handle<MessagePayloadImpl> handle) {
+            return new MessagePayloadImpl(handle);
+        }
+    };
+
+    private final Recycler.Handle<MessagePayloadImpl> recyclerHandle;
+    // The wrapped buffer; null while this instance sits in the pool (i.e. after release()).
+    @Getter
+    private ByteBuf byteBuf;
+
+    /**
+     * Obtains a pooled instance that wraps the given buffer.
+     *
+     * <p>The buffer is stored as is; its reference count is not changed here.
+     *
+     * @param byteBuf the buffer to wrap, must not be null
+     * @return a payload wrapping {@code byteBuf}
+     */
+    public static MessagePayloadImpl create(@NonNull final ByteBuf byteBuf) {
+        final MessagePayloadImpl payload = RECYCLER.get();
+        payload.byteBuf = byteBuf;
+        return payload;
+    }
+
+    private MessagePayloadImpl(final Recycler.Handle<MessagePayloadImpl> handle) {
+        this.recyclerHandle = handle;
+    }
+
+    /**
+     * Releases one reference of the wrapped buffer and returns this instance to the pool.
+     *
+     * <p>Calling this method again on an instance that is already released is a no-op, so an accidental double
+     * release does not recycle the same handle twice.
+     */
+    @Override
+    public void release() {
+        final ByteBuf buf = byteBuf;
+        if (buf == null) {
+            // Already released: recycling the handle a second time would be rejected by the Recycler.
+            return;
+        }
+        // Clear the field first so this instance never keeps pointing at a buffer it no longer owns.
+        byteBuf = null;
+        try {
+            ReferenceCountUtil.release(buf);
+        } finally {
+            // Hand the wrapper back to the pool even if releasing the buffer failed.
+            recyclerHandle.recycle(this);
+        }
+    }
+
+    /**
+     * Copies the readable bytes of the wrapped buffer into a new array.
+     *
+     * <p>The bytes are read with an absolute {@code getBytes} starting at the reader index.
+     *
+     * @return a new array holding the readable bytes, empty when nothing is readable
+     */
+    @Override
+    public byte[] copiedBuffer() {
+        final int readable = byteBuf.readableBytes();
+        final byte[] bytes = new byte[readable];
+        if (readable > 0) {
+            byteBuf.getBytes(byteBuf.readerIndex(), bytes);
+        }
+        return bytes;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadUtils.java
new file mode 100644
index 0000000..64faffe
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadUtils.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.pulsar.client.api.MessagePayload;
+
+/**
+ * Helpers for turning a {@link MessagePayload} into a Netty {@link ByteBuf}.
+ */
+public class MessagePayloadUtils {
+
+    /**
+     * Converts a payload to a {@link ByteBuf}.
+     *
+     * <p>For a {@link MessagePayloadImpl} the wrapped buffer itself is returned after calling {@code retain()} on
+     * it. For any other implementation the bytes returned by {@link MessagePayload#copiedBuffer()} are wrapped in
+     * a new buffer.
+     *
+     * @param payload the payload to convert
+     * @return the buffer holding the payload's bytes
+     */
+    public static ByteBuf convertToByteBuf(final MessagePayload payload) {
+        if (!(payload instanceof MessagePayloadImpl)) {
+            final byte[] copy = payload.copiedBuffer();
+            return Unpooled.wrappedBuffer(copy);
+        }
+        final MessagePayloadImpl impl = (MessagePayloadImpl) payload;
+        return impl.getByteBuf().retain();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index 225e246..c146f23 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessagePayloadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -379,4 +380,7 @@ public final class PulsarClientImplementationBindingImpl implements PulsarClient
         return new KeyBasedBatcherBuilder();
     }
 
+    /**
+     * Creates the message payload factory supplied by this implementation binding.
+     *
+     * @return a new {@link MessagePayloadFactoryImpl}
+     */
+    public MessagePayloadFactory newDefaultMessagePayloadFactory() {
+        final MessagePayloadFactory factory = new MessagePayloadFactoryImpl();
+        return factory;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index a39ac21..1076946 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
@@ -149,6 +150,9 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
     
     private boolean poolMessages = false;
 
+    @JsonIgnore
+    private transient MessagePayloadProcessor payloadProcessor = null;
+
     public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
         checkArgument(interval > 0, "interval needs to be > 0");
         this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessagePayloadTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessagePayloadTest.java
new file mode 100644
index 0000000..f980e51
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessagePayloadTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import org.apache.pulsar.client.impl.MessagePayloadImpl;
+import org.apache.pulsar.client.impl.MessagePayloadUtils;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link MessagePayload}.
+ */
+public class MessagePayloadTest {
+
+    /**
+     * Converting a {@link MessagePayloadImpl} must return the very same buffer instance with its reference count
+     * raised by one.
+     */
+    @Test
+    public void testConvertMessagePayloadImpl() {
+        final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1);
+
+        // Wrapping the buffer must not touch its reference count.
+        final MessagePayloadImpl payload = MessagePayloadImpl.create(buf);
+        Assert.assertEquals(buf.refCnt(), 1);
+
+        final ByteBuf convertedBuf = MessagePayloadUtils.convertToByteBuf(payload);
+        Assert.assertSame(convertedBuf, buf);
+
+        // One reference from the allocation plus one from the conversion; release both.
+        Assert.assertEquals(buf.refCnt(), 2);
+        buf.release();
+        buf.release();
+    }
+
+    /**
+     * Converting a payload that is not a {@link MessagePayloadImpl} must expose exactly the bytes returned by its
+     * {@code copiedBuffer()}, here the two bytes left after the first one was skipped.
+     */
+    @Test
+    public void testConvertCustomPayload() {
+        final ByteBuffer buffer = ByteBuffer.allocate(3);
+        buffer.put(new byte[]{ 0x11, 0x22, 0x33 });
+        buffer.flip();
+        buffer.get(); // skip 1st byte
+
+        final ByteBuf buf = MessagePayloadUtils.convertToByteBuf(new ByteBufferPayload(buffer));
+        Assert.assertEquals(buf.refCnt(), 1);
+
+        Assert.assertEquals(buf.readableBytes(), 2);
+        Assert.assertEquals(buf.readByte(), 0x22);
+        Assert.assertEquals(buf.readByte(), 0x33);
+
+        buf.release();
+    }
+
+    /**
+     * Converting a custom payload with no bytes must yield a buffer with nothing readable.
+     */
+    @Test
+    public void testConvertEmptyCustomPayload() {
+        final ByteBuf buf = MessagePayloadUtils.convertToByteBuf(new ByteBufferPayload(ByteBuffer.allocate(0)));
+        Assert.assertEquals(buf.refCnt(), 1);
+        Assert.assertEquals(buf.readableBytes(), 0);
+        buf.release();
+    }
+
+    /**
+     * Minimal custom {@link MessagePayload} backed by a {@link ByteBuffer}, used to exercise the conversion path
+     * for payloads that are not {@link MessagePayloadImpl}.
+     */
+    private static class ByteBufferPayload implements MessagePayload {
+
+        private final ByteBuffer buffer;
+
+        public ByteBufferPayload(final ByteBuffer buffer) {
+            this.buffer = buffer;
+        }
+
+        /**
+         * Returns the bytes remaining in the wrapped buffer.
+         *
+         * <p>Uses the relative bulk {@code get}, which advances the buffer's position, so the buffer is consumed
+         * by the call.
+         */
+        @Override
+        public byte[] copiedBuffer() {
+            final byte[] bytes = new byte[buffer.remaining()];
+            buffer.get(bytes);
+            return bytes;
+        }
+    }
+
+    /**
+     * Payloads created through {@link MessagePayloadFactory#DEFAULT} must wrap a buffer whose reference count is 1
+     * and drops to 0 once the payload is released, for both the byte array and the {@link ByteBuffer} overload.
+     */
+    @Test
+    public void testFactoryWrap() {
+        MessagePayloadImpl payload = (MessagePayloadImpl) MessagePayloadFactory.DEFAULT.wrap(new byte[1]);
+        // Keep our own reference to the buffer so it can still be inspected after the payload is released.
+        ByteBuf byteBuf = payload.getByteBuf();
+        Assert.assertEquals(byteBuf.refCnt(), 1);
+        payload.release();
+        Assert.assertEquals(byteBuf.refCnt(), 0);
+
+        payload = (MessagePayloadImpl) MessagePayloadFactory.DEFAULT.wrap(ByteBuffer.allocate(1));
+        byteBuf = payload.getByteBuf();
+        Assert.assertEquals(byteBuf.refCnt(), 1);
+        payload.release();
+        Assert.assertEquals(byteBuf.refCnt(), 0);
+    }
+}