You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 14:01:29 UTC
[pulsar] branch master updated: Support KeyValue Schema Use Null
Key And Null Value (#7139)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1d5ff26 Support KeyValue Schema Use Null Key And Null Value (#7139)
1d5ff26 is described below
commit 1d5ff26ba42fb7ce6d9284d5bcffc1e3686a6f64
Author: ran <ga...@126.com>
AuthorDate: Thu Jun 4 22:01:13 2020 +0800
Support KeyValue Schema Use Null Key And Null Value (#7139)
Fixes #4804
Thanks, @nlu90, work at #6384.
### Motivation
Currently, the KeyValue schema doesn't handle `null` key and `null` value well.
---
.../pulsar/broker/service/NullValueTest.java | 93 +++++++++++++++++
.../org/apache/pulsar/common/schema/KeyValue.java | 33 ++++--
.../org/apache/pulsar/client/impl/MessageImpl.java | 18 +++-
.../client/impl/TypedMessageBuilderImpl.java | 25 ++++-
.../pulsar/client/impl/schema/KeyValueSchema.java | 24 +++--
.../apache/pulsar/common/api/proto/PulsarApi.java | 114 +++++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 4 +
pulsar-common/src/main/proto/PulsarApi.proto | 5 +
8 files changed, 294 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
index 8ede9b4..855bda4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
@@ -26,6 +26,9 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -145,4 +148,94 @@ public class NullValueTest extends BrokerTestBase {
}
+ @Test
+ public void keyValueNullInlineTest() throws PulsarClientException {
+ String topic = "persistent://prop/ns-abc/kv-null-value-test";
+
+ @Cleanup
+ Producer<KeyValue<String, String>> producer = pulsarClient
+ .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
+ .topic(topic)
+ .create();
+
+ @Cleanup
+ Consumer<KeyValue<String, String>> consumer = pulsarClient
+ .newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
+ .topic(topic)
+ .subscriptionName("test")
+ .subscribe();
+
+ int numMessage = 10;
+ for (int i = 0; i < numMessage; i++) {
+ producer.newMessage().value(new KeyValue<>(null, "test")).send();
+ producer.newMessage().value(new KeyValue<>("test", null)).send();
+ producer.newMessage().value(new KeyValue<>(null, null)).send();
+ }
+
+ Message<KeyValue<String, String>> message;
+ KeyValue<String, String> keyValue;
+ for (int i = 0; i < numMessage; i++) {
+ message = consumer.receive();
+ keyValue = message.getValue();
+ Assert.assertNull(keyValue.getKey());
+ Assert.assertEquals("test", keyValue.getValue());
+
+ message = consumer.receive();
+ keyValue = message.getValue();
+ Assert.assertEquals("test", keyValue.getKey());
+ Assert.assertNull(keyValue.getValue());
+
+ message = consumer.receive();
+ keyValue = message.getValue();
+ Assert.assertNull(keyValue.getKey());
+ Assert.assertNull(keyValue.getValue());
+ }
+
+ }
+
+ @Test
+ public void keyValueNullSeparatedTest() throws PulsarClientException {
+ String topic = "persistent://prop/ns-abc/kv-null-value-test";
+
+ @Cleanup
+ Producer<KeyValue<String, String>> producer = pulsarClient
+ .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .create();
+
+ @Cleanup
+ Consumer<KeyValue<String, String>> consumer = pulsarClient
+ .newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("test")
+ .subscribe();
+
+ int numMessage = 10;
+ for (int i = 0; i < numMessage; i++) {
+ producer.newMessage().value(new KeyValue<>(null, "test")).send();
+ producer.newMessage().value(new KeyValue<>("test", null)).send();
+ producer.newMessage().value(new KeyValue<>(null, null)).send();
+ }
+
+ Message<KeyValue<String, String>> message;
+ KeyValue<String, String> keyValue;
+ for (int i = 0; i < numMessage; i++) {
+ message = consumer.receive();
+ keyValue = message.getValue();
+ Assert.assertNull(keyValue.getKey());
+ Assert.assertEquals("test", keyValue.getValue());
+
+ message = consumer.receive();
+ keyValue = message.getValue();
+ Assert.assertEquals("test", keyValue.getKey());
+ Assert.assertNull(keyValue.getValue());
+
+ message = consumer.receive();
+ keyValue = message.getValue();
+ Assert.assertNull(keyValue.getKey());
+ Assert.assertNull(keyValue.getValue());
+ }
+
+ }
+
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
index 9d86758..7074c08 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
@@ -96,10 +96,25 @@ public class KeyValue<K, V> {
*/
public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
V value, Schema<V> valueWriter) {
- byte [] keyBytes = keyWriter.encode(key);
- byte [] valueBytes = valueWriter.encode(value);
+ byte [] keyBytes;
+ if (key == null) {
+ keyBytes = new byte[0];
+ } else {
+ keyBytes = keyWriter.encode(key);
+ }
+
+ byte [] valueBytes;
+ if (value == null) {
+ valueBytes = new byte[0];
+ } else {
+ valueBytes = valueWriter.encode(value);
+ }
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
- byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
+ byteBuffer
+ .putInt(key == null ? -1 : keyBytes.length)
+ .put(keyBytes)
+ .putInt(value == null ? -1 : valueBytes.length)
+ .put(valueBytes);
return byteBuffer.array();
}
@@ -113,12 +128,16 @@ public class KeyValue<K, V> {
public static <K, V> KeyValue<K, V> decode(byte[] data, KeyValueDecoder<K, V> decoder) {
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
int keyLength = byteBuffer.getInt();
- byte[] keyBytes = new byte[keyLength];
- byteBuffer.get(keyBytes);
+ byte[] keyBytes = keyLength == -1 ? null : new byte[keyLength];
+ if (keyBytes != null) {
+ byteBuffer.get(keyBytes);
+ }
int valueLength = byteBuffer.getInt();
- byte[] valueBytes = new byte[valueLength];
- byteBuffer.get(valueBytes);
+ byte[] valueBytes = valueLength == -1 ? null : new byte[valueLength];
+ if (valueBytes != null) {
+ byteBuffer.get(valueBytes);
+ }
return decoder.decode(keyBytes, valueBytes);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 3b90284..a07834f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -158,6 +158,10 @@ public class MessageImpl<T> implements Message<T> {
msgMetadataBuilder.setNullValue(singleMessageMetadata.hasNullValue());
}
+ if (singleMessageMetadata.hasNullPartitionKey()) {
+ msgMetadataBuilder.setNullPartitionKey(singleMessageMetadata.hasNullPartitionKey());
+ }
+
this.schema = schema;
}
@@ -269,9 +273,6 @@ public class MessageImpl<T> implements Message<T> {
@Override
public T getValue() {
checkNotNull(msgMetadataBuilder);
- if (msgMetadataBuilder.hasNullValue()) {
- return null;
- }
if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
if (schema.supportSchemaVersioning()) {
return getKeyValueBySchemaVersion();
@@ -279,6 +280,9 @@ public class MessageImpl<T> implements Message<T> {
return getKeyValue();
}
} else {
+ if (msgMetadataBuilder.hasNullValue()) {
+ return null;
+ }
// check if the schema passed in from client supports schema versioning or not
// this is an optimization to only get schema version when necessary
if (schema.supportSchemaVersioning()) {
@@ -298,7 +302,9 @@ public class MessageImpl<T> implements Message<T> {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
byte[] schemaVersion = getSchemaVersion();
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
- return (T) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
+ return (T) kvSchema.decode(
+ msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
+ msgMetadataBuilder.hasNullValue() ? null : getData(), schemaVersion);
} else {
return schema.decode(getData(), schemaVersion);
}
@@ -307,7 +313,9 @@ public class MessageImpl<T> implements Message<T> {
private T getKeyValue() {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
- return (T) kvSchema.decode(getKeyBytes(), getData(), null);
+ return (T) kvSchema.decode(
+ msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
+ msgMetadataBuilder.hasNullValue() ? null : getData(), null);
} else {
return schema.decode(getData());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 85e76a3..613bf02 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -111,6 +111,10 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
+ if (key == null) {
+ msgMetadataBuilder.setNullPartitionKey(true);
+ return this;
+ }
}
msgMetadataBuilder.setPartitionKey(key);
msgMetadataBuilder.setPartitionKeyB64Encoded(false);
@@ -123,6 +127,10 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
+ if (key == null) {
+ msgMetadataBuilder.setNullPartitionKey(true);
+ return this;
+ }
}
msgMetadataBuilder.setPartitionKey(Base64.getEncoder().encodeToString(key));
msgMetadataBuilder.setPartitionKeyB64Encoded(true);
@@ -146,11 +154,20 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
// set key as the message key
- msgMetadataBuilder.setPartitionKey(
- Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
- msgMetadataBuilder.setPartitionKeyB64Encoded(true);
+ if (kv.getKey() != null) {
+ msgMetadataBuilder.setPartitionKey(
+ Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
+ msgMetadataBuilder.setPartitionKeyB64Encoded(true);
+ } else {
+ this.msgMetadataBuilder.setNullPartitionKey(true);
+ }
+
// set value as the payload
- this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
+ if (kv.getValue() != null) {
+ this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
+ } else {
+ this.msgMetadataBuilder.setNullValue(true);
+ }
return this;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index b81a947..ebc2f0a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -132,6 +132,9 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
valueSchema
);
} else {
+ if (message.getValue() == null) {
+ return null;
+ }
return valueSchema.encode(message.getValue());
}
}
@@ -150,16 +153,25 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
K k;
- if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
- k = keySchema.decode(keyBytes, schemaVersion);
+ if (keyBytes == null) {
+ k = null;
} else {
- k = keySchema.decode(keyBytes);
+ if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
+ k = keySchema.decode(keyBytes, schemaVersion);
+ } else {
+ k = keySchema.decode(keyBytes);
+ }
}
+
V v;
- if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
- v = valueSchema.decode(valueBytes, schemaVersion);
+ if (valueBytes == null) {
+ v = null;
} else {
- v = valueSchema.decode(valueBytes);
+ if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
+ v = valueSchema.decode(valueBytes, schemaVersion);
+ } else {
+ v = valueSchema.decode(valueBytes);
+ }
}
return new KeyValue<>(k, v);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index a3d7ff0..c0b0143 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -3718,6 +3718,10 @@ public final class PulsarApi {
// optional int32 chunk_id = 29;
boolean hasChunkId();
int getChunkId();
+
+ // optional bool null_partition_key = 30 [default = false];
+ boolean hasNullPartitionKey();
+ boolean getNullPartitionKey();
}
public static final class MessageMetadata extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -4160,6 +4164,16 @@ public final class PulsarApi {
return chunkId_;
}
+ // optional bool null_partition_key = 30 [default = false];
+ public static final int NULL_PARTITION_KEY_FIELD_NUMBER = 30;
+ private boolean nullPartitionKey_;
+ public boolean hasNullPartitionKey() {
+ return ((bitField0_ & 0x01000000) == 0x01000000);
+ }
+ public boolean getNullPartitionKey() {
+ return nullPartitionKey_;
+ }
+
private void initFields() {
producerName_ = "";
sequenceId_ = 0L;
@@ -4188,6 +4202,7 @@ public final class PulsarApi {
numChunksFromMsg_ = 0;
totalChunkMsgSize_ = 0;
chunkId_ = 0;
+ nullPartitionKey_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4311,6 +4326,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00800000) == 0x00800000)) {
output.writeInt32(29, chunkId_);
}
+ if (((bitField0_ & 0x01000000) == 0x01000000)) {
+ output.writeBool(30, nullPartitionKey_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -4432,6 +4450,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeInt32Size(29, chunkId_);
}
+ if (((bitField0_ & 0x01000000) == 0x01000000)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBoolSize(30, nullPartitionKey_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -4599,6 +4621,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x02000000);
chunkId_ = 0;
bitField0_ = (bitField0_ & ~0x04000000);
+ nullPartitionKey_ = false;
+ bitField0_ = (bitField0_ & ~0x08000000);
return this;
}
@@ -4744,6 +4768,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00800000;
}
result.chunkId_ = chunkId_;
+ if (((from_bitField0_ & 0x08000000) == 0x08000000)) {
+ to_bitField0_ |= 0x01000000;
+ }
+ result.nullPartitionKey_ = nullPartitionKey_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -4852,6 +4880,9 @@ public final class PulsarApi {
if (other.hasChunkId()) {
setChunkId(other.getChunkId());
}
+ if (other.hasNullPartitionKey()) {
+ setNullPartitionKey(other.getNullPartitionKey());
+ }
return this;
}
@@ -5046,6 +5077,11 @@ public final class PulsarApi {
chunkId_ = input.readInt32();
break;
}
+ case 240: {
+ bitField0_ |= 0x08000000;
+ nullPartitionKey_ = input.readBool();
+ break;
+ }
}
}
}
@@ -5877,6 +5913,27 @@ public final class PulsarApi {
return this;
}
+ // optional bool null_partition_key = 30 [default = false];
+ private boolean nullPartitionKey_ ;
+ public boolean hasNullPartitionKey() {
+ return ((bitField0_ & 0x08000000) == 0x08000000);
+ }
+ public boolean getNullPartitionKey() {
+ return nullPartitionKey_;
+ }
+ public Builder setNullPartitionKey(boolean value) {
+ bitField0_ |= 0x08000000;
+ nullPartitionKey_ = value;
+
+ return this;
+ }
+ public Builder clearNullPartitionKey() {
+ bitField0_ = (bitField0_ & ~0x08000000);
+ nullPartitionKey_ = false;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata)
}
@@ -5928,6 +5985,10 @@ public final class PulsarApi {
// optional bool null_value = 9 [default = false];
boolean hasNullValue();
boolean getNullValue();
+
+ // optional bool null_partition_key = 10 [default = false];
+ boolean hasNullPartitionKey();
+ boolean getNullPartitionKey();
}
public static final class SingleMessageMetadata extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -6087,6 +6148,16 @@ public final class PulsarApi {
return nullValue_;
}
+ // optional bool null_partition_key = 10 [default = false];
+ public static final int NULL_PARTITION_KEY_FIELD_NUMBER = 10;
+ private boolean nullPartitionKey_;
+ public boolean hasNullPartitionKey() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ public boolean getNullPartitionKey() {
+ return nullPartitionKey_;
+ }
+
private void initFields() {
properties_ = java.util.Collections.emptyList();
partitionKey_ = "";
@@ -6097,6 +6168,7 @@ public final class PulsarApi {
orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
sequenceId_ = 0L;
nullValue_ = false;
+ nullPartitionKey_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -6152,6 +6224,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000080) == 0x00000080)) {
output.writeBool(9, nullValue_);
}
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ output.writeBool(10, nullPartitionKey_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -6196,6 +6271,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeBoolSize(9, nullValue_);
}
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBoolSize(10, nullPartitionKey_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -6327,6 +6406,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000080);
nullValue_ = false;
bitField0_ = (bitField0_ & ~0x00000100);
+ nullPartitionKey_ = false;
+ bitField0_ = (bitField0_ & ~0x00000200);
return this;
}
@@ -6397,6 +6478,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000080;
}
result.nullValue_ = nullValue_;
+ if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+ to_bitField0_ |= 0x00000100;
+ }
+ result.nullPartitionKey_ = nullPartitionKey_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -6437,6 +6522,9 @@ public final class PulsarApi {
if (other.hasNullValue()) {
setNullValue(other.getNullValue());
}
+ if (other.hasNullPartitionKey()) {
+ setNullPartitionKey(other.getNullPartitionKey());
+ }
return this;
}
@@ -6522,6 +6610,11 @@ public final class PulsarApi {
nullValue_ = input.readBool();
break;
}
+ case 80: {
+ bitField0_ |= 0x00000200;
+ nullPartitionKey_ = input.readBool();
+ break;
+ }
}
}
}
@@ -6803,6 +6896,27 @@ public final class PulsarApi {
return this;
}
+ // optional bool null_partition_key = 10 [default = false];
+ private boolean nullPartitionKey_ ;
+ public boolean hasNullPartitionKey() {
+ return ((bitField0_ & 0x00000200) == 0x00000200);
+ }
+ public boolean getNullPartitionKey() {
+ return nullPartitionKey_;
+ }
+ public Builder setNullPartitionKey(boolean value) {
+ bitField0_ |= 0x00000200;
+ nullPartitionKey_ = value;
+
+ return this;
+ }
+ public Builder clearNullPartitionKey() {
+ bitField0_ = (bitField0_ & ~0x00000200);
+ nullPartitionKey_ = false;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 51d0a11..d822519 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1678,6 +1678,10 @@ public class Commands {
singleMessageMetadataBuilder.setNullValue(msgBuilder.hasNullValue());
}
+ if (msgBuilder.hasNullPartitionKey()) {
+ singleMessageMetadataBuilder.setNullPartitionKey(msgBuilder.hasNullPartitionKey());
+ }
+
try {
return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, batchBuffer);
} finally {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index d43617b..c483fce 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -143,6 +143,9 @@ message MessageMetadata {
optional int32 num_chunks_from_msg = 27;
optional int32 total_chunk_msg_size = 28;
optional int32 chunk_id = 29;
+
+ // Indicate if the message partition key is set
+ optional bool null_partition_key = 30 [default = false];
}
message SingleMessageMetadata {
@@ -161,6 +164,8 @@ message SingleMessageMetadata {
optional uint64 sequence_id = 8;
// Indicate if the message payload value is set
optional bool null_value = 9 [ default = false ];
+ // Indicate if the message partition key is set
+ optional bool null_partition_key = 10 [ default = false];
}
enum ServerError {