You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/19 06:11:54 UTC

[pulsar] branch master updated: [Issue 4803][client] return null if the message value/data is not set by producer (#6379)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d55bc00  [Issue 4803][client] return null if the message value/data is not set by producer (#6379)
d55bc00 is described below

commit d55bc00f34a2fa763a3756fa0adbb1366ae319bd
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Mon May 18 23:11:42 2020 -0700

    [Issue 4803][client] return null if the message value/data is not set by producer (#6379)
    
    Fixes #4803
    
    ### Motivation
    Allow a typed consumer to receive messages with a `null` value when the producer sends a message without a payload.
    
    ### Modifications
    - add a `null_value` flag to `MessageMetadata` and `SingleMessageMetadata` that is set when a message is created with a `null` value (no payload)
    - check the flag when reading the data or value of a message and return `null` if it is set
---
 .../broker/admin/impl/PersistentTopicsBase.java    |   3 +
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  48 +++++--
 .../pulsar/broker/service/NullValueTest.java       | 148 +++++++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  16 ++-
 .../apache/pulsar/client/impl/ConsumerBase.java    |   3 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |  19 ++-
 .../client/impl/TypedMessageBuilderImpl.java       |   6 +-
 .../apache/pulsar/client/impl/MessageImplTest.java |  15 +++
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 114 ++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |   4 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   5 +
 .../pulsar/PulsarConsumerSourceTests.java          |   3 +-
 .../worker/FunctionRuntimeManagerTest.java         |   8 +-
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   |   7 +-
 15 files changed, 370 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b205d46..ece5371 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1930,6 +1930,9 @@ public class PersistentTopicsBase extends AdminResource {
         if (metadata.hasNumMessagesInBatch()) {
             responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
         }
+        if (metadata.hasNullValue()) {
+            responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue());
+        }
 
         // Decode if needed
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 39855d6..eb4dad7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -793,6 +793,17 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         topicStats = admin.topics().getStats(persistentTopicName);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0);
 
+        publishNullValueMessageOnPersistentTopic(persistentTopicName, 10);
+        topicStats = admin.topics().getStats(persistentTopicName);
+        assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
+        messages = admin.topics().peekMessages(persistentTopicName, subName, 10);
+        assertEquals(messages.size(), 10);
+        for (int i = 0; i < 10; i++) {
+            assertNull(messages.get(i).getData());
+            assertNull(messages.get(i).getValue());
+        }
+        admin.topics().skipAllMessages(persistentTopicName, subName);
+
         consumer.close();
         client.close();
 
@@ -1559,10 +1570,15 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
     long secondTimestamp = System.currentTimeMillis();
 
     private void publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
-        publishMessagesOnPersistentTopic(topicName, messages, 0);
+        publishMessagesOnPersistentTopic(topicName, messages, 0, false);
     }
 
-    private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
+    private void publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
+        publishMessagesOnPersistentTopic(topicName, messages, 0, true);
+    }
+
+    private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
+                                                  boolean nullValue) throws Exception {
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
@@ -1570,8 +1586,12 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
             .create();
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
-            String message = "message-" + i;
-            producer.send(message.getBytes());
+            if (nullValue) {
+                producer.send(null);
+            } else {
+                String message = "message-" + i;
+                producer.send(message.getBytes());
+            }
         }
 
         producer.close();
@@ -1704,13 +1724,13 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));
 
-        publishMessagesOnPersistentTopic(topicName, 5, 0);
+        publishMessagesOnPersistentTopic(topicName, 5, 0, false);
 
         // Allow at least 1ms for messages to have different timestamps
         Thread.sleep(1);
         long messageTimestamp = System.currentTimeMillis();
 
-        publishMessagesOnPersistentTopic(topicName, 5, 5);
+        publishMessagesOnPersistentTopic(topicName, 5, 5, false);
 
         List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
         assertEquals(messages.size(), 10);
@@ -1757,17 +1777,17 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));
 
