Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/18 08:27:18 UTC

[GitHub] [pulsar] BewareMyPower opened a new pull request #12088: [PIP 96] Add payload converter for Pulsar client

BewareMyPower opened a new pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088


   Master Issue: #12087 
   
   ### Motivation
   
   See https://github.com/apache/pulsar/issues/12087 for details.
   
   ### Modifications
   
   - Add a `PayloadConverter` interface to convert the payload buffer according to `BrokerEntryMetadata` and `MessageMetadata`.
   - Support configuring whether the converter is enabled and which package the converter is loaded from.
   - If it's enabled, load the implementation class from the configured package when creating a consumer and create an instance with the default constructor. `PayloadConverterProxy` is responsible for this.
   - Use the converted payload for `messageReceived` and release the payload buffer afterwards if it's a newly allocated buffer.
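
The converter contract described in the list above can be illustrated with a minimal sketch. It is not the code from this PR: `SimplePayloadConverter`, `AddPrefixSketch`, the property name `add.prefix`, and the prefix value are hypothetical stand-ins, and the payload is a plain `byte[]` with a property map instead of Netty's `ByteBuf`, `BrokerEntryMetadata`, and `MessageMetadata`, so that the example runs with the JDK alone.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for the PR's PayloadConverter (simplified argument types). */
interface SimplePayloadConverter {
    byte[] convert(Map<String, String> properties, byte[] payload);
}

final class AddPrefixSketch {
    // Assumed names for illustration only; the PR does not state them.
    static final String PROPERTY = "add.prefix";
    static final byte[] PREFIX = "prefix-".getBytes(StandardCharsets.UTF_8);

    // Used when the converter is disabled: the payload is passed through unchanged.
    static final SimplePayloadConverter IDENTITY = (properties, payload) -> payload;

    // Prepends a fixed prefix only when the property is present in the metadata.
    static final SimplePayloadConverter ADD_PREFIX = (properties, payload) -> {
        if (!properties.containsKey(PROPERTY)) {
            return payload; // same buffer, so the caller has nothing extra to release
        }
        final byte[] converted = new byte[PREFIX.length + payload.length];
        System.arraycopy(PREFIX, 0, converted, 0, PREFIX.length);
        System.arraycopy(payload, 0, converted, PREFIX.length, payload.length);
        return converted; // newly allocated buffer
    };

    public static void main(String[] args) {
        final byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        final byte[] converted =
                ADD_PREFIX.convert(Collections.singletonMap(PROPERTY, "true"), payload);
        System.out.println(new String(converted, StandardCharsets.UTF_8)); // prints "prefix-hello"
    }
}
```

Returning the input unchanged versus returning a new array mirrors the last bullet: with reference-counted buffers, only the newly allocated result would need to be released by the caller.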
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   Two converters are added to the pulsar-broker module for testing:
   - org.apache.pulsar.client.converter.AddPrefixPayloadConverter: adds a fixed prefix to the payload if a specific property is present in the `MessageMetadata`.
   - org.apache.pulsar.client.converter.broker.AppendIndexPayloadConverter: appends the index as a suffix to the payload if the `BrokerEntryMetadata` has the index field.
   
   Each class is associated with a test that uses `PayloadConverterValidator` to test whether the implementation is valid.
   
   Two test classes are added under the `org.apache.pulsar.client.converter` package in the pulsar-broker module:
   1. BrokerEntryMetadataTest: tests `AppendIndexPayloadConverter` when broker entry metadata is enabled and exposed to the client.
   2. PayloadConverterTest
       - testDefaultConverter: the case where the payload converter is not enabled and the converter package configuration works.
       - testCustomConverter: the case where the payload converter is enabled for a single topic.
       - testCustomConverterMultiTopics: the case where the payload converter is enabled for multiple topics.
       - testAppendIndexPayloadConverter: the case where `AppendIndexPayloadConverter` is found but broker entry metadata is not enabled.
   
   ### Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (**yes**)
     - The public API: (**yes**)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #12088: [PIP 96] Add message payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088#issuecomment-931208124


   This PR is now ready for review, and the PR description has been updated as well.
   
   Currently there are two +1 votes in the [vote email](https://lists.apache.org/thread.html/rb4df052fdea1d6791ecdebdd09bb4cbc27ffec307ad678afb1157c6c%40%3Cdev.pulsar.apache.org%3E); you can take a look first, @eolivelli @codelipenghui 





[GitHub] [pulsar] BewareMyPower commented on pull request #12088: [PIP 96] Add message payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088#issuecomment-934080093


   The comments have been addressed. PTAL again, @eolivelli 





[GitHub] [pulsar] BewareMyPower commented on pull request #12088: [PIP 96] Add message payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088#issuecomment-933609511


   @eolivelli @315157973 @codelipenghui @hangc0276 @merlimat @rdhabalia PTAL when you have time, since the Pulsar 2.9.0 release cut is coming.





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12088: [PIP 96] Add message payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088#discussion_r721899954



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * A batch message whose format is customized.
+ *
+ * 1. First 2 bytes represent the number of messages.
+ * 2. Each message is a string, whose format is
+ *   1. First 2 bytes represent the length `N`.
+ *   2. Followed N bytes are the bytes of the string.
+ */
+public class CustomBatchFormat {
+
+    public static final String KEY = "entry.format";
+    public static final String VALUE = "custom";
+
+    @AllArgsConstructor
+    @Getter
+    public static class Metadata {
+        private final int numMessages;
+    }
+
+    public static ByteBuf serialize(Iterable<String> strings) {
+        final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024);
+        buf.writeShort(0);
+        short numMessages = 0;
+        for (String s : strings) {
+            writeString(buf, s);
+            numMessages++;
+        }
+        buf.setShort(0, numMessages);
+        return buf;
+    }
+
+    private static void writeString(final ByteBuf buf, final String s) {
+        final byte[] bytes = Schema.STRING.encode(s);
+        buf.writeShort(bytes.length);
+        buf.writeBytes(bytes);
+    }
+
+    public static Metadata readMetadata(final ByteBuf buf) {
+        return new Metadata(buf.readShort());
+    }
+
+    public static byte[] readMessage(final ByteBuf buf) {
+        final short length = buf.readShort();
+        final byte[] bytes = new byte[length];
+        buf.readBytes(bytes);
+        return bytes;
+    }
+
+    @Test
+    public void testMultipleStrings() {

Review comment:
       I will move it to `MessagePayloadProcessorTest` and add the `@Test(groups = "broker-impl")` annotation.
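
For readers following the format described in the `CustomBatchFormat` Javadoc (a 2-byte message count, then for each message a 2-byte length `N` followed by `N` bytes), here is a minimal round-trip sketch. It is an illustration under assumptions, not the test class from the PR: it uses `java.nio.ByteBuffer` and UTF-8 instead of Netty's `ByteBuf` and `Schema.STRING`, and the class name `CustomBatchFormatSketch` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CustomBatchFormatSketch {

    /** Writes the message count, then each string as a length-prefixed byte run. */
    static ByteBuffer serialize(List<String> strings) {
        final List<byte[]> encoded = new ArrayList<>();
        int size = 2; // 2 bytes for the message count
        for (String s : strings) {
            final byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            encoded.add(bytes);
            size += 2 + bytes.length; // 2-byte length prefix plus the string bytes
        }
        final ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort((short) encoded.size());
        for (byte[] bytes : encoded) {
            buf.putShort((short) bytes.length);
            buf.put(bytes);
        }
        buf.flip(); // switch from writing to reading
        return buf;
    }

    /** Reads the count first, then that many length-prefixed strings. */
    static List<String> deserialize(ByteBuffer buf) {
        final int numMessages = buf.getShort();
        final List<String> result = new ArrayList<>(numMessages);
        for (int i = 0; i < numMessages; i++) {
            final byte[] bytes = new byte[buf.getShort()];
            buf.get(bytes);
            result.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) {
        final List<String> messages = Arrays.asList("hello", "", "pulsar");
        System.out.println(deserialize(serialize(messages))); // prints "[hello, , pulsar]"
    }
}
```

Because both the count and the lengths are signed 16-bit values, this layout only holds up to 32767 messages per batch and 32767 bytes per string.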







[GitHub] [pulsar] eolivelli merged pull request #12088: [PIP 96] Add message payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088


   





[GitHub] [pulsar] Demogorgon314 commented on a change in pull request #12088: [PIP 96] Add payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on a change in pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088#discussion_r711568900



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/converter/PayloadConverterProxy.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.converter;
+
+import io.netty.buffer.ByteBuf;
+import java.util.Iterator;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.raw.PayloadConverter;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+
+/**
+ * The proxy class for {@link PayloadConverter}.
+ */
+@Slf4j
+public class PayloadConverterProxy {
+
+    private final PayloadConverter payloadConverter;
+
+    public PayloadConverterProxy(final boolean enabled, final String packageName) {
+        this.payloadConverter = createConverter(enabled, packageName);
+    }
+
+    public ByteBuf convert(final BrokerEntryMetadata brokerEntryMetadata,
+                           final MessageMetadata metadata,
+                           final ByteBuf payload) {
+        return payloadConverter.convert(brokerEntryMetadata, metadata, payload);
+    }
+
+    private static PayloadConverter createConverter(final boolean enabled, final String packageName) {
+        if (!enabled) {
+            return (brokerEntryMetadata, metadata, payload) -> payload;
+        }
+
+        final Reflections reflections = new Reflections(packageName, new SubTypesScanner(false));
+        final Set<Class<? extends PayloadConverter>> converters = reflections.getSubTypesOf(PayloadConverter.class);
+        final Iterator<Class<? extends PayloadConverter>> iterator = converters.iterator();
+
+        PayloadConverter payloadConverter = null;
+        while (iterator.hasNext()) {
+            try {
+                final Class<? extends PayloadConverter> clazz = iterator.next();
+                if (!clazz.getPackage().getName().equals(packageName)) {
+                    // Ignore the sub packages
+                    continue;
+                }
+                payloadConverter = clazz.newInstance();

Review comment:
       Better to use `clazz.getDeclaredConstructor().newInstance()`, because `clazz.newInstance()` bypasses compile-time exception checking and has been deprecated since JDK 9.
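
The difference in exception handling mentioned in this comment can be shown with a small JDK-only sketch. The class names `ReflectiveCreateSketch` and `Failing` are hypothetical and unrelated to the PR's classes; the point is only that `Constructor.newInstance()` wraps whatever the constructor throws in an `InvocationTargetException`, which the caller is forced to handle.

```java
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

final class ReflectiveCreateSketch {

    /** Hypothetical class whose no-arg constructor throws a checked exception. */
    static class Failing {
        public Failing() throws IOException {
            throw new IOException("boom");
        }
    }

    /** Instantiates via the no-arg constructor; all failures are checked exceptions. */
    static <T> T create(Class<T> clazz) throws ReflectiveOperationException {
        return clazz.getDeclaredConstructor().newInstance();
    }

    /** Reports how instantiation ended, distinguishing constructor failures. */
    static String describe(Class<?> clazz) {
        try {
            create(clazz);
            return "created";
        } catch (InvocationTargetException e) {
            // The constructor itself threw; the original exception is the cause.
            return "wrapped: " + e.getCause().getClass().getSimpleName();
        } catch (ReflectiveOperationException e) {
            // No such constructor, inaccessible constructor, abstract class, etc.
            return "reflection error: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(StringBuilder.class)); // prints "created"
        System.out.println(describe(Failing.class));       // prints "wrapped: IOException"
    }
}
```

With the deprecated `Class.newInstance()`, the `IOException` from `Failing` would propagate directly even though the call site declares no such checked exception.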







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12088: [PIP 96] Add payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088#discussion_r711588876



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/converter/PayloadConverterProxy.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.converter;
+
+import io.netty.buffer.ByteBuf;
+import java.util.Iterator;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.raw.PayloadConverter;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+
+/**
+ * The proxy class for {@link PayloadConverter}.
+ */
+@Slf4j
+public class PayloadConverterProxy {
+
+    private final PayloadConverter payloadConverter;
+
+    public PayloadConverterProxy(final boolean enabled, final String packageName) {
+        this.payloadConverter = createConverter(enabled, packageName);
+    }
+
+    public ByteBuf convert(final BrokerEntryMetadata brokerEntryMetadata,
+                           final MessageMetadata metadata,
+                           final ByteBuf payload) {
+        return payloadConverter.convert(brokerEntryMetadata, metadata, payload);
+    }
+
+    private static PayloadConverter createConverter(final boolean enabled, final String packageName) {
+        if (!enabled) {
+            return (brokerEntryMetadata, metadata, payload) -> payload;
+        }
+
+        final Reflections reflections = new Reflections(packageName, new SubTypesScanner(false));
+        final Set<Class<? extends PayloadConverter>> converters = reflections.getSubTypesOf(PayloadConverter.class);
+        final Iterator<Class<? extends PayloadConverter>> iterator = converters.iterator();
+
+        PayloadConverter payloadConverter = null;
+        while (iterator.hasNext()) {
+            try {
+                final Class<? extends PayloadConverter> clazz = iterator.next();
+                if (!clazz.getPackage().getName().equals(packageName)) {
+                    // Ignore the sub packages
+                    continue;
+                }
+                payloadConverter = clazz.newInstance();

Review comment:
       I used `clazz.getConstructor().newInstance()` instead, because it only allows a public constructor. PTAL again.
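
The public-only behaviour referred to here can be checked with a short JDK-only sketch. The class names are hypothetical; it relies only on the documented behaviour that `Class.getConstructor()` looks up public constructors while `Class.getDeclaredConstructor()` also finds non-public ones.

```java
final class ConstructorLookupSketch {

    /** Hypothetical class with a public no-arg constructor. */
    static class PublicCtor {
        public PublicCtor() {
        }
    }

    /** Hypothetical class whose only constructor is private. */
    static class PrivateCtor {
        private PrivateCtor() {
        }
    }

    /** True only if the class exposes a public no-arg constructor. */
    static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** True if the class declares a no-arg constructor of any visibility. */
    static boolean hasDeclaredNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPublicNoArgConstructor(PublicCtor.class));    // prints "true"
        System.out.println(hasPublicNoArgConstructor(PrivateCtor.class));   // prints "false"
        System.out.println(hasDeclaredNoArgConstructor(PrivateCtor.class)); // prints "true"
    }
}
```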







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12088: [PIP 96] Add message payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088#discussion_r721899375



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -996,6 +996,130 @@ void activeConsumerChanged(boolean isActive) {
         });
     }
 
+    protected boolean isBatch(MessageMetadata messageMetadata) {
+        // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
+        // and return undecrypted payload
+        return !isMessageUndecryptable(messageMetadata) &&
+                (messageMetadata.hasNumMessagesInBatch() || messageMetadata.getNumMessagesInBatch() != 1);
+    }
+
+    protected <U> MessageImpl<U> newSingleMessage(final int index,
+                                                  final int numMessages,
+                                                  final BrokerEntryMetadata brokerEntryMetadata,
+                                                  final MessageMetadata msgMetadata,
+                                                  final SingleMessageMetadata singleMessageMetadata,
+                                                  final ByteBuf payload,
+                                                  final MessageIdImpl messageId,
+                                                  final Schema<U> schema,
+                                                  final boolean containMetadata,
+                                                  final BitSetRecyclable ackBitSet,
+                                                  final BatchMessageAcker acker,
+                                                  final int redeliveryCount) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, index);
+        }
+
+        ByteBuf singleMessagePayload = null;
+        try {
+            if (containMetadata) {
+                singleMessagePayload =
+                        Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, index, numMessages);
+            }
+
+            if (isSameEntry(messageId) && isPriorBatchIndex(index)) {
+                // If we are receiving a batch message, we need to discard messages that were prior
+                // to the startMessageId
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
+                            consumerName, startMessageId);
+                }
+                return null;
+            }
+
+            if (singleMessageMetadata != null && singleMessageMetadata.isCompactedOut()) {
+                // message has been compacted out, so don't send to the user
+                return null;
+            }
+
+            if (ackBitSet != null && !ackBitSet.get(index)) {
+                return null;
+            }
+
+            BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
+                    messageId.getEntryId(), getPartitionIndex(), index, numMessages, acker);
+
+            final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload;
+            return MessageImpl.create(topicName.toString(), batchMessageIdImpl,
+                    msgMetadata, singleMessageMetadata, payloadBuffer,
+                    createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages
+            ).setBrokerEntryMetadata(brokerEntryMetadata);
+        } catch (IOException | IllegalStateException e) {
+            throw new IllegalStateException(e);
+        } finally {
+            if (singleMessagePayload != null) {
+                singleMessagePayload.release();
+            }
+        }
+    }
+
+    protected <U> MessageImpl<U> newMessage(final MessageIdImpl messageId,
+                                            final BrokerEntryMetadata brokerEntryMetadata,
+                                            final MessageMetadata messageMetadata,
+                                            final ByteBuf payload,
+                                            final Schema<U> schema,
+                                            final int redeliveryCount) {
+        return MessageImpl.create(topicName.toString(), messageId, messageMetadata, payload,
+                createEncryptionContext(messageMetadata), cnx(), schema, redeliveryCount, poolMessages
+        ).setBrokerEntryMetadata(brokerEntryMetadata);
+    }
+
+    private void executeNotifyCallback(final MessageImpl<T> message) {
+        // Enqueue the message so that it can be retrieved when application calls receive()
+        // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
+        // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+        internalPinnedExecutor.execute(() -> {
+            if (hasNextPendingReceive()) {
+                notifyPendingReceivedCallback(message, null);
+            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingBatchReceivedCallBack();
+            }
+        });
+    }
+
+    private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMetadata,
+                                           final MessageMetadata messageMetadata,
+                                           final ByteBuf byteBuf,
+                                           final MessageIdImpl messageId,
+                                           final Schema<T> schema,
+                                           final int redeliveryCount,
+                                           final List<Long> ackSet) {
+        final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
+        final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get(
+                brokerEntryMetadata, messageMetadata, messageId, this, redeliveryCount, ackSet);
+        final AtomicInteger skippedMessages = new AtomicInteger(0);
+        try {
+            conf.getPayloadProcessor().process(payload, entryContext, schema, message -> {
+                if (message == null) {
+                    skippedMessages.incrementAndGet();
+                }
+                executeNotifyCallback((MessageImpl<T>) message);
+            });

Review comment:
       Nice point. I forgot to skip this call when I refactored the iterable interface to the current implementation.
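
The fix discussed in this thread, counting a `null` message as skipped and not passing it on, could look roughly like the sketch below. It is a simplified illustration and not the merged `ConsumerImpl` code: `SkipNullSketch` and `dispatch` are hypothetical names, and a plain `Consumer<T>` stands in for `executeNotifyCallback`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class SkipNullSketch {

    /**
     * Feeds each message to a callback that counts null messages as skipped
     * and forwards only non-null ones. Returns the number of skipped messages.
     */
    static <T> int dispatch(Iterable<T> messages, Consumer<T> deliver) {
        final AtomicInteger skippedMessages = new AtomicInteger(0);
        final Consumer<T> callback = message -> {
            if (message == null) {
                skippedMessages.incrementAndGet();
                return; // do not deliver a null message
            }
            deliver.accept(message);
        };
        for (T message : messages) {
            callback.accept(message);
        }
        return skippedMessages.get();
    }

    public static void main(String[] args) {
        final List<String> delivered = new ArrayList<>();
        final int skipped = dispatch(Arrays.asList("a", null, "b"), delivered::add);
        System.out.println(skipped + " skipped, delivered " + delivered); // prints "1 skipped, delivered [a, b]"
    }
}
```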







[GitHub] [pulsar] eolivelli commented on a change in pull request #12088: [PIP 96] Add message payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12088:
URL: https://github.com/apache/pulsar/pull/12088#discussion_r721740084



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -996,6 +996,130 @@ void activeConsumerChanged(boolean isActive) {
         });
     }
 
+    protected boolean isBatch(MessageMetadata messageMetadata) {
+        // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
+        // and return undecrypted payload
+        return !isMessageUndecryptable(messageMetadata) &&
+                (messageMetadata.hasNumMessagesInBatch() || messageMetadata.getNumMessagesInBatch() != 1);
+    }
+
+    protected <U> MessageImpl<U> newSingleMessage(final int index,
+                                                  final int numMessages,
+                                                  final BrokerEntryMetadata brokerEntryMetadata,
+                                                  final MessageMetadata msgMetadata,
+                                                  final SingleMessageMetadata singleMessageMetadata,
+                                                  final ByteBuf payload,
+                                                  final MessageIdImpl messageId,
+                                                  final Schema<U> schema,
+                                                  final boolean containMetadata,
+                                                  final BitSetRecyclable ackBitSet,
+                                                  final BatchMessageAcker acker,
+                                                  final int redeliveryCount) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, index);
+        }
+
+        ByteBuf singleMessagePayload = null;
+        try {
+            if (containMetadata) {
+                singleMessagePayload =
+                        Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, index, numMessages);
+            }
+
+            if (isSameEntry(messageId) && isPriorBatchIndex(index)) {
+                // If we are receiving a batch message, we need to discard messages that were prior
+                // to the startMessageId
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
+                            consumerName, startMessageId);
+                }
+                return null;
+            }
+
+            if (singleMessageMetadata != null && singleMessageMetadata.isCompactedOut()) {
+                // message has been compacted out, so don't send to the user
+                return null;
+            }
+
+            if (ackBitSet != null && !ackBitSet.get(index)) {
+                return null;
+            }
+
+            BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
+                    messageId.getEntryId(), getPartitionIndex(), index, numMessages, acker);
+
+            final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload;
+            return MessageImpl.create(topicName.toString(), batchMessageIdImpl,
+                    msgMetadata, singleMessageMetadata, payloadBuffer,
+                    createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages
+            ).setBrokerEntryMetadata(brokerEntryMetadata);
+        } catch (IOException | IllegalStateException e) {
+            throw new IllegalStateException(e);
+        } finally {
+            if (singleMessagePayload != null) {
+                singleMessagePayload.release();
+            }
+        }
+    }
+
+    protected <U> MessageImpl<U> newMessage(final MessageIdImpl messageId,
+                                            final BrokerEntryMetadata brokerEntryMetadata,
+                                            final MessageMetadata messageMetadata,
+                                            final ByteBuf payload,
+                                            final Schema<U> schema,
+                                            final int redeliveryCount) {
+        return MessageImpl.create(topicName.toString(), messageId, messageMetadata, payload,
+                createEncryptionContext(messageMetadata), cnx(), schema, redeliveryCount, poolMessages
+        ).setBrokerEntryMetadata(brokerEntryMetadata);
+    }
+
+    private void executeNotifyCallback(final MessageImpl<T> message) {
+        // Enqueue the message so that it can be retrieved when application calls receive()
+        // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
+        // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+        internalPinnedExecutor.execute(() -> {
+            if (hasNextPendingReceive()) {
+                notifyPendingReceivedCallback(message, null);
+            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingBatchReceivedCallBack();
+            }
+        });
+    }
+
+    private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMetadata,
+                                           final MessageMetadata messageMetadata,
+                                           final ByteBuf byteBuf,
+                                           final MessageIdImpl messageId,
+                                           final Schema<T> schema,
+                                           final int redeliveryCount,
+                                           final List<Long> ackSet) {
+        final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
+        final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get(
+                brokerEntryMetadata, messageMetadata, messageId, this, redeliveryCount, ackSet);
+        final AtomicInteger skippedMessages = new AtomicInteger(0);
+        try {
+            conf.getPayloadProcessor().process(payload, entryContext, schema, message -> {
+                if (message == null) {
+                    skippedMessages.incrementAndGet();
+                }
+                executeNotifyCallback((MessageImpl<T>) message);
+            });

Review comment:
       Here `message` can be null.
   Should we skip this call?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -625,8 +625,9 @@ public BrokerEntryMetadata getBrokerEntryMetadata() {
         return brokerEntryMetadata;
     }
 
-    public void setBrokerEntryMetadata(BrokerEntryMetadata brokerEntryMetadata) {
+    public MessageImpl<T> setBrokerEntryMetadata(BrokerEntryMetadata brokerEntryMetadata) {

Review comment:
       Do we really need to change this method?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * A batch message whose format is customized.
+ *
+ * 1. First 2 bytes represent the number of messages.
+ * 2. Each message is a string, whose format is
+ *   1. First 2 bytes represent the length `N`.
+ *   2. Followed N bytes are the bytes of the string.
+ */
+public class CustomBatchFormat {
+
+    public static final String KEY = "entry.format";
+    public static final String VALUE = "custom";
+
+    @AllArgsConstructor
+    @Getter
+    public static class Metadata {
+        private final int numMessages;
+    }
+
+    public static ByteBuf serialize(Iterable<String> strings) {
+        final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024);
+        buf.writeShort(0);
+        short numMessages = 0;
+        for (String s : strings) {
+            writeString(buf, s);
+            numMessages++;
+        }
+        buf.setShort(0, numMessages);
+        return buf;
+    }
+
+    private static void writeString(final ByteBuf buf, final String s) {
+        final byte[] bytes = Schema.STRING.encode(s);
+        buf.writeShort(bytes.length);
+        buf.writeBytes(bytes);
+    }
+
+    public static Metadata readMetadata(final ByteBuf buf) {
+        return new Metadata(buf.readShort());
+    }
+
+    public static byte[] readMessage(final ByteBuf buf) {
+        final short length = buf.readShort();
+        final byte[] bytes = new byte[length];
+        buf.readBytes(bytes);
+        return bytes;
+    }
+
+    @Test
+    public void testMultipleStrings() {

Review comment:
       Will Surefire pick up this test even if the class name does not end with `Test`?
   
   This usually doesn't work: you can run the test in the IDE, but it probably won't run on CI.
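
One way to address the naming concern is to keep the format helper free of test methods and move the round-trip check into a class whose name ends with `Test`. Surefire's default includes are name-based (patterns such as `**/*Test.java`) unless the build overrides them; whether this build does is not shown here. The sketch below is JDK-only and makes several assumptions: `CustomBatchFormatSketch` and `CustomBatchFormatSketchTest` are hypothetical names, `java.nio.ByteBuffer` replaces Netty's `ByteBuf`, strings are encoded as UTF-8, and a plain static method stands in for a TestNG `@Test` method.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// JDK-only stand-in for the format helper; the PR uses Netty's ByteBuf instead.
class CustomBatchFormatSketch {

    // Layout: 2-byte message count, then per message a 2-byte length and the UTF-8 bytes.
    static ByteBuffer serialize(List<String> strings) {
        int size = 2;
        final List<byte[]> encoded = new ArrayList<>();
        for (String s : strings) {
            final byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            encoded.add(bytes);
            size += 2 + bytes.length;
        }
        final ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort((short) encoded.size());
        for (byte[] bytes : encoded) {
            buf.putShort((short) bytes.length);
            buf.put(bytes);
        }
        buf.flip(); // switch from writing to reading
        return buf;
    }

    // Reads the count first, then each length-prefixed string in order.
    static List<String> deserialize(ByteBuffer buf) {
        final int numMessages = buf.getShort();
        final List<String> strings = new ArrayList<>(numMessages);
        for (int i = 0; i < numMessages; i++) {
            final byte[] bytes = new byte[buf.getShort()];
            buf.get(bytes);
            strings.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return strings;
    }
}

// Hypothetical test holder: the name ends with "Test" so a name-based include would match it.
// In a real test class this method would carry TestNG's @Test annotation.
class CustomBatchFormatSketchTest {
    static boolean roundTrips(List<String> input) {
        return CustomBatchFormatSketch.deserialize(CustomBatchFormatSketch.serialize(input)).equals(input);
    }
}
```

Separating the two classes also keeps the TestNG dependency out of the helper itself.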

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
+import lombok.Getter;
+import lombok.NonNull;
+import org.apache.pulsar.client.api.MessagePayload;
+
+/**
+ * A wrapper of {@link ByteBuf} that implements {@link MessagePayload}.
+ */
+public class MessagePayloadImpl implements MessagePayload {
+
+    private static final Recycler<MessagePayloadImpl> RECYCLER = new Recycler<MessagePayloadImpl>() {
+        @Override
+        protected MessagePayloadImpl newObject(Handle<MessagePayloadImpl> handle) {
+            return new MessagePayloadImpl(handle);
+        }
+    };
+    private static final byte[] EMPTY_BYTES = new byte[0];
+
+    private final Recycler.Handle<MessagePayloadImpl> recyclerHandle;
+    @Getter
+    private ByteBuf byteBuf;
+
+    public static MessagePayloadImpl create(@NonNull final ByteBuf byteBuf) {
+        final MessagePayloadImpl payload = RECYCLER.get();
+        payload.byteBuf = byteBuf;
+        return payload;
+    }
+
+    private MessagePayloadImpl(final Recycler.Handle<MessagePayloadImpl> handle) {
+        this.recyclerHandle = handle;
+    }
+
+    @Override
+    public void release() {
+        ReferenceCountUtil.release(byteBuf);
+        byteBuf = null;
+        recyclerHandle.recycle(this);
+    }
+
+    @Override
+    public byte[] copiedBuffer() {
+        final int readable = byteBuf.readableBytes();
+        if (readable > 0) {
+            final byte[] bytes = new byte[readable];
+            byteBuf.getBytes(byteBuf.readerIndex(), bytes);
+            return bytes;
+        } else {
+            return EMPTY_BYTES;

Review comment:
       This case is unlikely to happen.
   It may be confusing, though: callers expect a copied array here, but for an empty payload we return exactly the same shared reference every time.
   I suggest returning a new empty array.
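
A minimal sketch of the suggestion, assuming it is acceptable to allocate even when nothing is readable. It is JDK-only: `CopiedBufferSketch` is a hypothetical name and `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`, so this shows the shape of the change rather than the actual patch.

```java
import java.nio.ByteBuffer;

// JDK-only stand-in: ByteBuffer replaces Netty's ByteBuf for the sake of a runnable sketch.
class CopiedBufferSketch {

    // Returns a newly allocated array on every call, including when nothing is readable,
    // and leaves the source buffer's position untouched by reading through a duplicate.
    static byte[] copiedBuffer(ByteBuffer buf) {
        final byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes);
        return bytes;
    }
}
```

Dropping the special case removes the shared `EMPTY_BYTES` reference, so every caller gets an array it owns, at the cost of one zero-length allocation in the rare empty case.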



