You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/09/16 15:56:12 UTC

[pulsar] branch master updated: [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f3cc1071eb1 [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)
f3cc1071eb1 is described below

commit f3cc1071eb1f32b065d7893cee0c149895d843fa
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Sep 16 23:56:03 2022 +0800

    [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)
    
    * Fix parsing partitionedKey with Base64 encode issue.
    
    * release the buf
    
    * fix checkstyle issue.
---
 .../apache/pulsar/common/protocol/Commands.java    |  4 +++
 .../pulsar/common/compression/CommandsTest.java    | 41 +++++++++++++++++++---
 2 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 95fca7f431b..0ebb2705d3a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -30,6 +30,7 @@ import io.netty.util.concurrent.FastThreadLocal;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -1900,6 +1901,9 @@ public class Commands {
             if (metadata.hasOrderingKey()) {
                 return metadata.getOrderingKey();
             } else if (metadata.hasPartitionKey()) {
+                if (metadata.isPartitionKeyB64Encoded()) {
+                    return Base64.getDecoder().decode(metadata.getPartitionKey());
+                }
                 return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
             }
         } catch (Throwable t) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
index 24d34ac547f..207c6202426 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
@@ -18,22 +18,23 @@
  */
 package org.apache.pulsar.common.compression;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-
 import java.io.IOException;
-
+import java.util.Base64;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class CommandsTest {
@@ -93,5 +94,35 @@ public class CommandsTest {
         return computedChecksum;
     }
 
-
+    @Test
+    public void testPeekStickyKey() {
+        String message = "msg-1";
+        String partitionedKey = "key1";
+        MessageMetadata messageMetadata2 = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKey(partitionedKey)
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis());
+        ByteBuf byteBuf = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata2,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+        byte[] bytes = Commands.peekStickyKey(byteBuf, "topic-1", "sub-1");
+        String key = new String(bytes);
+        Assert.assertEquals(partitionedKey, key);
+        ReferenceCountUtil.safeRelease(byteBuf);
+        // test 64 encoded
+        String partitionedKey2 = Base64.getEncoder().encodeToString("key2".getBytes(UTF_8));
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKey(partitionedKey2)
+                .setPartitionKeyB64Encoded(true)
+                .setPublishTime(System.currentTimeMillis());
+        ByteBuf byteBuf2 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+        byte[] bytes2 = Commands.peekStickyKey(byteBuf2, "topic-2", "sub-2");
+        String key2 = Base64.getEncoder().encodeToString(bytes2);;
+        Assert.assertEquals(partitionedKey2, key2);
+        ReferenceCountUtil.safeRelease(byteBuf2);
+    }
 }