-        publishMessagesOnPersistentTopic(topicName, 5, 0);
+        publishMessagesOnPersistentTopic(topicName, 5, 0, false);
 
         // Allow at least 1ms for messages to have different timestamps
         Thread.sleep(1);
         long firstTimestamp = System.currentTimeMillis();
-        publishMessagesOnPersistentTopic(topicName, 3, 5);
+        publishMessagesOnPersistentTopic(topicName, 3, 5, false);
 
         Thread.sleep(1);
         long secondTimestamp = System.currentTimeMillis();
 
-        publishMessagesOnPersistentTopic(topicName, 2, 8);
+        publishMessagesOnPersistentTopic(topicName, 2, 8, false);
 
         List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
         assertEquals(messages.size(), 10);
@@ -1829,13 +1849,13 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
                 .consumerName("consumerA").subscriptionType(SubscriptionType.Failover)
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
-        publishMessagesOnPersistentTopic(topicName, 5, 0);
+        publishMessagesOnPersistentTopic(topicName, 5, 0, false);
 
         // Allow at least 1ms for messages to have different timestamps
         Thread.sleep(1);
         long messageTimestamp = System.currentTimeMillis();
 
-        publishMessagesOnPersistentTopic(topicName, 5, 5);
+        publishMessagesOnPersistentTopic(topicName, 5, 5, false);
 
         // Currently the active consumer is consumerA
         for (int i = 0; i < 10; i++) {
@@ -1866,7 +1886,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         // Closing consumerA activates consumerB
         consumerA.close();
 
-        publishMessagesOnPersistentTopic(topicName, 5, 10);
+        publishMessagesOnPersistentTopic(topicName, 5, 10, false);
 
         int receivedAfterFailover = 0;
         for (int i = 10; i < 15; i++) {
@@ -1901,11 +1921,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));
 
-        publishMessagesOnPersistentTopic(topicName, 5, 0);
+        publishMessagesOnPersistentTopic(topicName, 5, 0, false);
         Thread.sleep(1);
 
         long timestamp = System.currentTimeMillis();
-        publishMessagesOnPersistentTopic(topicName, 5, 5);
+        publishMessagesOnPersistentTopic(topicName, 5, 5, false);
 
         for (int i = 0; i < 10; i++) {
             Message<byte[]> message = consumer.receive();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
new file mode 100644
index 0000000..8ede9b4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Null value message produce and consume test.
+ */
+@Slf4j
+public class NullValueTest extends BrokerTestBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void nullValueBytesSchemaTest() throws PulsarClientException {
+        String topic = "persistent://prop/ns-abc/null-value-bytes-test";
+
+        @Cleanup
+        Producer producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Consumer consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        int numMessage = 10;
+        for (int i = 0; i < numMessage; i++) {
+            if (i % 2 == 0) {
+                producer.newMessage().value("not null".getBytes()).send();
+            } else {
+                producer.newMessage().value(null).send();
+            }
+        }
+
+        for (int i = 0; i < numMessage; i++) {
+            Message message = consumer.receive();
+            if (i % 2 == 0) {
+                Assert.assertNotNull(message.getData());
+                Assert.assertNotNull(message.getValue());
+                Assert.assertEquals(new String(message.getData()), "not null");
+            } else {
+                Assert.assertNull(message.getData());
+                Assert.assertNull(message.getValue());
+            }
+            consumer.acknowledge(message);
+        }
+
+        for (int i = 0; i < numMessage; i++) {
+            if (i % 2 == 0) {
+                producer.newMessage().value("not null".getBytes()).sendAsync();
+            } else {
+                producer.newMessage().value(null).sendAsync();
+            }
+        }
+
+        for (int i = 0; i < numMessage; i++) {
+            CompletableFuture<Message> completableFuture = consumer.receiveAsync();
+            final int index = i;
+            completableFuture.whenComplete((message, throwable) -> {
+                Assert.assertNull(throwable);
+                if (index % 2 == 0) {
+                    Assert.assertNotNull(message.getData());
+                    Assert.assertNotNull(message.getValue());
+                    Assert.assertEquals(new String(message.getData()), "not null");
+                } else {
+                    Assert.assertNull(message.getData());
+                    Assert.assertNull(message.getValue());
+                }
+                try {
+                    consumer.acknowledge(message);
+                } catch (PulsarClientException e) {
+                    Assert.assertNull(e);
+                }
+            });
+        }
+
+    }
+
+    @Test
+    public void nullValueBooleanSchemaTest() throws PulsarClientException {
+        String topic = "persistent://prop/ns-abc/null-value-bool-test";
+
+        @Cleanup
+        Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL)
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Consumer<Boolean> consumer = pulsarClient.newConsumer(Schema.BOOL)
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        int numMessage = 10;
+        for (int i = 0; i < numMessage; i++) {
+            producer.newMessage().value(null).sendAsync();
+        }
+
+        for (int i = 0; i < numMessage; i++) {
+            Message<Boolean> message = consumer.receive();
+            Assert.assertNull(message.getValue());
+            Assert.assertNull(message.getData());
+        }
+
+    }
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 80b3fe0..f6d4fae 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1288,6 +1288,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         }
 
         String msgId = response.getHeaderString(MESSAGE_ID);
