You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by gu...@apache.org on 2020/03/20 02:09:18 UTC

[pulsar] branch master updated: fix duplicate key to send propertys (#6390)

This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 79abc88  fix duplicate key to send propertys (#6390)
79abc88 is described below

commit 79abc88fed61c2d34b4850eb1853694f400e00b9
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Fri Mar 20 10:09:06 2020 +0800

    fix duplicate key to send propertys (#6390)
    
    **Motivation**
    Fixes #6388: when a message is sent with a duplicate key in its properties, the consumer cannot receive the message.
    ```java
    //org.apache.pulsar.client.impl.MessageImpl
    if (msgMetadata.getPropertiesCount() > 0) {
                this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
                        .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)));
            } else {
                properties = Collections.emptyMap();
            }
            this.schema = schema;
    ```
    The two-argument `Collectors.toMap` does not allow duplicate keys; it throws an `IllegalStateException` when it encounters one.
    
    **Changes**
    Pass a merge function to `Collectors.toMap` so that, for a duplicate key, the new value replaces the old value.
    ```java
    if (msgMetadata.getPropertiesCount() > 0) {
                this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
                        .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
                                (oldValue,newValue) -> newValue)));
            } else {
                properties = Collections.emptyMap();
            }
            this.schema = schema;
    ```
---
 .../java/org/apache/pulsar/client/impl/MessageImpl.java   | 10 +++++++---
 .../org/apache/pulsar/client/impl/MessageImplTest.java    | 15 +++++++++++++++
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index a268f6c..f11ae77 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -105,7 +105,8 @@ public class MessageImpl<T> implements Message<T> {
 
         if (msgMetadata.getPropertiesCount() > 0) {
             this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
-                    .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)));
+                    .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
+                            (oldValue,newValue) -> newValue)));
         } else {
             properties = Collections.emptyMap();
         }
@@ -299,6 +300,7 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    @Override
     public long getSequenceId() {
         checkNotNull(msgMetadataBuilder);
         if (msgMetadataBuilder.hasSequenceId()) {
@@ -330,8 +332,10 @@ public class MessageImpl<T> implements Message<T> {
     public synchronized Map<String, String> getProperties() {
         if (this.properties == null) {
             if (msgMetadataBuilder.getPropertiesCount() > 0) {
-                this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
-                        .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)));
+                  this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
+                           .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
+                                   (oldValue,newValue) -> newValue)));
+                
             } else {
                 this.properties = Collections.emptyMap();
             }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index dd49445..ac4737d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -58,6 +58,21 @@ public class MessageImplTest {
     }
 
     @Test
+    public void testSetDuplicatePropertiesKey() {
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder();
+        builder.addProperties(org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder()
+                .setKey("key1").setValue("value1").build());
+        builder.addProperties(org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder()
+                .setKey("key1").setValue("value2").build());
+        builder.addProperties(org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder()
+                .setKey("key3").setValue("value3").build());
+        ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+        MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
+        assertEquals("value2", msg.getProperty("key1"));
+        assertEquals("value3", msg.getProperty("key3"));
+    }
+
+    @Test
     public void testGetSequenceIdAssociated() {
         MessageMetadata.Builder builder = MessageMetadata.newBuilder()
             .setSequenceId(1234);