You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by gu...@apache.org on 2020/03/20 02:09:18 UTC
[pulsar] branch master updated: fix duplicate key to send propertys
(#6390)
This is an automated email from the ASF dual-hosted git repository.
guangning pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 79abc88 fix duplicate key to send propertys (#6390)
79abc88 is described below
commit 79abc88fed61c2d34b4850eb1853694f400e00b9
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Fri Mar 20 10:09:06 2020 +0800
fix duplicate key to send propertys (#6390)
**Motivation**
Fixes #6388: when a message is sent with a duplicate key in its properties, the consumer cannot receive the message.
```java
//org.apache.pulsar.client.impl.MessageImpl
if (msgMetadata.getPropertiesCount() > 0) {
this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)));
} else {
properties = Collections.emptyMap();
}
this.schema = schema;
```
The two-argument `Collectors.toMap` does not allow duplicate keys; it throws an `IllegalStateException` when two entries map to the same key.
**Changes**
Pass a merge function to `Collectors.toMap` so that, when a key is duplicated, the new value replaces the old value.
```java
if (msgMetadata.getPropertiesCount() > 0) {
this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
(oldValue,newValue) -> newValue)));
} else {
properties = Collections.emptyMap();
}
this.schema = schema;
```
---
.../java/org/apache/pulsar/client/impl/MessageImpl.java | 10 +++++++---
.../org/apache/pulsar/client/impl/MessageImplTest.java | 15 +++++++++++++++
2 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index a268f6c..f11ae77 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -105,7 +105,8 @@ public class MessageImpl<T> implements Message<T> {
if (msgMetadata.getPropertiesCount() > 0) {
this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
- .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)));
+ .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
+ (oldValue,newValue) -> newValue)));
} else {
properties = Collections.emptyMap();
}
@@ -299,6 +300,7 @@ public class MessageImpl<T> implements Message<T> {
}
}
+ @Override
public long getSequenceId() {
checkNotNull(msgMetadataBuilder);
if (msgMetadataBuilder.hasSequenceId()) {
@@ -330,8 +332,10 @@ public class MessageImpl<T> implements Message<T> {
public synchronized Map<String, String> getProperties() {
if (this.properties == null) {
if (msgMetadataBuilder.getPropertiesCount() > 0) {
- this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
- .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)));
+ this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
+ .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
+ (oldValue,newValue) -> newValue)));
+
} else {
this.properties = Collections.emptyMap();
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index dd49445..ac4737d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -58,6 +58,21 @@ public class MessageImplTest {
}
@Test
+ public void testSetDuplicatePropertiesKey() {
+ MessageMetadata.Builder builder = MessageMetadata.newBuilder();
+ builder.addProperties(org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder()
+ .setKey("key1").setValue("value1").build());
+ builder.addProperties(org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder()
+ .setKey("key1").setValue("value2").build());
+ builder.addProperties(org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder()
+ .setKey("key3").setValue("value3").build());
+ ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+ MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
+ assertEquals("value2", msg.getProperty("key1"));
+ assertEquals("value3", msg.getProperty("key3"));
+ }
+
+ @Test
public void testGetSequenceIdAssociated() {
MessageMetadata.Builder builder = MessageMetadata.newBuilder()
.setSequenceId(1234);