You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/19 05:29:37 UTC

[pulsar] branch branch-2.11 updated (43ada027134 -> e802b586eb6)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 43ada027134 [fix][cli] Fix mbeans to json (#17676)
     new 8b2694397ca [fix][broker] Fix namespace backlog quota check with retention. (#17706)
     new 30d7fce0802 [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)
     new e802b586eb6 [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/admin/AdminResource.java  | 10 ++++--
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 32 +++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  4 +++
 .../pulsar/common/compression/CommandsTest.java    | 41 +++++++++++++++++++---
 .../coordination/impl/ResourceLockImpl.java        |  1 +
 5 files changed, 80 insertions(+), 8 deletions(-)


[pulsar] 03/03: [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e802b586eb6c7cf8e2a6842aff2a5def5f70749a
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Sep 16 23:56:03 2022 +0800

    [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)
    
    * Fix parsing partitionedKey with Base64 encode issue.
    
    * release the buf
    
    * fix checkstyle issue.
---
 .../apache/pulsar/common/protocol/Commands.java    |  4 +++
 .../pulsar/common/compression/CommandsTest.java    | 41 +++++++++++++++++++---
 2 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 62243063741..61ce70a9a0a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -30,6 +30,7 @@ import io.netty.util.concurrent.FastThreadLocal;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -1888,6 +1889,9 @@ public class Commands {
             if (metadata.hasOrderingKey()) {
                 return metadata.getOrderingKey();
             } else if (metadata.hasPartitionKey()) {
+                if (metadata.isPartitionKeyB64Encoded()) {
+                    return Base64.getDecoder().decode(metadata.getPartitionKey());
+                }
                 return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
             }
         } catch (Throwable t) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
index 24d34ac547f..207c6202426 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
@@ -18,22 +18,23 @@
  */
 package org.apache.pulsar.common.compression;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-
 import java.io.IOException;
-
+import java.util.Base64;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class CommandsTest {
@@ -93,5 +94,35 @@ public class CommandsTest {
         return computedChecksum;
     }
 
-
+    @Test
+    public void testPeekStickyKey() {
+        String message = "msg-1";
+        String partitionedKey = "key1";
+        MessageMetadata messageMetadata2 = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKey(partitionedKey)
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis());
+        ByteBuf byteBuf = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata2,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+        byte[] bytes = Commands.peekStickyKey(byteBuf, "topic-1", "sub-1");
+        String key = new String(bytes);
+        Assert.assertEquals(partitionedKey, key);
+        ReferenceCountUtil.safeRelease(byteBuf);
+        // test 64 encoded
+        String partitionedKey2 = Base64.getEncoder().encodeToString("key2".getBytes(UTF_8));
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKey(partitionedKey2)
+                .setPartitionKeyB64Encoded(true)
+                .setPublishTime(System.currentTimeMillis());
+        ByteBuf byteBuf2 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+        byte[] bytes2 = Commands.peekStickyKey(byteBuf2, "topic-2", "sub-2");
+        String key2 = Base64.getEncoder().encodeToString(bytes2);;
+        Assert.assertEquals(partitionedKey2, key2);
+        ReferenceCountUtil.safeRelease(byteBuf2);
+    }
 }


[pulsar] 02/03: [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 30d7fce0802b20c354481a923a3b900586885b24
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Sun Sep 18 06:27:59 2022 -0700

    [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)
---
 .../org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index bc3f47d41dc..22eaccc278b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -196,6 +196,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                             // We failed to revalidate the lock due to connectivity issue
                             // Continue assuming we hold the lock, until we can revalidate it, either
                             // on Reconnected or SessionReestablished events.
+                            revalidateAfterReconnection = true;
                             log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
                                     ex.getCause().getMessage());
                         }


[pulsar] 01/03: [fix][broker] Fix namespace backlog quota check with retention. (#17706)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8b2694397ca8a26663c726402e4fa1ee01a076a0
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Sep 19 10:29:51 2022 +0800

    [fix][broker] Fix namespace backlog quota check with retention. (#17706)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 10 +++++--
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 32 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 432611a69c0..c9078c63de7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -355,17 +355,21 @@ public abstract class AdminResource extends PulsarWebResource {
     }
 
     protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
-        if (retention == null || retention.getRetentionSizeInMB() <= 0 || retention.getRetentionTimeInMinutes() <= 0) {
+        if (retention == null
+                || (retention.getRetentionSizeInMB() <= 0 && retention.getRetentionTimeInMinutes() <= 0)) {
             return true;
         }
         if (quota == null) {
             quota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
         }
-        if (quota.getLimitSize() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
+
+        if (retention.getRetentionSizeInMB() > 0
+                && quota.getLimitSize() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
             return false;
         }
         // time based quota is in second
-        if (quota.getLimitTime() >= (retention.getRetentionTimeInMinutes() * 60)) {
+        if (retention.getRetentionTimeInMinutes() > 0
+                && quota.getLimitTime() >= retention.getRetentionTimeInMinutes() * 60) {
             return false;
         }
         return true;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 2ec0d76551a..7993750583f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -3397,4 +3397,36 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         long value2 = partitionedStats.getEarliestMsgPublishTimeInBacklogs();
         Assert.assertNotEquals(value2, 0);
     }
+
+    @Test
+    public void testRetentionAndBacklogQuotaCheck() throws PulsarAdminException {
+        String namespace = "prop-xyz/ns1";
+        //test size check.
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, 10));
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
+        Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
+            admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
+        });
+
+        //test time check
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(10, -1));
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build());
+        Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
+            admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(11 * 60).build());
+        });
+
+        // test both size and time.
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10));
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build());
+        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).
+                limitTime(9 * 60).build());
+        Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
+            admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
+        });
+        Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
+            admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(100 * 60).build());
+        });
+
+    }
 }