You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/27 08:36:22 UTC

[pulsar] branch branch-2.8 updated (43cd22e632d -> bda2c0514cd)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 43cd22e632d [Java Client] Fix wrong behavior of deduplication for key based batching (#15413)
     new ad868acd689 Rename test file name from *Test2.java to *Test.java
     new baf35d2d19c [broker][monitoring] add message ack rate metric for consumer (#15674)
     new 0ea0e2bd075 [Function] provide default error handler for function log appender (#15728)
     new e80d8a05b55 [fix][client] Remove consumer when close consumer command is received (#15761)
     new 49ecd542bbf [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
     new 03a0da4824e [fix][auth] Generate correct well-known OpenID configuration URL (#15928)
     new 48923ce1811 [fix][client] Remove producer when close producer command is received (#16028)
     new 45ce7bcbcf3 rename pulsar_producer_configuration_set_crypto_failure_action to pulsar_producer_configuration_get_crypto_failure_action (#16031)
     new b01d5d2da26 [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
     new bda2c0514cd [fix][broker] Fix create client with TLS config (#16014)

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +--
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 28 ++++++++
 .../org/apache/pulsar/broker/PulsarService.java    | 12 ++--
 .../pulsar/broker/service/BrokerService.java       | 48 ++++++++++---
 .../org/apache/pulsar/broker/service/Consumer.java | 26 +++++--
 .../apache/pulsar/broker/service/ServerCnx.java    |  1 +
 .../pulsar/broker/service/StreamingStats.java      |  1 +
 .../nonpersistent/NonPersistentSubscription.java   |  1 +
 .../service/nonpersistent/NonPersistentTopic.java  |  4 ++
 .../service/persistent/PersistentSubscription.java |  1 +
 .../broker/service/persistent/PersistentTopic.java |  3 +
 .../stats/prometheus/AggregatedConsumerStats.java  |  2 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  2 +
 .../prometheus/AggregatedSubscriptionStats.java    |  2 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  2 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |  7 ++
 .../{AdminApiTest2.java => AdminApi2Test.java}     |  3 +-
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 81 ++++++++++++++++++++++
 .../pulsar/common/policies/data/ConsumerStats.java |  7 +-
 .../common/policies/data/SubscriptionStats.java    |  5 ++
 pulsar-client-cpp/lib/auth/AuthOauth2.cc           |  9 ++-
 pulsar-client-cpp/lib/auth/AuthOauth2.h            |  1 +
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc |  2 +-
 pulsar-client-cpp/tests/AuthPluginTest.cc          | 20 ++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 11 +--
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 38 ++++++++++
 .../policies/data/stats/ConsumerStatsImpl.java     |  8 ++-
 .../policies/data/stats/SubscriptionStatsImpl.java |  5 ++
 pulsar-common/src/main/proto/PulsarApi.proto       |  3 +
 .../pulsar/functions/instance/LogAppender.java     | 25 +++++--
 31 files changed, 327 insertions(+), 44 deletions(-)
 rename pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/{AdminApiTest2.java => AdminApi2Test.java} (99%)


[pulsar] 10/10: [fix][broker] Fix create client with TLS config (#16014)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bda2c0514cd9543cd4ddcfa31fad608f99454138
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Wed Jun 15 21:40:25 2022 +0800

    [fix][broker] Fix create client with TLS config (#16014)
    
    ### Motivation
    
    PulsarService creates a client with an incorrect config.
    
    When `tlsEnabled` is `true` and `brokerClientTlsEnabled` is `false`, users will hit `Failed reason: General OpenSslEngine problem` because `tlsTrustCertsFilePath` is incorrect.
    
    ### Modifications
    
    - Fix the check for whether TLS is enabled
    - Set up ciphers and protocols
    - Remove duplicate setTlsTrustCertsFilePath
    
    (cherry picked from commit 22057ca0296e4eb6e0c9d41bc10e24bdbdc71efc)
---
 .../main/java/org/apache/pulsar/broker/PulsarService.java    | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 14897831755..49440be5452 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1356,12 +1356,14 @@ public class PulsarService implements AutoCloseable {
                         .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
                 ClientConfigurationData conf =
                         ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
-                conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
-                                ? this.brokerServiceUrlTls : this.brokerServiceUrl);
-                conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
-                conf.setTlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());
 
-                if (this.getConfiguration().isBrokerClientTlsEnabled()) {
+                boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled();
+                conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl);
+
+                if (tlsEnabled) {
+                    conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
+                    conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
+                    conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
                     if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
                         conf.setUseKeyStoreTls(true);
                         conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());


[pulsar] 08/10: rename pulsar_producer_configuration_set_crypto_failure_action to pulsar_producer_configuration_get_crypto_failure_action (#16031)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 45ce7bcbcf3510bca3e6de96edae924d1dd228a8
Author: Frank Xiong <Fr...@outlook.com>
AuthorDate: Tue Jun 14 02:20:37 2022 +0800

    rename pulsar_producer_configuration_set_crypto_failure_action to pulsar_producer_configuration_get_crypto_failure_action (#16031)
    
    Fixes #16030
    
    ### Motivation
    Fix symlink error for function pulsar_producer_configuration_get_crypto_failure_action
    
    ### Modifications
    
    Rename function name `pulsar_producer_configuration_set_crypto_failure_action` to `pulsar_producer_configuration_get_crypto_failure_action`
    
    (cherry picked from commit bff34000385f6faf6dbff4385d0dc562602ac623)
---
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index f26f63a593b..906a4d8230c 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -209,7 +209,7 @@ void pulsar_producer_configuration_set_default_crypto_key_reader(pulsar_producer
     conf->conf.setCryptoKeyReader(keyReader);
 }
 
-pulsar_producer_crypto_failure_action pulsar_producer_configuration_set_crypto_failure_action(
+pulsar_producer_crypto_failure_action pulsar_producer_configuration_get_crypto_failure_action(
     pulsar_producer_configuration_t *conf) {
     return (pulsar_producer_crypto_failure_action)conf->conf.getCryptoFailureAction();
 }


[pulsar] 05/10: [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49ecd542bbf3ef6161112b66c3d880e81fa1636c
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue Jun 7 21:52:05 2022 +0800

    [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
    
    (cherry picked from commit 7a3ad611f51511afca4bcaa1de299517a1907e8e)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 ++-----
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  4 ++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 28 ++++++++++++++++++++++
 3 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 89e9ac250bd..44e0ed96a25 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2153,14 +2153,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
+    PositionImpl startReadOperationOnLedger(PositionImpl position) {
         Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
-        if (null == ledgerId) {
-            opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " +
-                    "least key greater than or equal to the given key, or null if there is no such key"), null);
-        }
-
-        if (ledgerId != position.getLedgerId()) {
+        if (ledgerId != null && ledgerId != position.getLedgerId()) {
             // The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need
             // to skip on the next available ledger
             position = new PositionImpl(ledgerId, 0);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index b751034a0c4..caf0cde24eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -48,7 +48,7 @@ class OpReadEntry implements ReadEntriesCallback {
     public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
             ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
         OpReadEntry op = RECYCLER.get();
-        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
+        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
         op.cursor = cursor;
         op.count = count;
         op.callback = callback;
@@ -138,7 +138,7 @@ class OpReadEntry implements ReadEntriesCallback {
                 ((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
             // We still have more entries to read from the next ledger, schedule a new async operation
             cursor.ledger.getExecutor().execute(safeRun(() -> {
-                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
+                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);
             }));
         } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d2f3be9ef0e..67c90d625ea 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -52,6 +52,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -407,6 +408,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testStartReadOperationOnLedgerWithEmptyLedgers() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("my_test_ledger_1");
+        ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+        NavigableMap<Long, LedgerInfo> ledgers = ledgerImpl.getLedgersInfo();
+        LedgerInfo ledgerInfo = ledgers.firstEntry().getValue();
+        ledgers.clear();
+        ManagedCursor c1 = ledger.openCursor("c1");
+        PositionImpl position = new PositionImpl(ledgerInfo.getLedgerId(), 0);
+        PositionImpl maxPosition = new PositionImpl(ledgerInfo.getLedgerId(), 99);
+        OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl) c1, position, 20,
+                new ReadEntriesCallback() {
+
+                    @Override
+                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+
+                    }
+
+                    @Override
+                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+
+                    }
+                }, null, maxPosition);
+        Assert.assertEquals(opReadEntry.readPosition, position);
+    }
+
+
     @Test(timeOut = 20000)
     public void spanningMultipleLedgersWithSize() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);


[pulsar] 03/10: [Function] provide default error handler for function log appender (#15728)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0ea0e2bd0752796ad24f4d377edac3919290814a
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Sun Jun 12 18:05:25 2022 -0700

    [Function] provide default error handler for function log appender (#15728)
    
    (cherry picked from commit f7635ec6d99bd5a13a31c7e9f17640746afec43c)
---
 .../pulsar/functions/instance/LogAppender.java     | 25 ++++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
index bbca3f9efa1..46b90e54381 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -19,7 +19,12 @@
 package org.apache.pulsar.functions.instance;
 
 import java.nio.charset.StandardCharsets;
-import org.apache.logging.log4j.core.*;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.ErrorHandler;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.DefaultErrorHandler;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -32,6 +37,11 @@ import java.util.concurrent.TimeUnit;
  * to a log topic.
  */
 public class LogAppender implements Appender {
+
+    private static final String LOG_LEVEL = "loglevel";
+    private static final String INSTANCE = "instance";
+    private static final String FQN = "fqn";
+
     private PulsarClient pulsarClient;
     private String logTopic;
     private String fqn;
@@ -45,15 +55,16 @@ public class LogAppender implements Appender {
         this.logTopic = logTopic;
         this.fqn = fqn;
         this.instance = instance;
+        this.errorHandler = new DefaultErrorHandler(this);
     }
 
     @Override
     public void append(LogEvent logEvent) {
         producer.newMessage()
                 .value(logEvent.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8))
-                .property("loglevel", logEvent.getLevel().name())
-                .property("instance", instance)
-                .property("fqn", fqn)
+                .property(LOG_LEVEL, logEvent.getLevel().name())
+                .property(INSTANCE, instance)
+                .property(FQN, fqn)
                 .sendAsync();
     }
 
@@ -79,6 +90,12 @@ public class LogAppender implements Appender {
 
     @Override
     public void setHandler(ErrorHandler errorHandler) {
+        if (errorHandler == null) {
+            throw new RuntimeException("The log error handler cannot be set to null");
+        }
+        if (isStarted()) {
+            throw new RuntimeException("The log error handler cannot be changed once the appender is started");
+        }
         this.errorHandler = errorHandler;
     }
 


[pulsar] 01/10: Rename test file name from *Test2.java to *Test.java

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ad868acd689b8be7273f21b157d6f922cc6de074
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jul 27 14:27:22 2022 +0800

    Rename test file name from *Test2.java to *Test.java
    
    To resolve the conflicts brought by https://github.com/apache/pulsar/pull/13644
---
 .../pulsar/broker/admin/{AdminApiTest2.java => AdminApi2Test.java}     | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
similarity index 99%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 581bd33f0a4..445dec0b633 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.admin;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -108,7 +107,7 @@ import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker")
-public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
+public class AdminApi2Test extends MockedPulsarServiceBaseTest {
 
     private MockedPulsarService mockPulsarSetup;
 


[pulsar] 09/10: [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b01d5d2da269c5f072167e8f21d569e25a35306f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Jun 14 17:03:37 2022 +0800

    [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
    
    * [improve][broker] Avoid reconnection when a partitioned topic was created concurrently
    
    ### Motivation
    
    A partitioned topic can be created concurrently, especially when it is
    automatically created by many producers. This case can be reproduced
    easily by configuring `allowAutoTopicCreationType=non-partitioned` and
    starting a Pulsar standalone. Then, run the following code:
    
    ```java
    try (PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650").build()) {
        for (int i = 0; i < 10; i++) {
            client.newProducer().topic("topic").createAsync();
        }
        Thread.sleep(1000);
    }
    ```
    
    We can see a lot of "Could not get connection while
    getPartitionedTopicMetadata" warning logs at client side, while there
    were more warning logs with full stack traces at broker side:
    
    ```
    2022-06-14T02:04:20,522+0800 [metadata-store-22-1] WARN  org.apache.pulsar.broker.service.ServerCnx - Failed to get Partitioned Metadata [/127.0.0.1:64846] persistent://public/default/topic: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    ```
    
    It's because when the broker handles the partitioned metadata command, it
    calls `fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync` and
    will try creating a partitioned topic if it doesn't exist. This is a race
    condition: if many connections are established within a short time
    interval and one of them creates the topic successfully, the following
    ones will fail with the `AlreadyExistsException`.
    
    ### Modifications
    
    Handles the `MetadataStoreException.AlreadyExistsException` in
    `unsafeGetPartitionedTopicMetadataAsync`. In this case, invoke
    `fetchPartitionedTopicMetadataAsync` to get the partitioned metadata
    again.
    
    ### Verifying this change
    
    Even without this patch, the creation of producers could still succeed,
    because they reconnect to the broker after 100 ms when the broker
    returns a `ServiceNotReady` error in this case. The only way to
    verify this fix is to reproduce the scenario with this patch applied and
    confirm from the logs that no reconnection happens.
    
    * Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently"
    
    This reverts commit c259c0fdcfb299e6ed861796f7e2ab50632f9087.
    
    * Handle AlreadyExistsException in fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync
    
    (cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
---
 .../pulsar/broker/service/BrokerService.java       | 48 +++++++++++++++++-----
 1 file changed, 38 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bdaded637b6..d943734e0a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2412,16 +2412,44 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 .thenCompose(topicExists -> {
                     return fetchPartitionedTopicMetadataAsync(topicName)
                             .thenCompose(metadata -> {
-                                // If topic is already exist, creating partitioned topic is not allowed.
-                                if (metadata.partitions == 0
-                                        && !topicExists
-                                        && !topicName.isPartitioned()
-                                        && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
-                                        && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
-                                    return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
-                                } else {
-                                    return CompletableFuture.completedFuture(metadata);
-                                }
+                                CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
+
+                                // There are a couple of potentially blocking calls, which we cannot make from the
+                                // MetadataStore callback thread.
+                                pulsar.getExecutor().execute(() -> {
+                                    // If topic is already exist, creating partitioned topic is not allowed.
+
+                                    if (metadata.partitions == 0
+                                            && !topicExists
+                                            && !topicName.isPartitioned()
+                                            && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+                                            && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+
+                                        pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
+                                                .thenAccept(md -> future.complete(md))
+                                                .exceptionally(ex -> {
+                                                    if (ex.getCause()
+                                                            instanceof MetadataStoreException.AlreadyExistsException) {
+                                                        // The partitioned topic might be created concurrently
+                                                        fetchPartitionedTopicMetadataAsync(topicName)
+                                                                .whenComplete((metadata2, ex2) -> {
+                                                                    if (ex2 == null) {
+                                                                        future.complete(metadata2);
+                                                                    } else {
+                                                                        future.completeExceptionally(ex2);
+                                                                    }
+                                                                });
+                                                    } else {
+                                                        future.completeExceptionally(ex);
+                                                    }
+                                                    return null;
+                                                });
+                                    } else {
+                                        future.complete(metadata);
+                                    }
+                                });
+
+                                return future;
                             });
                 });
     }


[pulsar] 06/10: [fix][auth] Generate correct well-known OpenID configuration URL (#15928)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 03a0da4824e3ad91705c8c3e1a16d298841667eb
Author: ran <ga...@126.com>
AuthorDate: Tue Jun 7 15:46:57 2022 +0800

    [fix][auth] Generate correct well-known OpenID configuration URL (#15928)
    
    (cherry picked from commit 304b03e7ff3eeff62c31f93738af488eb44abde0)
---
 pulsar-client-cpp/lib/auth/AuthOauth2.cc  |  9 ++++++++-
 pulsar-client-cpp/lib/auth/AuthOauth2.h   |  1 +
 pulsar-client-cpp/tests/AuthPluginTest.cc | 20 ++++++++++++++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
index 582354aab11..2fce8047a3a 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.cc
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
@@ -143,6 +143,8 @@ ClientCredentialFlow::ClientCredentialFlow(ParamMap& params)
       audience_(params["audience"]),
       scope_(params["scope"]) {}
 
+std::string ClientCredentialFlow::getTokenEndPoint() const { return tokenEndPoint_; }
+
 static size_t curlWriteCallback(void* contents, size_t size, size_t nmemb, void* responseDataPtr) {
     ((std::string*)responseDataPtr)->append((char*)contents, size * nmemb);
     return size * nmemb;
@@ -168,7 +170,12 @@ void ClientCredentialFlow::initialize() {
     curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "GET");
 
     // set URL: well-know endpoint
-    curl_easy_setopt(handle, CURLOPT_URL, (issuerUrl_ + "/.well-known/openid-configuration").c_str());
+    std::string wellKnownUrl = issuerUrl_;
+    if (wellKnownUrl.back() == '/') {
+        wellKnownUrl.pop_back();
+    }
+    wellKnownUrl.append("/.well-known/openid-configuration");
+    curl_easy_setopt(handle, CURLOPT_URL, wellKnownUrl.c_str());
 
     // Write callback
     curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.h b/pulsar-client-cpp/lib/auth/AuthOauth2.h
index 59e8ad9320a..c940cf96985 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.h
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.h
@@ -58,6 +58,7 @@ class ClientCredentialFlow : public Oauth2Flow {
     void close();
 
     ParamMap generateParamMap() const;
+    std::string getTokenEndPoint() const;
 
    private:
     std::string tokenEndPoint_;
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc
index be987e07c48..01c19ebbea4 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -412,6 +412,26 @@ TEST(AuthPluginTest, testOauth2RequestBody) {
     ASSERT_EQ(flow2.generateParamMap(), expectedResult2);
 }
 
+TEST(AuthPluginTest, testInitialize) {
+    std::string issuerUrl = "https://dev-kt-aa9ne.us.auth0.com";
+    std::string expectedTokenEndPoint = issuerUrl + "/oauth/token";
+
+    ParamMap params;
+    params["issuer_url"] = issuerUrl;
+    params["client_id"] = "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x";
+    params["client_secret"] = "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb";
+    params["audience"] = "https://dev-kt-aa9ne.us.auth0.com/api/v2/";
+
+    ClientCredentialFlow flow1(params);
+    flow1.initialize();
+    ASSERT_EQ(flow1.getTokenEndPoint(), expectedTokenEndPoint);
+
+    params["issuer_url"] = issuerUrl + "/";
+    ClientCredentialFlow flow2(params);
+    flow2.initialize();
+    ASSERT_EQ(flow2.getTokenEndPoint(), expectedTokenEndPoint);
+}
+
 TEST(AuthPluginTest, testOauth2Failure) {
     ParamMap params;
     auto addKeyValue = [&](const std::string& key, const std::string& value) {


[pulsar] 02/10: [broker][monitoring] add message ack rate metric for consumer (#15674)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit baf35d2d19c8324fefe426b3fd30cd92f6ec94b5
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Mon Jun 13 09:56:50 2022 +0800

    [broker][monitoring] add message ack rate metric for consumer (#15674)
    
    (cherry picked from commit 88b47e5e5ac1b7fcf895bd72b0545b24bdf61f7e)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 26 +++++--
 .../apache/pulsar/broker/service/ServerCnx.java    |  1 +
 .../pulsar/broker/service/StreamingStats.java      |  1 +
 .../nonpersistent/NonPersistentSubscription.java   |  1 +
 .../service/nonpersistent/NonPersistentTopic.java  |  4 ++
 .../service/persistent/PersistentSubscription.java |  1 +
 .../broker/service/persistent/PersistentTopic.java |  3 +
 .../stats/prometheus/AggregatedConsumerStats.java  |  2 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  2 +
 .../prometheus/AggregatedSubscriptionStats.java    |  2 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  2 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |  7 ++
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 81 ++++++++++++++++++++++
 .../pulsar/common/policies/data/ConsumerStats.java |  7 +-
 .../common/policies/data/SubscriptionStats.java    |  5 ++
 .../policies/data/stats/ConsumerStatsImpl.java     |  8 ++-
 .../policies/data/stats/SubscriptionStatsImpl.java |  5 ++
 pulsar-common/src/main/proto/PulsarApi.proto       |  3 +
 18 files changed, 153 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 4be5cb641d3..e3af35434f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -82,6 +82,7 @@ public class Consumer {
     private final Rate msgRedeliver;
     private final LongAdder msgOutCounter;
     private final LongAdder bytesOutCounter;
+    private final Rate messageAckRate;
 
     private long lastConsumedTimestamp;
     private long lastAckedTimestamp;
@@ -154,6 +155,7 @@ public class Consumer {
         this.msgRedeliver = new Rate();
         this.bytesOutCounter = new LongAdder();
         this.msgOutCounter = new LongAdder();
+        this.messageAckRate = new Rate();
         this.appId = appId;
 
         // Ensure we start from compacted view
@@ -347,6 +349,8 @@ public class Consumer {
     }
 
     public CompletableFuture<Void> messageAcked(CommandAck ack) {
+        CompletableFuture<Void> future;
+
         this.lastAckedTimestamp = System.currentTimeMillis();
         Map<String, Long> properties = Collections.emptyMap();
         if (ack.getPropertiesCount() > 0) {
@@ -378,20 +382,27 @@ public class Consumer {
             }
             if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
                 List<PositionImpl> positionsAcked = Collections.singletonList(position);
-                return transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
+                future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
                         ack.getTxnidLeastBits(), positionsAcked);
             } else {
                 List<Position> positionsAcked = Collections.singletonList(position);
                 subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
-                return CompletableFuture.completedFuture(null);
+                future = CompletableFuture.completedFuture(null);
             }
         } else {
             if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
-                return individualAckWithTransaction(ack);
+                future = individualAckWithTransaction(ack);
             } else {
-                return individualAckNormal(ack, properties);
+                future = individualAckNormal(ack, properties);
             }
         }
+
+        return future
+                .whenComplete((__, t) -> {
+                    if (t == null) {
+                        this.messageAckRate.recordEvent(ack.getMessageIdsCount());
+                    }
+                });
     }
 
     //this method is for individual ack not carry the transaction
@@ -704,7 +715,10 @@ public class Consumer {
         msgOut.calculateRate();
         chunkedMessageRate.calculateRate();
         msgRedeliver.calculateRate();
+        messageAckRate.calculateRate();
+
         stats.msgRateOut = msgOut.getRate();
+        stats.messageAckRate = messageAckRate.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
         stats.msgRateRedeliver = msgRedeliver.getRate();
         stats.chunkedMessageRate = chunkedMessageRate.getRate();
@@ -717,8 +731,8 @@ public class Consumer {
         lastAckedTimestamp = consumerStats.lastAckedTimestamp;
         lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
         MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
-        if (log.isDebugEnabled()){
-            log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer", topicName,
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", topicName,
                     subscription, consumerStats.availablePermits, consumerId);
         }
         unackedMessages = consumerStats.unackedMessages;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 138f9490df7..5aeefff05dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -608,6 +608,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 .setConnectedSince(consumerStats.getConnectedSince())
                 .setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false))
                 .setMsgRateExpired(subscription.getExpiredMessageRate())
+                .setMessageAckRate(consumerStats.messageAckRate)
                 .setType(subscription.getTypeString());
 
         return Commands.serializeWithSize(cmd);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
index 02dcb8233ff..469c802b76a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
@@ -65,6 +65,7 @@ public class StreamingStats {
         statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
         statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);
         statsStream.writePair("avgMessagesPerEntry", stats.avgMessagesPerEntry);
+        statsStream.writePair("messageAckRate", stats.messageAckRate);
 
         if (Subscription.isIndividualAckMode(subType)) {
             statsStream.writePair("unackedMessages", stats.unackedMessages);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 59fab2df3c6..cb515cdbd97 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -453,6 +453,7 @@ public class NonPersistentSubscription implements Subscription {
                 ConsumerStatsImpl consumerStats = consumer.getStats();
                 subStats.consumers.add(consumerStats);
                 subStats.msgRateOut += consumerStats.msgRateOut;
+                subStats.messageAckRate += consumerStats.messageAckRate;
                 subStats.msgThroughputOut += consumerStats.msgThroughputOut;
                 subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                 subStats.msgOutCounter += consumerStats.msgOutCounter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 50ac27ff352..e7e650a4f4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -697,6 +697,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
             double subMsgRateOut = 0;
             double subMsgThroughputOut = 0;
             double subMsgRateRedeliver = 0;
+            double subMsgAckRate = 0;
 
             // Start subscription name & consumers
             try {
@@ -711,6 +712,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
                     ConsumerStatsImpl consumerStats = consumer.getStats();
                     subMsgRateOut += consumerStats.msgRateOut;
+                    subMsgAckRate += consumerStats.messageAckRate;
+
                     subMsgThroughputOut += consumerStats.msgThroughputOut;
                     subMsgRateRedeliver += consumerStats.msgRateRedeliver;
 
@@ -725,6 +728,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
                 topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false));
                 topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
+                topicStatsStream.writePair("messageAckRate", subMsgAckRate);
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index d0498a69ca3..02cb9b3aab4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1040,6 +1040,7 @@ public class PersistentSubscription implements Subscription {
                 subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                 subStats.msgOutCounter += consumerStats.msgOutCounter;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
+                subStats.messageAckRate += consumerStats.messageAckRate;
                 subStats.chunkedMessageRate += consumerStats.chunkedMessageRate;
                 subStats.unackedMessages += consumerStats.unackedMessages;
                 subStats.lastConsumedTimestamp =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a583aecc1f2..a5cffa22939 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1773,6 +1773,7 @@ public class PersistentTopic extends AbstractTopic
             double subMsgRateOut = 0;
             double subMsgThroughputOut = 0;
             double subMsgRateRedeliver = 0;
+            double subMsgAckRate = 0;
 
             // Start subscription name & consumers
             try {
@@ -1786,6 +1787,7 @@ public class PersistentTopic extends AbstractTopic
 
                     ConsumerStatsImpl consumerStats = consumer.getStats();
                     subMsgRateOut += consumerStats.msgRateOut;
+                    subMsgAckRate += consumerStats.messageAckRate;
                     subMsgThroughputOut += consumerStats.msgThroughputOut;
                     subMsgRateRedeliver += consumerStats.msgRateRedeliver;
 
@@ -1800,6 +1802,7 @@ public class PersistentTopic extends AbstractTopic
                         subscription.getNumberOfEntriesInBacklog(true));
                 topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
+                topicStatsStream.writePair("messageAckRate", subMsgAckRate);
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
index 8b6bf7d5c96..0a4bd317df5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -28,6 +28,8 @@ public class AggregatedConsumerStats {
 
     public double msgRateOut;
 
+    public double msgAckRate;
+
     public double msgThroughputOut;
 
     public long availablePermits;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 8bacd0f582d..5610dbab218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -33,6 +33,7 @@ public class AggregatedNamespaceStats {
     public double throughputIn;
     public double throughputOut;
 
+    public double messageAckRate; // double, so fractional consumer rates are not truncated when summed
     public long bytesInCounter;
     public long msgInCounter;
     public long bytesOutCounter;
@@ -122,6 +123,7 @@ public class AggregatedNamespaceStats {
                 consumerStats.blockedSubscriptionOnUnackedMsgs = v.blockedSubscriptionOnUnackedMsgs;
                 consumerStats.msgRateRedeliver += v.msgRateRedeliver;
                 consumerStats.unackedMessages += v.unackedMessages;
+                messageAckRate += v.msgAckRate;
             });
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index fb74daf419f..ffb2f237e03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -36,6 +36,8 @@ public class AggregatedSubscriptionStats {
 
     public double msgRateOut;
 
+    public double messageAckRate;
+
     public double msgThroughputOut;
 
     public long msgDelayed;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 9df9ad4c7c6..2a21a0b402a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -220,6 +220,7 @@ public class NamespaceStatsAggregator {
                     consumerStats.unackedMessages = conStats.unackedMessages;
                     consumerStats.msgRateRedeliver = conStats.msgRateRedeliver;
                     consumerStats.msgRateOut = conStats.msgRateOut;
+                    consumerStats.msgAckRate = conStats.messageAckRate;
                     consumerStats.msgThroughputOut = conStats.msgThroughputOut;
                     consumerStats.bytesOutCounter = conStats.bytesOutCounter;
                     consumerStats.msgOutCounter = conStats.msgOutCounter;
@@ -304,6 +305,7 @@ public class NamespaceStatsAggregator {
         metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
         metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
         metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+        metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
 
         metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
         metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index bfa427e5e3a..5f7c146cd74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -242,6 +242,8 @@ class TopicStats {
                     subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
                     subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
+                    subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
                     subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
@@ -276,6 +278,11 @@ class TopicStats {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
                         splitTopicAndPartitionIndexLabel);
+
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
+                        splitTopicAndPartitionIndexLabel);
+
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
                         splitTopicAndPartitionIndexLabel);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 2f702ab89f6..ddfd3ee9f5f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -20,14 +20,22 @@ package org.apache.pulsar.broker.stats;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -38,9 +46,13 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -183,6 +195,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
                 "msgThroughputOut",
                 "bytesOutCounter",
                 "msgOutCounter",
+                "messageAckRate",
                 "msgRateRedeliver",
                 "chunkedMessageRate",
                 "consumerName",
@@ -220,4 +233,72 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
 
         consumer.close();
     }
+
+
+    @Test
+    public void testPersistentTopicMessageAckRateMetricTopicLevel() throws Exception {
+        String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID();
+        testMessageAckRateMetric(topicName, true);
+    }
+
+    @Test
+    public void testPersistentTopicMessageAckRateMetricNamespaceLevel() throws Exception {
+        String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID();
+        testMessageAckRateMetric(topicName, false);
+    }
+
+    private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics)
+            throws Exception {
+        final int messages = 100;
+        String subName = "test_sub";
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionName(subName).isAckReceiptEnabled(true).subscribe();
+
+        String namespace = TopicName.get(topicName).getNamespace();
+
+        for (int i = 0; i < messages; i++) {
+            producer.send(UUID.randomUUID().toString());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            Message<String> message = consumer.receive(20, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+
+            consumer.acknowledge(message);
+        }
+
+        Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        Subscription subscription = topic.getSubscription(subName);
+        List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers();
+        Assert.assertEquals(consumers.size(), 1);
+        org.apache.pulsar.broker.service.Consumer consumer1 = consumers.get(0);
+        consumer1.updateRates();
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
+        String metricStr = output.toString();
+
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
+        Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate");
+        Assert.assertTrue(metrics.size() > 0);
+
+        int num = 0;
+        for (PrometheusMetricsTest.Metric metric : metrics) {
+            if (exposeTopicLevelMetrics && metric.tags.get("subscription").equals(subName)) {
+                num++;
+                Assert.assertTrue(metric.value > 0);
+            } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
+                num++;
+                Assert.assertTrue(metric.value > 0);
+            }
+        }
+
+        Assert.assertTrue(num > 0);
+    }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 7204af616e2..4c165083ab1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -40,7 +40,12 @@ public interface ConsumerStats {
     /** Total rate of messages redelivered by this consumer (msg/s). */
     double getMsgRateRedeliver();
 
-    /** Total chunked messages dispatched. */
+    /**
+     * Total rate of messages acknowledged by this consumer (msg/s).
+     */
+    double getMessageAckRate();
+
+    /** The total rate of chunked messages delivered to this consumer. */
     double getChunkedMessageRate();
 
     /** Name of the consumer. */
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 2ce38aafb34..514f0fd6d1d 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -40,6 +40,11 @@ public interface SubscriptionStats {
     /** Total rate of messages redelivered on this subscription (msg/s). */
     double getMsgRateRedeliver();
 
+    /**
+     * Total rate of messages acknowledged on this subscription (msg/s).
+     */
+    double getMessageAckRate();
+
     /** Chunked message dispatch rate. */
     int getChunkedMessageRate();
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index 47f614889ca..1e31e053051 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -45,7 +45,12 @@ public class ConsumerStatsImpl implements ConsumerStats {
     /** Total rate of messages redelivered by this consumer (msg/s). */
     public double msgRateRedeliver;
 
-    /** Total chunked messages dispatched. */
+    /**
+     * Total rate of messages acknowledged by this consumer (msg/s).
+     */
+    public double messageAckRate;
+
+    /** The total rate of chunked messages delivered to this consumer. */
     public double chunkedMessageRate;
 
     /** Name of the consumer. */
@@ -103,6 +108,7 @@ public class ConsumerStatsImpl implements ConsumerStats {
     public ConsumerStatsImpl add(ConsumerStatsImpl stats) {
         Objects.requireNonNull(stats);
         this.msgRateOut += stats.msgRateOut;
+        this.messageAckRate += stats.messageAckRate;
         this.msgThroughputOut += stats.msgThroughputOut;
         this.bytesOutCounter += stats.bytesOutCounter;
         this.msgOutCounter += stats.msgOutCounter;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 78781ac32b4..2a124b9e574 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -46,6 +46,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
     /** Total rate of messages redelivered on this subscription (msg/s). */
     public double msgRateRedeliver;
 
+    /**
+     * Total rate of messages acknowledged on this subscription (msg/s).
+     */
+    public double messageAckRate;
+
     /** Chunked message dispatch rate. */
     public int chunkedMessageRate;
 
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index cabd0996337..39912eacde5 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -695,6 +695,9 @@ message CommandConsumerStatsResponse {
 
         /// Number of messages in the subscription backlog
         optional uint64 msgBacklog                  = 15;
+
+        /// Total rate of messages acknowledged (msg/s)
+        optional double messageAckRate              = 16;
 }
 
 message CommandGetLastMessageId {


[pulsar] 07/10: [fix][client] Remove producer when close producer command is received (#16028)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 48923ce1811ea4bbba1d26798b036c6c052d25f1
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jun 13 15:16:39 2022 +0800

    [fix][client] Remove producer when close producer command is received (#16028)
    
    (cherry picked from commit 5ef895af7d8dec851167e56cdf3e8bec11080f8d)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  5 +++--
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 24 +++++++++++++++++++---
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 2bad4e7ef2d..750c42a620b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -110,7 +110,8 @@ public class ClientCnx extends PulsarHandler {
     // LookupRequests that waiting in client side.
     private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
 
-    private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
+    @VisibleForTesting
+    final ConcurrentLongHashMap<ProducerImpl<?>> producers =
             ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
                     .expectedItems(16)
                     .concurrencyLevel(1)
@@ -711,7 +712,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
         log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
         final long producerId = closeProducer.getProducerId();
-        ProducerImpl<?> producer = producers.get(producerId);
+        ProducerImpl<?> producer = producers.remove(producerId);
         if (producer != null) {
             producer.connectionClosed(this);
         } else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index a3a00b1b70e..6ce4afecd02 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
@@ -156,7 +157,7 @@ public class ClientCnxTest {
 
     @Test
     public void testHandleCloseConsumer() {
-        ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+        ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer");
         EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
         ClientConfigurationData conf = new ClientConfigurationData();
         ClientCnx cnx = new ClientCnx(conf, eventLoop);
@@ -165,11 +166,28 @@ public class ClientCnxTest {
         cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
         assertEquals(cnx.consumers.size(), 1);
 
-        CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
-                .setConsumerId(1);
+        CommandCloseConsumer closeConsumer = new CommandCloseConsumer().setConsumerId(consumerId);
         cnx.handleCloseConsumer(closeConsumer);
         assertEquals(cnx.consumers.size(), 0);
 
         eventLoop.shutdownGracefully();
     }
+
+    @Test
+    public void testHandleCloseProducer() {
+        ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseProducer");
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        long producerId = 1;
+        cnx.registerProducer(producerId, mock(ProducerImpl.class));
+        assertEquals(cnx.producers.size(), 1);
+
+        CommandCloseProducer closeProducerCmd = new CommandCloseProducer().setProducerId(producerId);
+        cnx.handleCloseProducer(closeProducerCmd);
+        assertEquals(cnx.producers.size(), 0);
+
+        eventLoop.shutdownGracefully();
+    }
 }


[pulsar] 04/10: [fix][client] Remove consumer when close consumer command is received (#15761)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e80d8a05b553cd3291b5f8fac7785cd485f108fb
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jun 13 09:01:16 2022 +0800

    [fix][client] Remove consumer when close consumer command is received (#15761)
    
    (cherry picked from commit 5246c8e1cc44b96db6ba684e0ce64914cfd05a61)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java     |  6 ++++--
 .../org/apache/pulsar/client/impl/ClientCnxTest.java | 20 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bb686553d78..2bad4e7ef2d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
@@ -114,7 +115,8 @@ public class ClientCnx extends PulsarHandler {
                     .expectedItems(16)
                     .concurrencyLevel(1)
                     .build();
-    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
+    @VisibleForTesting
+    final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
             ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
                     .expectedItems(16)
                     .concurrencyLevel(1)
@@ -721,7 +723,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
         final long consumerId = closeConsumer.getConsumerId();
-        ConsumerImpl<?> consumer = consumers.get(consumerId);
+        ConsumerImpl<?> consumer = consumers.remove(consumerId);
         if (consumer != null) {
             consumer.connectionClosed(this);
         } else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 558c0bfa13f..a3a00b1b70e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ThreadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
@@ -152,4 +153,23 @@ public class ClientCnxTest {
 
         eventLoop.shutdownGracefully();
     }
+
+    @Test
+    public void testHandleCloseConsumer() {
+        ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        long consumerId = 1;
+        cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
+        assertEquals(cnx.consumers.size(), 1);
+
+        CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
+                .setConsumerId(1);
+        cnx.handleCloseConsumer(closeConsumer);
+        assertEquals(cnx.consumers.size(), 0);
+
+        eventLoop.shutdownGracefully();
+    }
 }