You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/09/16 15:56:12 UTC
[pulsar] branch master updated: [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)
This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f3cc1071eb1 [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)
f3cc1071eb1 is described below
commit f3cc1071eb1f32b065d7893cee0c149895d843fa
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Sep 16 23:56:03 2022 +0800
[fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)
* Fix parsing partitionedKey with Base64 encode issue.
* release the buf
* fix checkstyle issue.
---
.../apache/pulsar/common/protocol/Commands.java | 4 +++
.../pulsar/common/compression/CommandsTest.java | 41 +++++++++++++++++++---
2 files changed, 40 insertions(+), 5 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 95fca7f431b..0ebb2705d3a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -30,6 +30,7 @@ import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -1900,6 +1901,9 @@ public class Commands {
if (metadata.hasOrderingKey()) {
return metadata.getOrderingKey();
} else if (metadata.hasPartitionKey()) {
+ if (metadata.isPartitionKeyB64Encoded()) {
+ return Base64.getDecoder().decode(metadata.getPartitionKey());
+ }
return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
}
} catch (Throwable t) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
index 24d34ac547f..207c6202426 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
@@ -18,22 +18,23 @@
*/
package org.apache.pulsar.common.compression;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
-
import java.io.IOException;
-
+import java.util.Base64;
+import io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class CommandsTest {
@@ -93,5 +94,35 @@ public class CommandsTest {
return computedChecksum;
}
-
+    /**
+     * Verifies that {@code Commands.peekStickyKey} returns the UTF-8 bytes of a plain partition key
+     * and the Base64-decoded bytes of a partition key flagged as Base64-encoded.
+     */
+    @Test
+    public void testPeekStickyKey() {
+        String message = "msg-1";
+
+        // Case 1: plain (not Base64-encoded) partition key.
+        String plainKey = "key1";
+        MessageMetadata plainMetadata = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKey(plainKey)
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis());
+        ByteBuf plainBuf = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, plainMetadata,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+        try {
+            byte[] plainBytes = Commands.peekStickyKey(plainBuf, "topic-1", "sub-1");
+            // Decode explicitly as UTF-8 so the test does not depend on the platform charset.
+            // TestNG's assertEquals takes (actual, expected).
+            Assert.assertEquals(new String(plainBytes, UTF_8), plainKey);
+        } finally {
+            // Release even when an assertion fails so the buffer is not leaked.
+            ReferenceCountUtil.safeRelease(plainBuf);
+        }
+
+        // Case 2: Base64-encoded partition key; the peeked sticky key must be the decoded bytes.
+        String rawKey = "key2";
+        String encodedKey = Base64.getEncoder().encodeToString(rawKey.getBytes(UTF_8));
+        MessageMetadata encodedMetadata = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKey(encodedKey)
+                .setPartitionKeyB64Encoded(true)
+                .setPublishTime(System.currentTimeMillis());
+        ByteBuf encodedBuf = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, encodedMetadata,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+        try {
+            byte[] decodedBytes = Commands.peekStickyKey(encodedBuf, "topic-2", "sub-2");
+            Assert.assertEquals(new String(decodedBytes, UTF_8), rawKey);
+        } finally {
+            ReferenceCountUtil.safeRelease(encodedBuf);
+        }
+    }
}