You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/19 01:46:39 UTC

[pulsar] branch branch-2.11 updated (cf26b4e503d -> 24c4bf3be10)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from cf26b4e503d [fix][functions] Make mandatory to provide a schema in Context::newOutputRecordBuilder (#17118)
     new a1a24b88402 [fix][broker] Pass subscriptionName to auth service (#17123)
     new 93750378ebf [fix][flay-test]BrokerInterceptorTest.testProducerCreation (#17159)
     new f6c4af61b19 [fix][flaky-test]ConsumedLedgersTrimTest (#17116)
     new 24c4bf3be10 [fix][flaky-test] Fix DefaultMessageFormatter.formatMessage (#17104)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../broker/admin/impl/PersistentTopicsBase.java      | 20 ++++++++++----------
 .../broker/intercept/BrokerInterceptorTest.java      | 19 ++++++++++---------
 .../broker/service/ConsumedLedgersTrimTest.java      | 18 ++++++++++++------
 .../pulsar/testclient/DefaultMessageFormatter.java   | 11 ++++-------
 .../testclient/TestDefaultMessageFormatter.java      |  7 +++----
 5 files changed, 39 insertions(+), 36 deletions(-)


[pulsar] 03/04: [fix][flaky-test]ConsumedLedgersTrimTest (#17116)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f6c4af61b1993160b18f056e0d3bd6975ffc60b7
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Aug 19 07:45:45 2022 +0800

    [fix][flaky-test]ConsumedLedgersTrimTest (#17116)
---
 .../pulsar/broker/service/ConsumedLedgersTrimTest.java | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index 96f8822a72b..355036bdb25 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -54,6 +54,13 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        super.conf.setDefaultRetentionSizeInMB(-1);
+        super.conf.setDefaultRetentionTimeInMinutes(-1);
+    }
+
     @Test
     public void TestConsumedLedgersTrim() throws Exception {
         conf.setRetentionCheckIntervalInSeconds(1);
@@ -96,7 +103,6 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
             assertNotNull(msg);
             consumer.acknowledge(msg);
         }
-        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
 
         //no traffic, but consumed ledger will be cleaned
         Thread.sleep(1500);
@@ -123,7 +129,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
         PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
         ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
         managedLedgerConfig.setRetentionSizeInMB(-1);
-        managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
+        managedLedgerConfig.setRetentionTime(-1, TimeUnit.SECONDS);
         managedLedgerConfig.setMaxEntriesPerLedger(1000);
         managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
         MessageId initialMessageId = persistentTopic.getLastMessageId().get();
@@ -150,15 +156,15 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
         assertEquals(messageIdAfterRestart, messageIdBeforeRestart);
 
         persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        // now we have two ledgers: the first is expired but it contains the lastMessageId,
+        // the second is empty and should be kept as it is the current tail
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);
         managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
         managedLedgerConfig.setRetentionSizeInMB(-1);
         managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
         managedLedgerConfig.setMaxEntriesPerLedger(1);
         managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
-        managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
-        // now we have two ledgers, the first is expired but is contains the lastMessageId
-        // the second is empty and should be kept as it is the current tail
-        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);
 
         // force trimConsumedLedgers
         Thread.sleep(3000);


[pulsar] 02/04: [fix][flay-test]BrokerInterceptorTest.testProducerCreation (#17159)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 93750378ebfda74839f96e029bb99045ef81ff17
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Aug 19 07:43:47 2022 +0800

    [fix][flay-test]BrokerInterceptorTest.testProducerCreation (#17159)
---
 .../broker/intercept/BrokerInterceptorTest.java       | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index a26783cda65..b2c3b8d711f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -114,7 +115,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         BrokerInterceptor listener = pulsar.getBrokerInterceptor();
         Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
         admin.namespaces().createNamespace("public/test", 4);
-        Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getCount() >= 1);
     }
 
     @Test
@@ -123,7 +124,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
         pulsarClient.newProducer(Schema.BOOL).topic("test").create();
         // CONNECT and PRODUCER
-        Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 2);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getCount() >= 2);
     }
 
     @Test
@@ -133,7 +134,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         pulsarClient.newProducer(Schema.BOOL).topic("test").create();
         pulsarClient.newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe();
         // single connection for both producer and consumer
-        assertEquals(((CounterBrokerInterceptor) listener).getConnectionCreationCount(), 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConnectionCreationCount() == 1);
     }
 
     @Test
@@ -142,7 +143,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
         assertEquals(((CounterBrokerInterceptor) listener).getProducerCount(), 0);
         pulsarClient.newProducer(Schema.BOOL).topic("test").create();
-        assertEquals(((CounterBrokerInterceptor) listener).getProducerCount(), 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 1);
     }
 
     @Test
@@ -151,7 +152,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
         assertEquals(((CounterBrokerInterceptor) listener).getConsumerCount(), 0);
         pulsarClient.newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe();
-        assertEquals(((CounterBrokerInterceptor) listener).getConsumerCount(), 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1);
     }
 
     @Test
@@ -178,8 +179,8 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
 
         assertEquals(msg.getValue(), "hello world");
 
-        assertEquals(((CounterBrokerInterceptor) listener).getBeforeSendCount(), 1);
-        assertEquals(((CounterBrokerInterceptor)listener).getMessageDispatchCount(),1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getBeforeSendCount() == 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getMessageDispatchCount() == 1);
     }
 
     @Test