+        PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
         try (InputStream stream = (InputStream) response.getEntity()) {
             byte[] data = new byte[stream.available()];
             stream.read(data);
@@ -1298,10 +1299,16 @@ public class TopicsImpl extends BaseResource implements Topics {
             if (tmp != null) {
                 properties.put("publish-time", (String) tmp);
             }
+
+            tmp = headers.getFirst("X-Pulsar-null-value");
+            if (tmp != null) {
+                messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
+            }
+
             tmp = headers.getFirst(BATCH_HEADER);
             if (response.getHeaderString(BATCH_HEADER) != null) {
                 properties.put(BATCH_HEADER, (String) tmp);
-                return getIndividualMsgsFromBatch(topic, msgId, data, properties);
+                return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
             }
             for (Entry<String, List<Object>> entry : headers.entrySet()) {
                 String header = entry.getKey();
@@ -1312,12 +1319,12 @@ public class TopicsImpl extends BaseResource implements Topics {
             }
 
             return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties,
-                    Unpooled.wrappedBuffer(data), Schema.BYTES));
+                    Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata));
         }
     }
 
     private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
-                                                             Map<String, String> properties) {
+                                 Map<String, String> properties, PulsarApi.MessageMetadata.Builder msgMetadataBuilder) {
         List<Message<byte[]>> ret = new ArrayList<>();
         int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
         ByteBuf buf = Unpooled.wrappedBuffer(data);
@@ -1334,7 +1341,8 @@ public class TopicsImpl extends BaseResource implements Topics {
                         properties.put(entry.getKey(), entry.getValue());
                     }
                 }
-                ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES));
+                ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload,
+                        Schema.BYTES, msgMetadataBuilder));
             } catch (Exception ex) {
                 log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index b489081..a0e6acd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -557,7 +557,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
         if (canEnqueueMessage(message)) {
             incomingMessages.add(message);
-            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.getData().length);
+            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
+                    this, message.getData() == null ? 0 : message.getData().length);
         }
         return hasEnoughMessagesForBatchReceive();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 04983d9..220bd76 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1289,7 +1289,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         stats.updateNumMsgsReceived(msg);
 
         trackMessage(msg);
-        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
+        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 0 : -msg.getData().length);
     }
 
     protected void trackMessage(Message<?> msg) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 7cca352..3b90284 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -154,16 +154,20 @@ public class MessageImpl<T> implements Message<T> {
             msgMetadataBuilder.setSequenceId(singleMessageMetadata.getSequenceId());
         }
 
