You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/10/06 08:40:12 UTC
[pulsar] branch master updated: [PIP 96] Add message payload
processor for Pulsar client (#12088)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 86b2a21 [PIP 96] Add message payload processor for Pulsar client (#12088)
86b2a21 is described below
commit 86b2a212d4bf38d536814517d6e91ea8d6be7a58
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Oct 6 16:39:28 2021 +0800
[PIP 96] Add message payload processor for Pulsar client (#12088)
---
.../pulsar/client/processor/CustomBatchFormat.java | 74 +++++++
.../processor/CustomBatchPayloadProcessor.java | 61 ++++++
.../client/processor/CustomBatchProducer.java | 70 ++++++
.../processor/DefaultProcessorWithRefCnt.java | 47 ++++
.../processor/MessagePayloadProcessorTest.java | 220 +++++++++++++++++++
.../apache/pulsar/client/api/ConsumerBuilder.java | 8 +
.../apache/pulsar/client/api/MessagePayload.java | 42 ++++
.../pulsar/client/api/MessagePayloadContext.java | 80 +++++++
.../pulsar/client/api/MessagePayloadFactory.java | 46 ++++
.../pulsar/client/api/MessagePayloadProcessor.java | 70 ++++++
.../PulsarClientImplementationBinding.java | 3 +
.../pulsar/client/impl/ConsumerBuilderImpl.java | 7 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 236 +++++++++++++++------
.../client/impl/MessagePayloadContextImpl.java | 148 +++++++++++++
.../client/impl/MessagePayloadFactoryImpl.java | 37 ++++
.../pulsar/client/impl/MessagePayloadImpl.java | 72 +++++++
.../pulsar/client/impl/MessagePayloadUtils.java | 34 +++
.../PulsarClientImplementationBindingImpl.java | 4 +
.../impl/conf/ConsumerConfigurationData.java | 4 +
.../pulsar/client/api/MessagePayloadTest.java | 104 +++++++++
20 files changed, 1297 insertions(+), 70 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java
new file mode 100644
index 0000000..571d292
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import io.netty.buffer.ByteBuf;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+/**
+ * A batch message whose format is customized.
+ *
+ * 1. First 2 bytes represent the number of messages.
+ * 2. Each message is a string, whose format is
+ * 1. First 2 bytes represent the length `N`.
+ * 2. The following N bytes are the bytes of the string.
+ */
+public class CustomBatchFormat {
+
+ public static final String KEY = "entry.format";
+ public static final String VALUE = "custom";
+
+ @AllArgsConstructor
+ @Getter
+ public static class Metadata {
+ private final int numMessages;
+ }
+
+ public static ByteBuf serialize(Iterable<String> strings) {
+ final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024);
+ buf.writeShort(0);
+ short numMessages = 0;
+ for (String s : strings) {
+ writeString(buf, s);
+ numMessages++;
+ }
+ buf.setShort(0, numMessages);
+ return buf;
+ }
+
+ private static void writeString(final ByteBuf buf, final String s) {
+ final byte[] bytes = Schema.STRING.encode(s);
+ buf.writeShort(bytes.length);
+ buf.writeBytes(bytes);
+ }
+
+ public static Metadata readMetadata(final ByteBuf buf) {
+ return new Metadata(buf.readUnsignedShort()); // unsigned read: a 16-bit count above Short.MAX_VALUE must not come back negative
+ }
+
+ public static byte[] readMessage(final ByteBuf buf) {
+ final int length = buf.readUnsignedShort(); // unsigned read: a signed short would turn lengths above 32767 into a negative array size
+ final byte[] bytes = new byte[length];
+ buf.readBytes(bytes);
+ return bytes;
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchPayloadProcessor.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchPayloadProcessor.java
new file mode 100644
index 0000000..c83b13b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchPayloadProcessor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import io.netty.buffer.ByteBuf;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.MessagePayloadContext;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.MessagePayloadFactory;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessagePayloadUtils;
+
+@Slf4j
+public class CustomBatchPayloadProcessor implements MessagePayloadProcessor {
+
+ @Override
+ public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema,
+ Consumer<Message<T>> messageConsumer) throws Exception {
+ final String value = context.getProperty(CustomBatchFormat.KEY);
+ if (value == null || !value.equals(CustomBatchFormat.VALUE)) {
+ DEFAULT.process(payload, context, schema, messageConsumer);
+ return;
+ }
+
+ final ByteBuf buf = MessagePayloadUtils.convertToByteBuf(payload);
+ try {
+ final int numMessages = CustomBatchFormat.readMetadata(buf).getNumMessages();
+ for (int i = 0; i < numMessages; i++) {
+ final MessagePayload singlePayload =
+ MessagePayloadFactory.DEFAULT.wrap(CustomBatchFormat.readMessage(buf));
+ try {
+ messageConsumer.accept(
+ context.getMessageAt(i, numMessages, singlePayload, false, schema));
+ } finally {
+ singlePayload.release();
+ }
+ }
+ } finally {
+ buf.release();
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchProducer.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchProducer.java
new file mode 100644
index 0000000..bc07206
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchProducer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+
+@RequiredArgsConstructor
+@Slf4j
+public class CustomBatchProducer {
+
+ private final List<String> messages = new ArrayList<>();
+ private final PersistentTopic persistentTopic;
+ private final int batchingMaxMessages;
+
+ public void sendAsync(final String value) {
+ messages.add(value);
+ if (messages.size() >= batchingMaxMessages) {
+ flush();
+ }
+ }
+
+ public void flush() {
+ final ByteBuf buf = CustomBatchFormat.serialize(messages);
+ final ByteBuf headerAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.None,
+ createCustomMetadata(), buf);
+ buf.release();
+ persistentTopic.publishMessage(headerAndPayload, (e, ledgerId, entryId) -> {
+ if (e == null) {
+ log.info("Send successfully to {} ({}, {})", persistentTopic.getName(), ledgerId, entryId);
+ } else {
+ log.error("Failed to send: {}", e.getMessage());
+ }
+ });
+ messages.clear();
+ }
+
+ private static MessageMetadata createCustomMetadata() {
+ final MessageMetadata messageMetadata = new MessageMetadata();
+ // Here are required fields
+ messageMetadata.setProducerName("");
+ messageMetadata.setSequenceId(0L);
+ messageMetadata.setPublishTime(0L);
+ // Add the property to identify the message format
+ messageMetadata.addProperty().setKey(CustomBatchFormat.KEY).setValue(CustomBatchFormat.VALUE);
+ return messageMetadata;
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/DefaultProcessorWithRefCnt.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/DefaultProcessorWithRefCnt.java
new file mode 100644
index 0000000..63e295f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/DefaultProcessorWithRefCnt.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import java.util.function.Consumer;
+import lombok.Getter;
+import org.apache.pulsar.client.api.MessagePayloadContext;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessagePayloadImpl;
+
+/**
+ * The processor for Pulsar format messages that also maintains a total reference count.
+ *
+ * It's used to verify {@link MessagePayloadContext#getMessageAt} and {@link MessagePayloadContext#asSingleMessage} have released the
+ * ByteBuf successfully.
+ */
+public class DefaultProcessorWithRefCnt implements MessagePayloadProcessor {
+
+ @Getter
+ int totalRefCnt = 0;
+
+ @Override
+ public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema,
+ Consumer<Message<T>> messageConsumer) throws Exception {
+ MessagePayloadProcessor.DEFAULT.process(payload, context, schema, messageConsumer);
+ totalRefCnt += ((MessagePayloadImpl) payload).getByteBuf().refCnt();
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
new file mode 100644
index 0000000..6a12101
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.processor;
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for {@link MessagePayloadProcessor}.
+ */
+@Slf4j
+@Test(groups = "broker-impl")
+public class MessagePayloadProcessorTest extends ProducerConsumerBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ admin.clusters().createCluster("test",
+ ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Sets.newHashSet("appid"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("public/default", Sets.newHashSet("test"));
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @DataProvider
+ public static Object[][] config() {
+ return new Object[][] {
+ // numPartitions / enableBatching / batchingMaxMessages
+ { 1, true, 1 },
+ { 1, true, 4 },
+ { 1, false, 1 },
+ { 3, false, 1 }
+ };
+ }
+
+ @DataProvider
+ public static Object[][] customBatchConfig() {
+ return new Object[][] {
+ // numMessages / batchingMaxMessages
+ { 10, 1 },
+ { 10, 4 }
+ };
+ }
+
+ @Test(dataProvider = "config")
+ public void testDefaultProcessor(int numPartitions, boolean enableBatching, int batchingMaxMessages)
+ throws Exception {
+ final String topic = "testDefaultProcessor-" + numPartitions + "-" + enableBatching + "-" + batchingMaxMessages;
+ final int numMessages = 10;
+ final String messagePrefix = "msg-";
+
+ if (numPartitions > 1) {
+ admin.topics().createPartitionedTopic(topic, numPartitions);
+ }
+
+ @Cleanup
+ final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(enableBatching)
+ .batchingMaxMessages(batchingMaxMessages)
+ .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+ .messageRouter(new MessageRouter() {
+ int i = 0;
+
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+ return i++ % metadata.numPartitions();
+ }
+ })
+ .create();
+ for (int i = 0; i < numMessages; i++) {
+ final String value = messagePrefix + i;
+ producer.sendAsync(value).whenComplete((id, e) -> {
+ if (e == null) {
+ log.info("Send {} to {} {}", value, topic, id);
+ } else {
+ log.error("Failed to send {}: {}", value, e.getMessage());
+ }
+ });
+ }
+
+ final DefaultProcessorWithRefCnt processor = new DefaultProcessorWithRefCnt();
+ @Cleanup
+ final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .messagePayloadProcessor(processor)
+ .subscribe();
+ final List<String> values = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ final Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ values.add(message.getValue());
+ consumer.acknowledge(message.getMessageId());
+ }
+
+ if (numPartitions > 1) {
+ // messages are out of order across multiple partitions
+ Collections.sort(values);
+ }
+ for (int i = 0; i < numMessages; i++) {
+ Assert.assertEquals(values.get(i), messagePrefix + i);
+ }
+
+ // Each buffer's refCnt is 2 because after retrieving the refCnt, there will be two releases of the ByteBuf:
+ // 1. ConsumerImpl#processPayloadByProcessor
+ // 2. PulsarDecoder#channelRead
+ if (enableBatching) {
+ int numBatches = numMessages / batchingMaxMessages;
+ numBatches += (numMessages % batchingMaxMessages == 0) ? 0 : 1;
+ Assert.assertEquals(processor.getTotalRefCnt(), 2 * numBatches);
+ } else {
+ Assert.assertEquals(processor.getTotalRefCnt(), 2 * numMessages);
+ }
+ }
+
+ @Test
+ public void testCustomBatchFormat() {
+ final List<List<String>> inputs = new ArrayList<>();
+ inputs.add(Collections.emptyList());
+ inputs.add(Collections.singletonList("java"));
+ inputs.add(Arrays.asList("hello", "world", "java"));
+
+ for (List<String> input : inputs) {
+ final ByteBuf buf = CustomBatchFormat.serialize(input);
+
+ final CustomBatchFormat.Metadata metadata = CustomBatchFormat.readMetadata(buf);
+ final List<String> parsedTokens = new ArrayList<>();
+ for (int i = 0; i < metadata.getNumMessages(); i++) {
+ parsedTokens.add(Schema.STRING.decode(CustomBatchFormat.readMessage(buf)));
+ }
+
+ Assert.assertEquals(parsedTokens, input);
+ Assert.assertEquals(parsedTokens.size(), input.size());
+
+ Assert.assertEquals(buf.refCnt(), 1);
+ buf.release();
+ }
+ }
+
+ @Test(dataProvider = "customBatchConfig")
+ public void testCustomProcessor(final int numMessages, final int batchingMaxMessages) throws Exception {
+ final String topic = "persistent://public/default/testCustomProcessor-"
+ + numMessages + "-" + batchingMaxMessages;
+
+ @Cleanup
+ final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .messagePayloadProcessor(new CustomBatchPayloadProcessor())
+ .subscribe();
+
+ final PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().orElse(null);
+ Assert.assertNotNull(persistentTopic);
+
+ final String messagePrefix = "msg-";
+ final CustomBatchProducer producer = new CustomBatchProducer(persistentTopic, batchingMaxMessages);
+ for (int i = 0; i < numMessages; i++) {
+ producer.sendAsync(messagePrefix + i);
+ }
+ producer.flush();
+
+ for (int i = 0; i < numMessages; i++) {
+ final Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(message.getValue(), messagePrefix + i);
+ consumer.acknowledge(message.getMessageId());
+ }
+ }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 1038ba9..3c3ce17 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -741,4 +741,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
* corruption, deserialization error, etc.).
*/
ConsumerBuilder<T> poolMessages(boolean poolMessages);
+
+ /**
+ * If it's configured with a non-null value, the consumer will use the processor to process the payload, including
+ * decoding it to messages and triggering the listener.
+ *
+ * Default: null
+ */
+ ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor);
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayload.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayload.java
new file mode 100644
index 0000000..5d2ff63
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayload.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * The abstraction of a message's payload.
+ */
+public interface MessagePayload {
+
+ /**
+ * Copy the bytes of the payload into the byte array.
+ *
+ * @return the byte array that is filled with the readable bytes of the payload, it should not be null
+ */
+ byte[] copiedBuffer();
+
+ /**
+ * Release the resources if necessary.
+ *
+ * NOTE: For a MessagePayload object that is created from {@link MessagePayloadFactory#DEFAULT}, this method must be
+ * called to avoid a memory leak.
+ */
+ default void release() {
+ // No ops
+ }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java
new file mode 100644
index 0000000..a444de7
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * The context of the message payload, which usually represents a batched message (batch) or a single message.
+ */
+public interface MessagePayloadContext {
+
+ /**
+ * Get a value associated with the given key.
+ *
+ * When the message payload is not produced by Pulsar producer, a specific property is usually added to indicate the
+ * format. So this method is useful to determine whether the payload is produced by Pulsar producer.
+ *
+ * @param key
+ * @return the value associated with the key or null if the key or value doesn't exist
+ */
+ String getProperty(String key);
+
+ /**
+ * Get the number of messages when the payload is produced by Pulsar producer.
+ *
+ * @return the number of messages
+ */
+ int getNumMessages();
+
+ /**
+ * Check whether the payload is a batch when the payload is produced by Pulsar producer.
+ *
+ * @return true if the payload is a batch
+ */
+ boolean isBatch();
+
+ /**
+ * Get the internal single message with a specific index from a payload if the payload is a batch.
+ *
+ * @param index the batch index
+ * @param numMessages the number of messages in the batch
+ * @param payload the message payload
+ * @param containMetadata whether the payload contains the single message metadata
+ * @param schema the schema of the batch
+ * @param <T>
+ * @return the created message
+ * @implNote The `index` and `numMessages` parameters are used to create the message id with batch index.
+ * If `containMetadata` is true, parse the single message metadata from the payload first. The fields of single
+ * message metadata will overwrite the same fields of the entry's metadata.
+ */
+ <T> Message<T> getMessageAt(int index,
+ int numMessages,
+ MessagePayload payload,
+ boolean containMetadata,
+ Schema<T> schema);
+
+ /**
+ * Convert the given payload to a single message if the entry is not a batch.
+ *
+ * @param payload the message payload
+ * @param schema the schema of the message
+ * @param <T>
+ * @return the created message
+ */
+ <T> Message<T> asSingleMessage(MessagePayload payload, Schema<T> schema);
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadFactory.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadFactory.java
new file mode 100644
index 0000000..0181d6d
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.nio.ByteBuffer;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+
+/**
+ * The factory class of {@link MessagePayload}.
+ */
+public interface MessagePayloadFactory {
+
+ MessagePayloadFactory DEFAULT = DefaultImplementation.getDefaultImplementation().newDefaultMessagePayloadFactory();
+
+ /**
+ * Create a payload whose underlying buffer refers to a byte array.
+ *
+ * @param bytes the byte array
+ * @return the created MessagePayload object
+ */
+ MessagePayload wrap(byte[] bytes);
+
+ /**
+ * Create a payload whose underlying buffer refers to a NIO buffer.
+ *
+ * @param buffer the NIO buffer
+ * @return the created MessagePayload object
+ */
+ MessagePayload wrap(ByteBuffer buffer);
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadProcessor.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadProcessor.java
new file mode 100644
index 0000000..1d50a50
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadProcessor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.function.Consumer;
+
+/**
+ * The processor to process a message payload.
+ *
+ * It's responsible for converting the raw buffer to messages and then triggering callbacks so that the consumer can consume
+ * these messages and handle any exception that occurs.
+ *
+ * The most important part is to decode the raw buffer. After that, we can call
+ * {@link MessagePayloadContext#getMessageAt} or {@link MessagePayloadContext#asSingleMessage} to construct
+ * {@link Message} for consumer to consume. Since we need to pass the {@link MessagePayload} object to these methods, we
+ * can use {@link MessagePayloadFactory#DEFAULT} to create it or just reuse the payload argument.
+ */
+public interface MessagePayloadProcessor {
+
+ /**
+ * Process the message payload.
+ *
+ * @param payload the payload whose underlying buffer is a Netty ByteBuf
+ * @param context the message context that contains the message format information and methods to create a message
+ * @param schema the message's schema
+ * @param messageConsumer the callback to consume each message
+ * @param <T>
+ * @throws Exception
+ */
+ <T> void process(MessagePayload payload,
+ MessagePayloadContext context,
+ Schema<T> schema,
+ Consumer<Message<T>> messageConsumer) throws Exception;
+
+ // The default processor for Pulsar format payload. It should be noted that the getNumMessages() and isBatch() methods of
+ // MessagePayloadContext only work for Pulsar format. For other formats, the message metadata might be stored in the payload.
+ MessagePayloadProcessor DEFAULT = new MessagePayloadProcessor() {
+
+ @Override
+ public <T> void process(MessagePayload payload,
+ MessagePayloadContext context,
+ Schema<T> schema,
+ Consumer<Message<T>> messageConsumer) {
+ if (context.isBatch()) {
+ final int numMessages = context.getNumMessages();
+ for (int i = 0; i < numMessages; i++) {
+ messageConsumer.accept(context.getMessageAt(i, numMessages, payload, true, schema));
+ }
+ } else {
+ messageConsumer.accept(context.asSingleMessage(payload, schema));
+ }
+ }
+ };
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index 98b4d5c..f7bcf05 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -21,6 +21,7 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessagePayloadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -229,6 +230,8 @@ public interface PulsarClientImplementationBinding {
BatcherBuilder newKeyBasedBatcherBuilder();
+ MessagePayloadFactory newDefaultMessagePayloadFactory();
+
/**
* Retrieves ByteBuffer data into byte[].
*
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index dba18d9..cbfc27d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -479,4 +480,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
conf.setPoolMessages(poolMessages);
return this;
}
+
+ @Override
+ public ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor) {
+ // Only records the processor on the consumer configuration; the builder itself is returned for chaining.
+ conf.setPayloadProcessor(payloadProcessor);
+ return this;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 98767d1..97fb13f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,9 +48,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -996,6 +996,135 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
});
}
+ /**
+ * Tells whether an entry with the given metadata should be handled as a batch.
+ *
+ * @param messageMetadata metadata of the received entry
+ * @return {@code false} when the message is undecryptable; otherwise {@code true} when the metadata has the
+ * num-messages-in-batch field set or the reported batch size is not 1
+ */
+ protected boolean isBatch(MessageMetadata messageMetadata) {
+ // if message is not decryptable then it can't be parsed as a batch-message. so, add EncryptionCtx to message
+ // and return undecrypted payload
+ return !isMessageUndecryptable(messageMetadata) &&
+ (messageMetadata.hasNumMessagesInBatch() || messageMetadata.getNumMessagesInBatch() != 1);
+ }
+
+ /**
+ * Creates the message at position {@code index} of a batch, or returns {@code null} when that message must be
+ * skipped.
+ *
+ * @param index position of the message inside the batch
+ * @param numMessages total number of messages in the batch
+ * @param brokerEntryMetadata broker entry metadata attached to the created message
+ * @param msgMetadata metadata of the whole entry
+ * @param singleMessageMetadata holder that receives the per-message metadata when {@code containMetadata} is true
+ * @param payload the batch payload, or the single message payload when {@code containMetadata} is false
+ * @param messageId id of the entry; ledger id and entry id are copied into the batch message id
+ * @param schema schema used to create the message
+ * @param containMetadata whether the single message must first be deserialized out of {@code payload}
+ * @param ackBitSet optional bit set; a message whose bit is not set is skipped
+ * @param acker batch acker shared by the messages of the batch
+ * @param redeliveryCount redelivery count passed to the created message
+ * @return the created message, or {@code null} if it is before the start message id, compacted out, or its bit is
+ * not set in {@code ackBitSet}
+ * @throws IllegalStateException if deserialization fails (an {@code IOException} is wrapped)
+ */
+ protected <U> MessageImpl<U> newSingleMessage(final int index,
+ final int numMessages,
+ final BrokerEntryMetadata brokerEntryMetadata,
+ final MessageMetadata msgMetadata,
+ final SingleMessageMetadata singleMessageMetadata,
+ final ByteBuf payload,
+ final MessageIdImpl messageId,
+ final Schema<U> schema,
+ final boolean containMetadata,
+ final BitSetRecyclable ackBitSet,
+ final BatchMessageAcker acker,
+ final int redeliveryCount) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, index);
+ }
+
+ ByteBuf singleMessagePayload = null;
+ try {
+ if (containMetadata) {
+ // Deserialization happens before the skip checks below, so skipped messages are still read from payload.
+ singleMessagePayload =
+ Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, index, numMessages);
+ }
+
+ if (isSameEntry(messageId) && isPriorBatchIndex(index)) {
+ // If we are receiving a batch message, we need to discard messages that were prior
+ // to the startMessageId
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
+ consumerName, startMessageId);
+ }
+ return null;
+ }
+
+ if (singleMessageMetadata != null && singleMessageMetadata.isCompactedOut()) {
+ // message has been compacted out, so don't send to the user
+ return null;
+ }
+
+ if (ackBitSet != null && !ackBitSet.get(index)) {
+ return null;
+ }
+
+ BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
+ messageId.getEntryId(), getPartitionIndex(), index, numMessages, acker);
+
+ // Fall back to the caller's payload when no per-message buffer was extracted above.
+ final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload;
+ final MessageImpl<U> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
+ msgMetadata, singleMessageMetadata, payloadBuffer,
+ createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages
+ );
+ message.setBrokerEntryMetadata(brokerEntryMetadata);
+ return message;
+ } catch (IOException | IllegalStateException e) {
+ throw new IllegalStateException(e);
+ } finally {
+ // Only the buffer extracted here is released; the caller keeps ownership of payload.
+ // NOTE(review): presumably MessageImpl.create retains or copies what it needs - confirm.
+ if (singleMessagePayload != null) {
+ singleMessagePayload.release();
+ }
+ }
+ }
+
+    /**
+     * Builds a non-batched message for this consumer's topic and attaches the broker entry metadata to it.
+     *
+     * @param messageId id given to the created message
+     * @param brokerEntryMetadata broker entry metadata set on the created message
+     * @param messageMetadata metadata of the entry, also used to create the encryption context
+     * @param payload payload buffer passed to {@code MessageImpl.create}
+     * @param schema schema used to create the message
+     * @param redeliveryCount redelivery count passed to the created message
+     * @return the created message
+     */
+    protected <U> MessageImpl<U> newMessage(final MessageIdImpl messageId,
+                                            final BrokerEntryMetadata brokerEntryMetadata,
+                                            final MessageMetadata messageMetadata,
+                                            final ByteBuf payload,
+                                            final Schema<U> schema,
+                                            final int redeliveryCount) {
+        final String fullTopicName = topicName.toString();
+        final MessageImpl<U> created = MessageImpl.create(fullTopicName, messageId, messageMetadata, payload,
+                createEncryptionContext(messageMetadata), cnx(), schema, redeliveryCount, poolMessages);
+        created.setBrokerEntryMetadata(brokerEntryMetadata);
+        return created;
+    }
+
+ private void executeNotifyCallback(final MessageImpl<T> message) {
+ // Enqueue the message so that it can be retrieved when application calls receive()
+ // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
+ // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+ internalPinnedExecutor.execute(() -> {
+ if (hasNextPendingReceive()) {
+ notifyPendingReceivedCallback(message, null);
+ } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+ notifyPendingBatchReceivedCallBack();
+ }
+ });
+ }
+
+ /**
+ * Runs the configured payload processor over one received entry and dispatches every message it produces.
+ *
+ * The given buffer is wrapped in a {@link MessagePayloadImpl} and released through it before this method returns,
+ * so the caller must not release it again. {@code null} messages emitted by the processor are counted and their
+ * permits are given back. Any {@code Throwable} from the processor makes the entry be discarded as corrupted.
+ */
+ private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMetadata,
+ final MessageMetadata messageMetadata,
+ final ByteBuf byteBuf,
+ final MessageIdImpl messageId,
+ final Schema<T> schema,
+ final int redeliveryCount,
+ final List<Long> ackSet) {
+ final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
+ final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get(
+ brokerEntryMetadata, messageMetadata, messageId, this, redeliveryCount, ackSet);
+ // Counter object rather than a plain int because it is updated from inside the lambda.
+ final AtomicInteger skippedMessages = new AtomicInteger(0);
+ try {
+ conf.getPayloadProcessor().process(payload, entryContext, schema, message -> {
+ if (message != null) {
+ executeNotifyCallback((MessageImpl<T>) message);
+ } else {
+ skippedMessages.incrementAndGet();
+ }
+ });
+ } catch (Throwable throwable) {
+ log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, throwable);
+ discardCorruptedMessage(messageId, cnx(), ValidationError.BatchDeSerializeError);
+ } finally {
+ // The context and the payload wrapper are only valid for the duration of the process() call.
+ entryContext.recycle();
+ payload.release(); // byteBuf.release() is called in this method
+ }
+
+ if (skippedMessages.get() > 0) {
+ increaseAvailablePermits(cnx(), skippedMessages.get());
+ }
+
+ internalPinnedExecutor.execute(()
+ -> tryTriggerListener());
+ }
+
void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
@@ -1051,6 +1180,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return;
}
+ if (conf.getPayloadProcessor() != null) {
+ // uncompressedPayload is released in this method so we don't need to call release() again
+ processPayloadByProcessor(
+ brokerEntryMetadata, msgMetadata, uncompressedPayload, msgId, schema, redeliveryCount, ackSet);
+ return;
+ }
+
// if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
@@ -1063,7 +1199,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
}
- if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
+ if (isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) {
// We need to discard entries that were prior to startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
@@ -1074,27 +1210,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return;
}
- final MessageImpl<T> message = MessageImpl.create(topicName.toString(), msgId, msgMetadata,
- uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount,
- poolMessages);
+ final MessageImpl<T> message =
+ newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload, schema, redeliveryCount);
uncompressedPayload.release();
- message.setBrokerEntryMetadata(brokerEntryMetadata);
- // Enqueue the message so that it can be retrieved when application calls receive()
- // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
- // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
- internalPinnedExecutor.execute(() -> {
- if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
- redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
- possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
- Collections.singletonList(message));
- }
- if (hasNextPendingReceive()) {
- notifyPendingReceivedCallback(message, null);
- } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
- notifyPendingBatchReceivedCallBack();
- }
- });
+ if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
+ redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+ possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
+ Collections.singletonList(message));
+ }
+ executeNotifyCallback(message);
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx);
@@ -1268,63 +1393,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
int skippedMessages = 0;
try {
for (int i = 0; i < batchSize; ++i) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, i);
- }
-
- ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
- singleMessageMetadata, i, batchSize);
-
- if (isSameEntry(messageId) && isPriorBatchIndex(i)) {
- // If we are receiving a batch message, we need to discard messages that were prior
- // to the startMessageId
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
- consumerName, startMessageId);
- }
- singleMessagePayload.release();
-
- ++skippedMessages;
+ final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
+ singleMessageMetadata, uncompressedPayload, batchMessage, schema, true, ackBitSet, acker,
+ redeliveryCount);
+ if (message == null) {
+ skippedMessages++;
continue;
}
-
- if (singleMessageMetadata.isCompactedOut()) {
- // message has been compacted out, so don't send to the user
- singleMessagePayload.release();
-
- ++skippedMessages;
- continue;
- }
-
- if (ackBitSet != null && !ackBitSet.get(i)) {
- singleMessagePayload.release();
- ++skippedMessages;
- continue;
- }
-
- BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
- messageId.getEntryId(), getPartitionIndex(), i, batchSize, acker);
- final MessageImpl<T> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
- msgMetadata, singleMessageMetadata, singleMessagePayload,
- createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, poolMessages);
- message.setBrokerEntryMetadata(brokerEntryMetadata);
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
- internalPinnedExecutor.execute(() -> {
- if (hasNextPendingReceive()) {
- notifyPendingReceivedCallback(message, null);
- } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
- notifyPendingBatchReceivedCallBack();
- }
- singleMessagePayload.release();
- });
+ executeNotifyCallback(message);
}
if (ackBitSet != null) {
ackBitSet.recycle();
}
- } catch (IOException e) {
- log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
+ } catch (IllegalStateException e) {
+ log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e);
discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
}
@@ -1350,7 +1435,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
}
- private boolean isSameEntry(MessageIdData messageId) {
+ private boolean isSameEntry(MessageIdImpl messageId) {
return startMessageId != null
&& messageId.getLedgerId() == startMessageId.getLedgerId()
&& messageId.getEntryId() == startMessageId.getEntryId();
@@ -1544,6 +1629,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return true;
}
+ private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentCnx,
+ ValidationError validationError) {
+ log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(),
+ messageId.getEntryId());
+ ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null, AckType.Individual,
+ validationError, Collections.emptyMap(), -1);
+ currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
+ increaseAvailablePermits(currentCnx);
+ stats.incrementNumReceiveFailed();
+ }
+
private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx,
ValidationError validationError) {
log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
new file mode 100644
index 0000000..aa6cab8
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import java.util.List;
+import lombok.NonNull;
+import org.apache.pulsar.client.api.MessagePayloadContext;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+
+public class MessagePayloadContextImpl implements MessagePayloadContext {
+
+ private static final Recycler<MessagePayloadContextImpl> RECYCLER = new Recycler<MessagePayloadContextImpl>() {
+ @Override
+ protected MessagePayloadContextImpl newObject(Handle<MessagePayloadContextImpl> handle) {
+ return new MessagePayloadContextImpl(handle);
+ }
+ };
+
+ private final Recycler.Handle<MessagePayloadContextImpl> recyclerHandle;
+ private BrokerEntryMetadata brokerEntryMetadata;
+ private MessageMetadata messageMetadata;
+ private SingleMessageMetadata singleMessageMetadata;
+ private MessageIdImpl messageId;
+ private ConsumerImpl<?> consumer;
+ private int redeliveryCount;
+ private BatchMessageAcker acker;
+ private BitSetRecyclable ackBitSet;
+
+ private MessagePayloadContextImpl(final Recycler.Handle<MessagePayloadContextImpl> handle) {
+ this.recyclerHandle = handle;
+ }
+
+ public static MessagePayloadContextImpl get(final BrokerEntryMetadata brokerEntryMetadata,
+ @NonNull final MessageMetadata messageMetadata,
+ @NonNull final MessageIdImpl messageId,
+ @NonNull final ConsumerImpl<?> consumer,
+ final int redeliveryCount,
+ final List<Long> ackSet) {
+ final MessagePayloadContextImpl context = RECYCLER.get();
+ context.brokerEntryMetadata = brokerEntryMetadata;
+ context.messageMetadata = messageMetadata;
+ context.singleMessageMetadata = new SingleMessageMetadata();
+ context.messageId = messageId;
+ context.consumer = consumer;
+ context.redeliveryCount = redeliveryCount;
+ context.acker = BatchMessageAcker.newAcker(context.getNumMessages());
+ context.ackBitSet = (ackSet != null && ackSet.size() > 0)
+ ? BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet))
+ : null;
+ return context;
+ }
+
+ public void recycle() {
+ brokerEntryMetadata = null;
+ messageMetadata = null;
+ singleMessageMetadata = null;
+ messageId = null;
+ consumer = null;
+ redeliveryCount = 0;
+ acker = null;
+ if (ackBitSet != null) {
+ ackBitSet.recycle();
+ ackBitSet = null;
+ }
+ }
+
+ @Override
+ public String getProperty(String key) {
+ for (KeyValue keyValue : messageMetadata.getPropertiesList()) {
+ if (keyValue.hasKey() && keyValue.getKey().equals(key)) {
+ return keyValue.getValue();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public int getNumMessages() {
+ return messageMetadata.getNumMessagesInBatch();
+ }
+
+ @Override
+ public boolean isBatch() {
+ return consumer.isBatch(messageMetadata);
+ }
+
+ @Override
+ public <T> Message<T> getMessageAt(int index,
+ int numMessages,
+ MessagePayload payload,
+ boolean containMetadata,
+ Schema<T> schema) {
+ final ByteBuf payloadBuffer = MessagePayloadUtils.convertToByteBuf(payload);
+ try {
+ return consumer.newSingleMessage(index,
+ numMessages,
+ brokerEntryMetadata,
+ messageMetadata,
+ singleMessageMetadata,
+ payloadBuffer,
+ messageId,
+ schema,
+ containMetadata,
+ ackBitSet,
+ acker,
+ redeliveryCount);
+ } finally {
+ payloadBuffer.release();
+ }
+ }
+
+ @Override
+ public <T> Message<T> asSingleMessage(MessagePayload payload, Schema<T> schema) {
+ final ByteBuf payloadBuffer = MessagePayloadUtils.convertToByteBuf(payload);
+ try {
+ return consumer.newMessage(
+ messageId, brokerEntryMetadata, messageMetadata, payloadBuffer, schema, redeliveryCount);
+ } finally {
+ payloadBuffer.release();
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadFactoryImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadFactoryImpl.java
new file mode 100644
index 0000000..70a5b56
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadFactoryImpl.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.MessagePayloadFactory;
+
+/**
+ * {@link MessagePayloadFactory} that wraps the given bytes with Netty's {@code Unpooled.wrappedBuffer} and returns
+ * them as a {@link MessagePayloadImpl}.
+ * NOTE(review): presumably the wrapped buffer shares the caller's array/buffer rather than copying it - confirm
+ * against the Netty Unpooled documentation.
+ */
+public class MessagePayloadFactoryImpl implements MessagePayloadFactory {
+
+ /** Wraps a byte array as a payload. */
+ @Override
+ public MessagePayload wrap(byte[] bytes) {
+ return MessagePayloadImpl.create(Unpooled.wrappedBuffer(bytes));
+ }
+
+ /** Wraps a {@link ByteBuffer} as a payload. */
+ @Override
+ public MessagePayload wrap(ByteBuffer buffer) {
+ return MessagePayloadImpl.create(Unpooled.wrappedBuffer(buffer));
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java
new file mode 100644
index 0000000..d99cd83
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
+import lombok.Getter;
+import lombok.NonNull;
+import org.apache.pulsar.client.api.MessagePayload;
+
+/**
+ * {@link MessagePayload} implementation that wraps a Netty {@link ByteBuf}.
+ *
+ * Wrapper objects are taken from a {@link Recycler} in {@link #create} and handed back in {@link #release()}.
+ */
+public class MessagePayloadImpl implements MessagePayload {
+
+    private static final Recycler<MessagePayloadImpl> RECYCLER = new Recycler<MessagePayloadImpl>() {
+        @Override
+        protected MessagePayloadImpl newObject(Handle<MessagePayloadImpl> handle) {
+            return new MessagePayloadImpl(handle);
+        }
+    };
+
+    private final Recycler.Handle<MessagePayloadImpl> recyclerHandle;
+    @Getter
+    private ByteBuf byteBuf;
+
+    private MessagePayloadImpl(final Recycler.Handle<MessagePayloadImpl> handle) {
+        this.recyclerHandle = handle;
+    }
+
+    /**
+     * Obtains a wrapper from the pool and points it at the given buffer. The buffer is not retained here.
+     */
+    public static MessagePayloadImpl create(@NonNull final ByteBuf byteBuf) {
+        final MessagePayloadImpl wrapper = RECYCLER.get();
+        wrapper.byteBuf = byteBuf;
+        return wrapper;
+    }
+
+    /**
+     * Releases the wrapped buffer, drops the reference to it and returns this wrapper to the pool.
+     */
+    @Override
+    public void release() {
+        ReferenceCountUtil.release(byteBuf);
+        byteBuf = null;
+        recyclerHandle.recycle(this);
+    }
+
+    /**
+     * Copies the readable bytes of the wrapped buffer into a new array without moving the reader index.
+     *
+     * @return the copy, or an empty array when nothing is readable
+     */
+    @Override
+    public byte[] copiedBuffer() {
+        final int length = byteBuf.readableBytes();
+        if (length <= 0) {
+            return new byte[0];
+        }
+        final byte[] copy = new byte[length];
+        byteBuf.getBytes(byteBuf.readerIndex(), copy);
+        return copy;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadUtils.java
new file mode 100644
index 0000000..64faffe
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadUtils.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.pulsar.client.api.MessagePayload;
+
+/**
+ * Helpers for turning a {@link MessagePayload} into a Netty {@link ByteBuf}.
+ */
+public class MessagePayloadUtils {
+
+    /**
+     * Converts a payload to a {@link ByteBuf} that the caller is expected to release.
+     *
+     * For a {@link MessagePayloadImpl} the wrapped buffer itself is returned after {@code retain()}; for any other
+     * implementation the bytes from {@code copiedBuffer()} are wrapped in a new buffer.
+     */
+    public static ByteBuf convertToByteBuf(final MessagePayload payload) {
+        if (!(payload instanceof MessagePayloadImpl)) {
+            return Unpooled.wrappedBuffer(payload.copiedBuffer());
+        }
+        final MessagePayloadImpl nettyPayload = (MessagePayloadImpl) payload;
+        return nettyPayload.getByteBuf().retain();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index 225e246..c146f23 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessagePayloadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -379,4 +380,7 @@ public final class PulsarClientImplementationBindingImpl implements PulsarClient
return new KeyBasedBatcherBuilder();
}
+    /**
+     * Creates the default {@link MessagePayloadFactory}.
+     *
+     * @return a new {@link MessagePayloadFactoryImpl}
+     */
+    @Override
+    public MessagePayloadFactory newDefaultMessagePayloadFactory() {
+        return new MessagePayloadFactoryImpl();
+    }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index a39ac21..1076946 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -149,6 +150,9 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean poolMessages = false;
+ @JsonIgnore
+ private transient MessagePayloadProcessor payloadProcessor = null;
+
public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
checkArgument(interval > 0, "interval needs to be > 0");
this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessagePayloadTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessagePayloadTest.java
new file mode 100644
index 0000000..f980e51
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessagePayloadTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import org.apache.pulsar.client.impl.MessagePayloadImpl;
+import org.apache.pulsar.client.impl.MessagePayloadUtils;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests of {@link MessagePayload}: conversion to {@link ByteBuf} and {@link MessagePayloadFactory} wrapping.
+ */
+public class MessagePayloadTest {
+
+    @Test
+    public void testConvertMessagePayloadImpl() {
+        final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1);
+
+        final MessagePayloadImpl payload = MessagePayloadImpl.create(buf);
+        Assert.assertEquals(buf.refCnt(), 1); // create() is expected to leave the ref count untouched
+
+        final ByteBuf convertedBuf = MessagePayloadUtils.convertToByteBuf(payload);
+        Assert.assertSame(convertedBuf, buf); // the very same instance is expected back, not a copy
+
+        Assert.assertEquals(buf.refCnt(), 2); // the conversion is expected to add one reference
+        buf.release(); // two releases balance the two references counted above
+        buf.release();
+    }
+
+    @Test
+    public void testConvertCustomPayload() {
+        final ByteBuffer buffer = ByteBuffer.allocate(3);
+        buffer.put(new byte[]{ 0x11, 0x22, 0x33 });
+        buffer.flip();
+        buffer.get(); // skip 1st byte, leaving 0x22 and 0x33 as the remaining bytes
+
+        final ByteBuf buf = MessagePayloadUtils.convertToByteBuf(new ByteBufferPayload(buffer));
+        Assert.assertEquals(buf.refCnt(), 1);
+
+        Assert.assertEquals(buf.readableBytes(), 2); // only the two unread bytes are expected
+        Assert.assertEquals(buf.readByte(), 0x22);
+        Assert.assertEquals(buf.readByte(), 0x33);
+
+        buf.release();
+    }
+
+    @Test
+    public void testConvertEmptyCustomPayload() {
+        final ByteBuf buf = MessagePayloadUtils.convertToByteBuf(new ByteBufferPayload(ByteBuffer.allocate(0)));
+        Assert.assertEquals(buf.refCnt(), 1);
+        Assert.assertEquals(buf.readableBytes(), 0); // an empty payload is expected to give an empty buffer
+        buf.release();
+    }
+
+    private static class ByteBufferPayload implements MessagePayload { // payload that is not a MessagePayloadImpl
+
+        private final ByteBuffer buffer;
+
+        public ByteBufferPayload(final ByteBuffer buffer) {
+            this.buffer = buffer;
+        }
+
+        @Override
+        public byte[] copiedBuffer() {
+            final byte[] bytes = new byte[buffer.remaining()];
+            buffer.duplicate().get(bytes); // read via a duplicate so repeated calls return the same bytes
+            return bytes;
+        }
+    }
+
+    @Test
+    public void testFactoryWrap() {
+        MessagePayloadImpl payload = (MessagePayloadImpl) MessagePayloadFactory.DEFAULT.wrap(new byte[1]);
+        ByteBuf byteBuf = payload.getByteBuf();
+        Assert.assertEquals(byteBuf.refCnt(), 1);
+        payload.release(); // releasing the payload is expected to release the wrapped buffer
+        Assert.assertEquals(byteBuf.refCnt(), 0);
+
+        payload = (MessagePayloadImpl) MessagePayloadFactory.DEFAULT.wrap(ByteBuffer.allocate(1));
+        byteBuf = payload.getByteBuf(); // same expectations for the ByteBuffer overload of wrap()
+        Assert.assertEquals(byteBuf.refCnt(), 1);
+        payload.release();
+        Assert.assertEquals(byteBuf.refCnt(), 0);
+    }
+}