@@ -195,8 +196,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
             Message<String> message = consumer.receive();
             consumer.acknowledge(message);
         }
-
-        Assert.assertEquals(((CounterBrokerInterceptor) interceptor).getHandleAckCount(), 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) interceptor).getHandleAckCount() == 1);
     }
 
     @Test
@@ -225,6 +225,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
             }
         });
         future.get();
+        Awaitility.await().until(() -> !interceptor.getResponseList().isEmpty());
         CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0);
         Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000");
         Assert.assertEquals(responseEvent.getResponseStatus(),


[pulsar] 01/04: [fix][broker] Pass subscriptionName to auth service (#17123)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a1a24b88402529b31695e46de8105bd81783bbd9
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Aug 18 10:48:53 2022 -0700

    [fix][broker] Pass subscriptionName to auth service (#17123)
---
 .../broker/admin/impl/PersistentTopicsBase.java      | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index de11d49d773..6521cee29ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1555,7 +1555,7 @@ public class PersistentTopicsBase extends AdminResource {
     private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
                                                                   String subName, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-            .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
+            .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
             .thenCompose(__ -> getTopicReferenceAsync(topicName))
             .thenCompose(topic -> {
                 Subscription sub = topic.getSubscription(subName);
@@ -1590,7 +1590,7 @@ public class PersistentTopicsBase extends AdminResource {
                                                                           Optional<Position> position,
                                                                           boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenCompose(topic -> {
                             Subscription sub = topic.getSubscription(subName);
@@ -1646,7 +1646,7 @@ public class PersistentTopicsBase extends AdminResource {
                                       String subName, Map<String, String> subscriptionProperties,
                                       boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenCompose(topic -> {
                     Subscription sub = topic.getSubscription(subName);
@@ -1673,7 +1673,7 @@ public class PersistentTopicsBase extends AdminResource {
                                                                             String subName,
                                                                             boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenApply((Topic topic) -> {
                     Subscription sub = topic.getSubscription(subName);
@@ -1776,7 +1776,7 @@ public class PersistentTopicsBase extends AdminResource {
     private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse,
                                                                             String subName, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenCompose(topic -> {
                     Subscription sub = topic.getSubscription(subName);
@@ -1923,7 +1923,7 @@ public class PersistentTopicsBase extends AdminResource {
             future = CompletableFuture.completedFuture(null);
         }
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName))
                 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                      .thenCompose(partitionMetadata -> {
                          if (partitionMetadata.partitions > 0) {
@@ -2332,7 +2332,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         validateTopicOwnershipAsync(topicName, authoritative)
                 .thenCompose(__ -> {
-                    validateTopicOperation(topicName, TopicOperation.SUBSCRIBE);
+                    validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName);
                     return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation);
                 }).thenApply(optTopic -> {
             if (optTopic.isPresent()) {
@@ -2870,7 +2870,7 @@ public class PersistentTopicsBase extends AdminResource {
             ret = CompletableFuture.completedFuture(null);
         }
         return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenCompose(topic -> {
                     CompletableFuture<Entry> entry;
@@ -3790,7 +3790,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
         future.thenCompose(__ ->
                 validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
                 .thenCompose(unused2 ->
                         // If the topic name is a partition name, no need to get partition topic metadata again
                         getPartitionedTopicMetadataAsync(topicName, authoritative, false)
@@ -3942,7 +3942,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
                 .thenCompose(__ -> {
                     log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(),
                             topicName, subName, messageId);


[pulsar] 04/04: [fix][flaky-test] Fix DefaultMessageFormatter.formatMessage (#17104)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 24c4bf3be1093b17784888ab6adb46c6ede26f41
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Aug 18 16:08:05 2022 +0800

    [fix][flaky-test] Fix DefaultMessageFormatter.formatMessage (#17104)
---
 .../org/apache/pulsar/testclient/DefaultMessageFormatter.java | 11 ++++-------
 .../apache/pulsar/testclient/TestDefaultMessageFormatter.java |  7 +++----
 2 files changed, 7 insertions(+), 11 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java
index 4b686b8bb7e..e76166dbe47 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java
@@ -94,14 +94,11 @@ public class DefaultMessageFormatter implements IMessageFormatter {
     }
 
     private String getIntValue(float size) {
-        int i = 0;
-        if (size != 0) {
-            i = (int) get_FloatValue(size);
-        }
-        if (i == 0) {
-            i = r.nextInt() + 1;
+        if (size == 0) {
+            return String.valueOf(r.nextInt());
         }
-        return String.valueOf(i);
+
+        return String.valueOf((int) get_FloatValue(size));
     }
     private String getLongValue(float size) {
         if (size == 0) {
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
index 255f2665107..89bb0251567 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
@@ -19,12 +19,11 @@
 package org.apache.pulsar.testclient;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.nio.charset.StandardCharsets;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.nio.charset.StandardCharsets;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 public class TestDefaultMessageFormatter {
 
     @Test
@@ -65,7 +64,7 @@ public class TestDefaultMessageFormatter {
         Assert.assertTrue(l3 > 0);
         Assert.assertTrue(l3 <= 99999);
         Assert.assertTrue(i2 < 10);
-        Assert.assertTrue(0 < i2, "i2 was " + i2);
+        Assert.assertTrue(0 <= i2, "i2 was " + i2);
         Assert.assertTrue(f2 < 100000);
         Assert.assertTrue( -100000 < f2);
     }