+        if (singleMessageMetadata.hasNullValue()) {
+            msgMetadataBuilder.setNullValue(singleMessageMetadata.hasNullValue());
+        }
+
         this.schema = schema;
     }
 
     public MessageImpl(String topic, String msgId, Map<String, String> properties,
-            byte[] payload, Schema<T> schema) {
-        this(topic, msgId, properties, Unpooled.wrappedBuffer(payload), schema);
+            byte[] payload, Schema<T> schema, MessageMetadata.Builder msgMetadataBuilder) {
+        this(topic, msgId, properties, Unpooled.wrappedBuffer(payload), schema, msgMetadataBuilder);
     }
 
     public MessageImpl(String topic, String msgId, Map<String, String> properties,
-                       ByteBuf payload, Schema<T> schema) {
+                       ByteBuf payload, Schema<T> schema, MessageMetadata.Builder msgMetadataBuilder) {
         String[] data = msgId.split(":");
         long ledgerId = Long.parseLong(data[0]);
         long entryId = Long.parseLong(data[1]);
@@ -178,6 +182,7 @@ public class MessageImpl<T> implements Message<T> {
         this.properties = Collections.unmodifiableMap(properties);
         this.schema = schema;
         this.redeliveryCount = 0;
+        this.msgMetadataBuilder = msgMetadataBuilder;
     }
 
     public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) throws IOException {
@@ -234,6 +239,10 @@ public class MessageImpl<T> implements Message<T> {
 
     @Override
     public byte[] getData() {
+        checkNotNull(msgMetadataBuilder);
+        if (msgMetadataBuilder.hasNullValue()) {
+            return null;
+        }
         if (payload.arrayOffset() == 0 && payload.capacity() == payload.array().length) {
             return payload.array();
         } else {
@@ -259,6 +268,10 @@ public class MessageImpl<T> implements Message<T> {
 
     @Override
     public T getValue() {
+        checkNotNull(msgMetadataBuilder);
+        if (msgMetadataBuilder.hasNullValue()) {
+            return null;
+        }
         if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
             if (schema.supportSchemaVersioning()) {
                 return getKeyValueBySchemaVersion();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 8d78849..85e76a3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -137,8 +137,10 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> value(T value) {
-
-        checkArgument(value != null, "Need Non-Null content value");
+        if (value == null) {
+            msgMetadataBuilder.setNullValue(true);
+            return this;
+        }
         if (schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
             KeyValueSchema kvSchema = (KeyValueSchema) schema;
             org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index ac4737d..93f0e63 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.BooleanSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
 import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
@@ -413,4 +414,18 @@ public class MessageImplTest {
                 KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
                 KeyValueEncodingType.SEPARATED);
     }
+
+    @Test
+    public void testTypedSchemaGetNullValue() {
+        byte[] encodeBytes = new byte[0];
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("valueNotSet");
+        ByteString byteString = ByteString.copyFrom(new byte[0]);
+        builder.setSchemaVersion(byteString);
+        builder.setPartitionKey(Base64.getEncoder().encodeToString(encodeBytes));
+        builder.setPartitionKeyB64Encoded(true);
+        builder.setNullValue(true);
+        MessageImpl<Boolean> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), BooleanSchema.of());
+        assertNull(msg.getValue());
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index e282ddc..9e0bf86 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -3599,6 +3599,10 @@ public final class PulsarApi {
     // optional uint64 highest_sequence_id = 24 [default = 0];
     boolean hasHighestSequenceId();
     long getHighestSequenceId();
+    
+    // optional bool null_value = 25 [default = false];
+    boolean hasNullValue();
+    boolean getNullValue();
   }
   public static final class MessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -3969,6 +3973,16 @@ public final class PulsarApi {
       return highestSequenceId_;
     }
     
+    // optional bool null_value = 25 [default = false];
+    public static final int NULL_VALUE_FIELD_NUMBER = 25;
+    private boolean nullValue_;
+    public boolean hasNullValue() {
+      return ((bitField0_ & 0x00080000) == 0x00080000);
+    }
+    public boolean getNullValue() {
+      return nullValue_;
+    }
+    
     private void initFields() {
       producerName_ = "";
       sequenceId_ = 0L;
@@ -3992,6 +4006,7 @@ public final class PulsarApi {
       txnidLeastBits_ = 0L;
       txnidMostBits_ = 0L;
       highestSequenceId_ = 0L;
+      nullValue_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4100,6 +4115,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00040000) == 0x00040000)) {
         output.writeUInt64(24, highestSequenceId_);
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        output.writeBool(25, nullValue_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -4201,6 +4219,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeUInt64Size(24, highestSequenceId_);
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBoolSize(25, nullValue_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -4358,6 +4380,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00100000);
         highestSequenceId_ = 0L;
         bitField0_ = (bitField0_ & ~0x00200000);
+        nullValue_ = false;
+        bitField0_ = (bitField0_ & ~0x00400000);
         return this;
       }
       
@@ -4483,6 +4507,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00040000;
         }
         result.highestSequenceId_ = highestSequenceId_;
+        if (((from_bitField0_ & 0x00400000) == 0x00400000)) {
+          to_bitField0_ |= 0x00080000;
+        }
+        result.nullValue_ = nullValue_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -4576,6 +4604,9 @@ public final class PulsarApi {
         if (other.hasHighestSequenceId()) {
           setHighestSequenceId(other.getHighestSequenceId());
         }
+        if (other.hasNullValue()) {
+          setNullValue(other.getNullValue());
+        }
         return this;
       }
       
@@ -4745,6 +4776,11 @@ public final class PulsarApi {
               highestSequenceId_ = input.readUInt64();
               break;
             }
+            case 200: {
+              bitField0_ |= 0x00400000;
+              nullValue_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -5456,6 +5492,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool null_value = 25 [default = false];
+      private boolean nullValue_ ;
+      public boolean hasNullValue() {
+        return ((bitField0_ & 0x00400000) == 0x00400000);
+      }
+      public boolean getNullValue() {
+        return nullValue_;
+      }
+      public Builder setNullValue(boolean value) {
+        bitField0_ |= 0x00400000;
+        nullValue_ = value;
+        
+        return this;
+      }
+      public Builder clearNullValue() {
+        bitField0_ = (bitField0_ & ~0x00400000);
+        nullValue_ = false;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata)
     }
     
@@ -5503,6 +5560,10 @@ public final class PulsarApi {
     // optional uint64 sequence_id = 8;
     boolean hasSequenceId();
     long getSequenceId();
+    
+    // optional bool null_value = 9 [default = false];
+    boolean hasNullValue();
+    boolean getNullValue();
   }
   public static final class SingleMessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -5652,6 +5713,16 @@ public final class PulsarApi {
       return sequenceId_;
     }
     
+    // optional bool null_value = 9 [default = false];
+    public static final int NULL_VALUE_FIELD_NUMBER = 9;
+    private boolean nullValue_;
+    public boolean hasNullValue() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    public boolean getNullValue() {
+      return nullValue_;
+    }
+    
     private void initFields() {
       properties_ = java.util.Collections.emptyList();
       partitionKey_ = "";
@@ -5661,6 +5732,7 @@ public final class PulsarApi {
       partitionKeyB64Encoded_ = false;
       orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
       sequenceId_ = 0L;
+      nullValue_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5713,6 +5785,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeUInt64(8, sequenceId_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeBool(9, nullValue_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -5753,6 +5828,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeUInt64Size(8, sequenceId_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBoolSize(9, nullValue_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -5882,6 +5961,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000040);
         sequenceId_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000080);
+        nullValue_ = false;
+        bitField0_ = (bitField0_ & ~0x00000100);
         return this;
       }
       
@@ -5948,6 +6029,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000040;
         }
         result.sequenceId_ = sequenceId_;
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.nullValue_ = nullValue_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -5985,6 +6070,9 @@ public final class PulsarApi {
         if (other.hasSequenceId()) {
           setSequenceId(other.getSequenceId());
         }
+        if (other.hasNullValue()) {
+          setNullValue(other.getNullValue());
+        }
         return this;
       }
       
@@ -6065,6 +6153,11 @@ public final class PulsarApi {
               sequenceId_ = input.readUInt64();
               break;
             }
+            case 72: {
+              bitField0_ |= 0x00000100;
+              nullValue_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -6325,6 +6418,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool null_value = 9 [default = false];
+      private boolean nullValue_ ;
+      public boolean hasNullValue() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      public boolean getNullValue() {
+        return nullValue_;
+      }
+      public Builder setNullValue(boolean value) {
+        bitField0_ |= 0x00000100;
+        nullValue_ = value;
+        
+        return this;
+      }
+      public Builder clearNullValue() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        nullValue_ = false;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index b344700..0e1a1f9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1642,6 +1642,10 @@ public class Commands {
             singleMessageMetadataBuilder.setSequenceId(msgBuilder.getSequenceId());
         }
 
+        if (msgBuilder.hasNullValue()) { // propagate the flag's value, not merely its presence
+            singleMessageMetadataBuilder.setNullValue(msgBuilder.getNullValue());
+        }
+
         try {
             return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, batchBuffer);
         } finally {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 43a813c..b4b7bd9 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -135,6 +135,9 @@ message MessageMetadata {
 
     /// Add highest sequence id to support batch message with external sequence id
     optional uint64 highest_sequence_id = 24 [default = 0];
+
+    // Indicates that the message payload value is null (i.e. the producer did not set a value)
+    optional bool null_value = 25 [ default = false ];
 }
 
 message SingleMessageMetadata {
@@ -151,6 +154,8 @@ message SingleMessageMetadata {
     optional bytes ordering_key = 7;
     // Allows consumer retrieve the sequence id that the producer set.
     optional uint64 sequence_id = 8;
+    // Indicates that the message payload value is null (i.e. the producer did not set a value)
+    optional bool null_value = 9 [ default = false ];
 }
 
 enum ServerError {
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index cd39d96..99962d8 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -611,7 +612,7 @@ public class PulsarConsumerSourceTests {
 
     private static Message<byte[]> createMessage(String content, String messageId) {
         return new MessageImpl<byte[]>("my-topic", messageId, Collections.emptyMap(),
-                                       content.getBytes(), Schema.BYTES);
+                                       content.getBytes(), Schema.BYTES, PulsarApi.MessageMetadata.newBuilder());
     }
 
     private static String createMessageId(long ledgerId, long entryId, long partitionIndex) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 77407a1..a6569b3 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime;
@@ -509,17 +510,18 @@ public class FunctionRuntimeManagerTest {
                 .build();
 
         List<Message<byte[]>> messageList = new LinkedList<>();
+        PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
         Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
-                new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null));
+                new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder));
         doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
 
         Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
-                new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null));
+                new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder));
         doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
 
         // delete function2
         Message message3 = spy(new MessageImpl("foo", MessageId.latest.toString(),
-                new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null));
+                new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null, msgMetadataBuilder));
         doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
 
         messageList.add(message1);
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index ba67051..7f93d47 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -80,7 +81,8 @@ public class PulsarSpoutTest {
         ClientBuilder builder = spy(new ClientBuilderImpl());
         PulsarSpout spout = spy(new PulsarSpout(conf, builder));
 
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), new byte[0], Schema.BYTES);
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
+                new byte[0], Schema.BYTES, PulsarApi.MessageMetadata.newBuilder());
         Consumer<byte[]> consumer = mock(Consumer.class);
         SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -154,7 +156,8 @@ public class PulsarSpoutTest {
         when(client.getSharedConsumer(any())).thenReturn(consumer);
         instances.put(componentId, client);
 
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), msgContent.getBytes(), Schema.BYTES);
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
+                msgContent.getBytes(), Schema.BYTES, PulsarApi.MessageMetadata.newBuilder());
         when(consumer.receive(anyInt(), any())).thenReturn(msg);
 
         spout.open(config, context, collector);