You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 14:01:29 UTC

[pulsar] branch master updated: Support KeyValue Schema Use Null Key And Null Value (#7139)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d5ff26  Support KeyValue Schema Use Null Key And Null Value (#7139)
1d5ff26 is described below

commit 1d5ff26ba42fb7ce6d9284d5bcffc1e3686a6f64
Author: ran <ga...@126.com>
AuthorDate: Thu Jun 4 22:01:13 2020 +0800

    Support KeyValue Schema Use Null Key And Null Value (#7139)
    
    Fixes #4804
    
    Thanks to @nlu90 for the earlier work in #6384.
    
    ### Motivation
    
    Currently, the KeyValue schema doesn't handle `null` key and `null` value well.
---
 .../pulsar/broker/service/NullValueTest.java       |  93 +++++++++++++++++
 .../org/apache/pulsar/common/schema/KeyValue.java  |  33 ++++--
 .../org/apache/pulsar/client/impl/MessageImpl.java |  18 +++-
 .../client/impl/TypedMessageBuilderImpl.java       |  25 ++++-
 .../pulsar/client/impl/schema/KeyValueSchema.java  |  24 +++--
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 114 +++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |   4 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   5 +
 8 files changed, 294 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
index 8ede9b4..855bda4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
@@ -26,6 +26,9 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -145,4 +148,94 @@ public class NullValueTest extends BrokerTestBase {
 
     }
 
+    @Test
+    public void keyValueNullInlineTest() throws PulsarClientException {
+        String topic = "persistent://prop/ns-abc/kv-null-value-test";
+
+        @Cleanup
+        Producer<KeyValue<String, String>> producer = pulsarClient
+                .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Consumer<KeyValue<String, String>> consumer = pulsarClient
+                .newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        int numMessage = 10;
+        for (int i = 0; i < numMessage; i++) {
+            producer.newMessage().value(new KeyValue<>(null, "test")).send();
+            producer.newMessage().value(new KeyValue<>("test", null)).send();
+            producer.newMessage().value(new KeyValue<>(null, null)).send();
+        }
+
+        Message<KeyValue<String, String>> message;
+        KeyValue<String, String> keyValue;
+        for (int i = 0; i < numMessage; i++) {
+            message = consumer.receive();
+            keyValue = message.getValue();
+            Assert.assertNull(keyValue.getKey());
+            Assert.assertEquals("test", keyValue.getValue());
+
+            message = consumer.receive();
+            keyValue = message.getValue();
+            Assert.assertEquals("test", keyValue.getKey());
+            Assert.assertNull(keyValue.getValue());
+
+            message = consumer.receive();
+            keyValue = message.getValue();
+            Assert.assertNull(keyValue.getKey());
+            Assert.assertNull(keyValue.getValue());
+        }
+
+    }
+
+    @Test
+    public void keyValueNullSeparatedTest() throws PulsarClientException {
+        String topic = "persistent://prop/ns-abc/kv-null-value-test";
+
+        @Cleanup
+        Producer<KeyValue<String, String>> producer = pulsarClient
+                .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Consumer<KeyValue<String, String>> consumer = pulsarClient
+                .newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        int numMessage = 10;
+        for (int i = 0; i < numMessage; i++) {
+            producer.newMessage().value(new KeyValue<>(null, "test")).send();
+            producer.newMessage().value(new KeyValue<>("test", null)).send();
+            producer.newMessage().value(new KeyValue<>(null, null)).send();
+        }
+
+        Message<KeyValue<String, String>> message;
+        KeyValue<String, String> keyValue;
+        for (int i = 0; i < numMessage; i++) {
+            message = consumer.receive();
+            keyValue = message.getValue();
+            Assert.assertNull(keyValue.getKey());
+            Assert.assertEquals("test", keyValue.getValue());
+
+            message = consumer.receive();
+            keyValue = message.getValue();
+            Assert.assertEquals("test", keyValue.getKey());
+            Assert.assertNull(keyValue.getValue());
+
+            message = consumer.receive();
+            keyValue = message.getValue();
+            Assert.assertNull(keyValue.getKey());
+            Assert.assertNull(keyValue.getValue());
+        }
+
+    }
+
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
index 9d86758..7074c08 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
@@ -96,10 +96,25 @@ public class KeyValue<K, V> {
      */
     public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
                                        V value, Schema<V> valueWriter) {
-        byte [] keyBytes = keyWriter.encode(key);
-        byte [] valueBytes = valueWriter.encode(value);
+        byte [] keyBytes;
+        if (key == null) {
+            keyBytes = new byte[0];
+        } else {
+            keyBytes = keyWriter.encode(key);
+        }
+
+        byte [] valueBytes;
+        if (value == null) {
+            valueBytes = new byte[0];
+        } else {
+            valueBytes = valueWriter.encode(value);
+        }
         ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
-        byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
+        byteBuffer
+            .putInt(key == null ? -1 : keyBytes.length)
+            .put(keyBytes)
+            .putInt(value == null ? -1 : valueBytes.length)
+            .put(valueBytes);
         return byteBuffer.array();
     }
 
@@ -113,12 +128,16 @@ public class KeyValue<K, V> {
     public static <K, V> KeyValue<K, V> decode(byte[] data, KeyValueDecoder<K, V> decoder) {
         ByteBuffer byteBuffer = ByteBuffer.wrap(data);
         int keyLength = byteBuffer.getInt();
-        byte[] keyBytes = new byte[keyLength];
-        byteBuffer.get(keyBytes);
+        byte[] keyBytes = keyLength == -1 ? null : new byte[keyLength];
+        if (keyBytes != null) {
+            byteBuffer.get(keyBytes);
+        }
 
         int valueLength = byteBuffer.getInt();
-        byte[] valueBytes = new byte[valueLength];
-        byteBuffer.get(valueBytes);
+        byte[] valueBytes = valueLength == -1 ? null : new byte[valueLength];
+        if (valueBytes != null) {
+            byteBuffer.get(valueBytes);
+        }
 
         return decoder.decode(keyBytes, valueBytes);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 3b90284..a07834f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -158,6 +158,10 @@ public class MessageImpl<T> implements Message<T> {
             msgMetadataBuilder.setNullValue(singleMessageMetadata.hasNullValue());
         }
 
+        if (singleMessageMetadata.hasNullPartitionKey()) {
+            msgMetadataBuilder.setNullPartitionKey(singleMessageMetadata.hasNullPartitionKey());
+        }
+
         this.schema = schema;
     }
 
@@ -269,9 +273,6 @@ public class MessageImpl<T> implements Message<T> {
     @Override
     public T getValue() {
         checkNotNull(msgMetadataBuilder);
-        if (msgMetadataBuilder.hasNullValue()) {
-            return null;
-        }
         if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
             if (schema.supportSchemaVersioning()) {
                 return getKeyValueBySchemaVersion();
@@ -279,6 +280,9 @@ public class MessageImpl<T> implements Message<T> {
                 return getKeyValue();
             }
         } else {
+            if (msgMetadataBuilder.hasNullValue()) {
+                return null;
+            }
             // check if the schema passed in from client supports schema versioning or not
             // this is an optimization to only get schema version when necessary
             if (schema.supportSchemaVersioning()) {
@@ -298,7 +302,9 @@ public class MessageImpl<T> implements Message<T> {
         KeyValueSchema kvSchema = (KeyValueSchema) schema;
         byte[] schemaVersion = getSchemaVersion();
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
-            return (T) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
+            return (T) kvSchema.decode(
+                    msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
+                    msgMetadataBuilder.hasNullValue() ? null : getData(), schemaVersion);
         } else {
             return schema.decode(getData(), schemaVersion);
         }
@@ -307,7 +313,9 @@ public class MessageImpl<T> implements Message<T> {
     private T getKeyValue() {
         KeyValueSchema kvSchema = (KeyValueSchema) schema;
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
-            return (T) kvSchema.decode(getKeyBytes(), getData(), null);
+            return (T) kvSchema.decode(
+                    msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
+                    msgMetadataBuilder.hasNullValue() ? null : getData(), null);
         } else {
             return schema.decode(getData());
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 85e76a3..613bf02 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -111,6 +111,10 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
             KeyValueSchema kvSchema = (KeyValueSchema) schema;
             checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
                     "This method is not allowed to set keys when in encoding type is SEPARATED");
+            if (key == null) {
+                msgMetadataBuilder.setNullPartitionKey(true);
+                return this;
+            }
         }
         msgMetadataBuilder.setPartitionKey(key);
         msgMetadataBuilder.setPartitionKeyB64Encoded(false);
@@ -123,6 +127,10 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
             KeyValueSchema kvSchema = (KeyValueSchema) schema;
             checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
                     "This method is not allowed to set keys when in encoding type is SEPARATED");
+            if (key == null) {
+                msgMetadataBuilder.setNullPartitionKey(true);
+                return this;
+            }
         }
         msgMetadataBuilder.setPartitionKey(Base64.getEncoder().encodeToString(key));
         msgMetadataBuilder.setPartitionKeyB64Encoded(true);
@@ -146,11 +154,20 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
             org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
             if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
                 // set key as the message key
-                msgMetadataBuilder.setPartitionKey(
-                        Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
-                msgMetadataBuilder.setPartitionKeyB64Encoded(true);
+                if (kv.getKey() != null) {
+                    msgMetadataBuilder.setPartitionKey(
+                            Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
+                    msgMetadataBuilder.setPartitionKeyB64Encoded(true);
+                } else {
+                    this.msgMetadataBuilder.setNullPartitionKey(true);
+                }
+
                 // set value as the payload
-                this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
+                if (kv.getValue() != null) {
+                    this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
+                } else {
+                    this.msgMetadataBuilder.setNullValue(true);
+                }
                 return this;
             }
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index b81a947..ebc2f0a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -132,6 +132,9 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
                 valueSchema
             );
         } else {
+            if (message.getValue() == null) {
+                return null;
+            }
             return valueSchema.encode(message.getValue());
         }
     }
@@ -150,16 +153,25 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
 
     public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
         K k;
-        if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
-            k = keySchema.decode(keyBytes, schemaVersion);
+        if (keyBytes == null) {
+            k = null;
         } else {
-            k = keySchema.decode(keyBytes);
+            if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
+                k = keySchema.decode(keyBytes, schemaVersion);
+            } else {
+                k = keySchema.decode(keyBytes);
+            }
         }
+
         V v;
-        if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
-            v = valueSchema.decode(valueBytes, schemaVersion);
+        if (valueBytes == null) {
+            v = null;
         } else {
-            v = valueSchema.decode(valueBytes);
+            if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
+                v = valueSchema.decode(valueBytes, schemaVersion);
+            } else {
+                v = valueSchema.decode(valueBytes);
+            }
         }
         return new KeyValue<>(k, v);
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index a3d7ff0..c0b0143 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -3718,6 +3718,10 @@ public final class PulsarApi {
     // optional int32 chunk_id = 29;
     boolean hasChunkId();
     int getChunkId();
+    
+    // optional bool null_partition_key = 30 [default = false];
+    boolean hasNullPartitionKey();
+    boolean getNullPartitionKey();
   }
   public static final class MessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -4160,6 +4164,16 @@ public final class PulsarApi {
       return chunkId_;
     }
     
+    // optional bool null_partition_key = 30 [default = false];
+    public static final int NULL_PARTITION_KEY_FIELD_NUMBER = 30;
+    private boolean nullPartitionKey_;
+    public boolean hasNullPartitionKey() {
+      return ((bitField0_ & 0x01000000) == 0x01000000);
+    }
+    public boolean getNullPartitionKey() {
+      return nullPartitionKey_;
+    }
+    
     private void initFields() {
       producerName_ = "";
       sequenceId_ = 0L;
@@ -4188,6 +4202,7 @@ public final class PulsarApi {
       numChunksFromMsg_ = 0;
       totalChunkMsgSize_ = 0;
       chunkId_ = 0;
+      nullPartitionKey_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4311,6 +4326,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00800000) == 0x00800000)) {
         output.writeInt32(29, chunkId_);
       }
+      if (((bitField0_ & 0x01000000) == 0x01000000)) {
+        output.writeBool(30, nullPartitionKey_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -4432,6 +4450,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeInt32Size(29, chunkId_);
       }
+      if (((bitField0_ & 0x01000000) == 0x01000000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBoolSize(30, nullPartitionKey_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -4599,6 +4621,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x02000000);
         chunkId_ = 0;
         bitField0_ = (bitField0_ & ~0x04000000);
+        nullPartitionKey_ = false;
+        bitField0_ = (bitField0_ & ~0x08000000);
         return this;
       }
       
@@ -4744,6 +4768,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00800000;
         }
         result.chunkId_ = chunkId_;
+        if (((from_bitField0_ & 0x08000000) == 0x08000000)) {
+          to_bitField0_ |= 0x01000000;
+        }
+        result.nullPartitionKey_ = nullPartitionKey_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -4852,6 +4880,9 @@ public final class PulsarApi {
         if (other.hasChunkId()) {
           setChunkId(other.getChunkId());
         }
+        if (other.hasNullPartitionKey()) {
+          setNullPartitionKey(other.getNullPartitionKey());
+        }
         return this;
       }
       
@@ -5046,6 +5077,11 @@ public final class PulsarApi {
               chunkId_ = input.readInt32();
               break;
             }
+            case 240: {
+              bitField0_ |= 0x08000000;
+              nullPartitionKey_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -5877,6 +5913,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool null_partition_key = 30 [default = false];
+      private boolean nullPartitionKey_ ;
+      public boolean hasNullPartitionKey() {
+        return ((bitField0_ & 0x08000000) == 0x08000000);
+      }
+      public boolean getNullPartitionKey() {
+        return nullPartitionKey_;
+      }
+      public Builder setNullPartitionKey(boolean value) {
+        bitField0_ |= 0x08000000;
+        nullPartitionKey_ = value;
+        
+        return this;
+      }
+      public Builder clearNullPartitionKey() {
+        bitField0_ = (bitField0_ & ~0x08000000);
+        nullPartitionKey_ = false;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata)
     }
     
@@ -5928,6 +5985,10 @@ public final class PulsarApi {
     // optional bool null_value = 9 [default = false];
     boolean hasNullValue();
     boolean getNullValue();
+    
+    // optional bool null_partition_key = 10 [default = false];
+    boolean hasNullPartitionKey();
+    boolean getNullPartitionKey();
   }
   public static final class SingleMessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -6087,6 +6148,16 @@ public final class PulsarApi {
       return nullValue_;
     }
     
+    // optional bool null_partition_key = 10 [default = false];
+    public static final int NULL_PARTITION_KEY_FIELD_NUMBER = 10;
+    private boolean nullPartitionKey_;
+    public boolean hasNullPartitionKey() {
+      return ((bitField0_ & 0x00000100) == 0x00000100);
+    }
+    public boolean getNullPartitionKey() {
+      return nullPartitionKey_;
+    }
+    
     private void initFields() {
       properties_ = java.util.Collections.emptyList();
       partitionKey_ = "";
@@ -6097,6 +6168,7 @@ public final class PulsarApi {
       orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
       sequenceId_ = 0L;
       nullValue_ = false;
+      nullPartitionKey_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6152,6 +6224,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000080) == 0x00000080)) {
         output.writeBool(9, nullValue_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        output.writeBool(10, nullPartitionKey_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -6196,6 +6271,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeBoolSize(9, nullValue_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBoolSize(10, nullPartitionKey_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -6327,6 +6406,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000080);
         nullValue_ = false;
         bitField0_ = (bitField0_ & ~0x00000100);
+        nullPartitionKey_ = false;
+        bitField0_ = (bitField0_ & ~0x00000200);
         return this;
       }
       
@@ -6397,6 +6478,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000080;
         }
         result.nullValue_ = nullValue_;
+        if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+          to_bitField0_ |= 0x00000100;
+        }
+        result.nullPartitionKey_ = nullPartitionKey_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -6437,6 +6522,9 @@ public final class PulsarApi {
         if (other.hasNullValue()) {
           setNullValue(other.getNullValue());
         }
+        if (other.hasNullPartitionKey()) {
+          setNullPartitionKey(other.getNullPartitionKey());
+        }
         return this;
       }
       
@@ -6522,6 +6610,11 @@ public final class PulsarApi {
               nullValue_ = input.readBool();
               break;
             }
+            case 80: {
+              bitField0_ |= 0x00000200;
+              nullPartitionKey_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -6803,6 +6896,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool null_partition_key = 10 [default = false];
+      private boolean nullPartitionKey_ ;
+      public boolean hasNullPartitionKey() {
+        return ((bitField0_ & 0x00000200) == 0x00000200);
+      }
+      public boolean getNullPartitionKey() {
+        return nullPartitionKey_;
+      }
+      public Builder setNullPartitionKey(boolean value) {
+        bitField0_ |= 0x00000200;
+        nullPartitionKey_ = value;
+        
+        return this;
+      }
+      public Builder clearNullPartitionKey() {
+        bitField0_ = (bitField0_ & ~0x00000200);
+        nullPartitionKey_ = false;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 51d0a11..d822519 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1678,6 +1678,10 @@ public class Commands {
             singleMessageMetadataBuilder.setNullValue(msgBuilder.hasNullValue());
         }
 
+        if (msgBuilder.hasNullPartitionKey()) {
+            singleMessageMetadataBuilder.setNullPartitionKey(msgBuilder.hasNullPartitionKey());
+        }
+
         try {
             return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, batchBuffer);
         } finally {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index d43617b..c483fce 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -143,6 +143,9 @@ message MessageMetadata {
 	optional int32 num_chunks_from_msg = 27;
 	optional int32 total_chunk_msg_size = 28;
 	optional int32 chunk_id = 29;
+
+    // Indicates that the message partition key is null (true = no key was set)
+    optional bool null_partition_key = 30 [default = false];
 }
 
 message SingleMessageMetadata {
@@ -161,6 +164,8 @@ message SingleMessageMetadata {
     optional uint64 sequence_id = 8;
     // Indicate if the message payload value is set
     optional bool null_value = 9 [ default = false ];
+    // Indicates that the message partition key is null (true = no key was set)
+    optional bool null_partition_key = 10 [ default = false];
 }
 
 enum ServerError {