You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:44:51 UTC

[pulsar] branch branch-2.5 updated (c03e303 -> 2dae2b5)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a change to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from c03e303  update version 2.5.1
     new 7a71328  [functions] Fix typos in exceptions related to functions (#6910)
     new 450cc15  use originalAuthMethod on originalAuthChecker (#6870)
     new 9c5644b  Expose pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer. (#6918)
     new 0dfb482  [function] Function endpoint admin/v3/functions/{tenant}/{namespace} always returns 404 (#6767)
     new c6bbfc6  fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize (#6862)
     new e7b8e3d  Pulsar SQL Support Avro Schema `ByteBuffer` Type (#6925)
     new 2b1a1ed  Close producer when the topic does not exists. (#6879)
     new be913ed  [Docs] Fix the Create subscribtion swagger of PersistentTopic (#6776)
     new 1ccecf1  [C++] Auto update topic partitions (#6732)
     new 3421e75  Fix message id error if messages were sent to a partitioned topic (#6938)
     new 9d9af98  add keystore doc (#6922)
     new 71e7878  Fix pulsar client admin thread number explode (#6940)
     new 926d687  [ISSUE 6563][Broker] Invalidate managed ledgers zookeeper cache instead of reloading on watcher triggered (#6659)
     new 4095bdf  [C++] Subscription InitialPosition is not correctly set on regex consumers (#6810)
     new f428d84  [Issue 6887][pulsar-broker] ttlDurationDefaultInSeconds not applying (#6920)
     new aed2c17  fix brokerPublisherThrottlingTickTimeMillis in broker.conf (#6877)
     new 2dae2b5  fix autoSkipConf (#6863)

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf                                   |   2 +-
 conf/client.conf                                   |   2 +-
 conf/standalone.conf                               |   2 +-
 .../apache/pulsar/broker/admin/AdminResource.java  |   7 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |   1 -
 .../pulsar/broker/admin/v1/PersistentTopics.java   |   5 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  13 +-
 .../broker/cache/LocalZooKeeperCacheService.java   |   8 +-
 .../pulsar/broker/service/BrokerService.java       |   2 +-
 .../org/apache/pulsar/broker/service/Consumer.java |   9 +
 .../apache/pulsar/broker/service/ServerCnx.java    |  22 +-
 .../nonpersistent/NonPersistentSubscription.java   |   2 +
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +
 .../service/persistent/PersistentSubscription.java |   2 +
 .../broker/service/persistent/PersistentTopic.java |   2 +
 .../stats/prometheus/AggregatedConsumerStats.java  |   4 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  10 +
 .../prometheus/AggregatedSubscriptionStats.java    |   4 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  77 +++---
 .../pulsar/broker/stats/prometheus/TopicStats.java |   8 +
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   7 +
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   6 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 172 ++++++++++++-
 .../client/api/ConsumerBatchReceiveTest.java       | 124 +++++++--
 .../pulsar/client/impl/TopicDoesNotExistsTest.java |  70 +++++
 .../admin/internal/http/AsyncHttpConnector.java    |   1 +
 .../internal/http/AsyncHttpConnectorProvider.java  |   6 +-
 .../pulsar/client/api/BatchReceivePolicy.java      |   2 +-
 .../include/pulsar/ClientConfiguration.h           |  16 ++
 pulsar-client-cpp/lib/ClientConfiguration.cc       |   9 +
 pulsar-client-cpp/lib/ClientConfigurationImpl.h    |   5 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |   6 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   | 152 ++++++++---
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |  13 +
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   | 125 +++++++--
 pulsar-client-cpp/lib/PartitionedProducerImpl.h    |  17 ++
 pulsar-client-cpp/lib/ProducerImpl.cc              |   8 +-
 pulsar-client-cpp/lib/ProducerImpl.h               |   3 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |  90 +++++++
 pulsar-client-cpp/tests/HttpHelper.cc              |   6 +-
 pulsar-client-cpp/tests/HttpHelper.h               |   1 +
 pulsar-client-cpp/tests/PartitionsUpdateTest.cc    | 176 +++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  29 ++-
 .../client/impl/MultiTopicsConsumerImpl.java       |   2 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  15 +-
 .../pulsar/common/policies/data/ConsumerStats.java |   8 +
 .../common/policies/data/SubscriptionStats.java    |  10 +
 .../pulsar/common/policies/data/TopicStats.java    |  19 +-
 .../functions/utils/FunctionConfigUtils.java       |  16 +-
 .../pulsar/functions/utils/SinkConfigUtils.java    |   4 +-
 .../pulsar/functions/utils/SourceConfigUtils.java  |   3 +-
 .../functions/utils/FunctionConfigUtilsTest.java   |   8 +-
 .../functions/utils/SinkConfigUtilsTest.java       |   8 +-
 .../functions/utils/SourceConfigUtilsTest.java     |   4 +-
 .../worker/rest/api/v3/FunctionsApiV3Resource.java |   8 +
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  30 ++-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  |  51 ++++
 .../pulsar/zookeeper/ZooKeeperChildrenCache.java   |   4 -
 .../zookeeper/ZooKeeperManagedLedgerCache.java     |  71 +++++
 site2/docs/reference-metrics.md                    |   2 +
 site2/docs/security-tls-keystore.md                | 286 +++++++++++++++++++++
 site2/website/sidebars.json                        |   1 +
 62 files changed, 1606 insertions(+), 172 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
 create mode 100644 pulsar-client-cpp/tests/PartitionsUpdateTest.cc
 create mode 100644 pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java
 create mode 100644 site2/docs/security-tls-keystore.md


[pulsar] 04/17: [function] Function endpoint admin/v3/functions/{tenant}/{namespace} always returns 404 (#6767)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0dfb4822b3298415c73a60add8da61df638592b9
Author: Chris Bartholomew <c_...@yahoo.com>
AuthorDate: Fri May 8 22:16:57 2020 -0400

    [function] Function endpoint admin/v3/functions/{tenant}/{namespace} always returns 404 (#6767)
    
    Fix #6839
    ### Motivation
    
    The V3 endpoint that returns a list of all functions in a namespace always returns 404. The V2 version of the endpoint returns the actual list of functions in the namespace. It looks like during the switch from V2 to V3, the implementation for the endpoint was missed. This endpoint is part of the current API [documentation](https://pulsar.apache.org/functions-rest-api/?version=2.5.0#operation/listFunctions), so I don't think it was removed intentionally.
    
    This endpoint is also used by the pulsar-admin command, so that always returns 404:
    
    ```
    pulsar-admin functions list --tenant chris-kafkaesque-io --namespace local-useast2-aws
    HTTP 404 Not Found
    
    Reason: HTTP 404 Not Found
    ```
    
    ### Modifications
    
    I have added the endpoint to `FunctionsApiV3Resource.java`. It is essentially a clone of the V2 version.
    
    ### Verifying this change
    
    This is a pretty small change. I have confirmed that the V3 version of the endpoint now returns the same list of functions as the V2 version. I have also confirmed that the pulsar-admin command now works:
    
    ```
    bin/pulsar-admin functions list --tenant chris-kafkaesque-io --namespace TTL
    23:45:10.763 [main] INFO  org.apache.pulsar.common.util.SecurityUtility - Found and Instantiated Bouncy Castle provider in classpath BC
    "exclaim"
    "pulsar-functions-0.1"
    ```
    
    (cherry picked from commit 714a776c124f5afff86f6df87675f3452e6dd207)
---
 .../functions/worker/rest/api/v3/FunctionsApiV3Resource.java      | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 203ae62..ee6b4f6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -109,6 +109,14 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
     }
 
     @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{tenant}/{namespace}")
+    public List<String> listSources(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
+    }
+
+    @GET
     @ApiOperation(
             value = "Displays the status of a Pulsar Function instance",
             response = FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class


[pulsar] 14/17: [C++] Subscription InitialPosition is not correctly set on regex consumers (#6810)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4095bdf556494684c890813af3ab8806f2ac9405
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue May 12 00:28:40 2020 -0700

    [C++] Subscription InitialPosition is not correctly set on regex consumers (#6810)
    
    ### Motivation
    
    The subscription `InitialPosition` is not currently set when using the multi-topic or regex consumers in C++/Python.
    
    As a result, a request to start from `MessageId::earliest` is ignored.
    
    * [C++] Subscription InitialPosition is not correctly set on regex consumers
    
    * fix test fail for topic name
    
    * fix `make format`
    
    Co-authored-by: Jia Zhai <zh...@apache.org>
    (cherry picked from commit 0b803a83fbbe873020cdf18f8259ea92584492ec)
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc |  6 +---
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 36 ++++++++++++++++++++++++
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index e837adf..34a0c90 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -157,13 +157,9 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
     }
 
     std::shared_ptr<ConsumerImpl> consumer;
-    ConsumerConfiguration config;
+    ConsumerConfiguration config = conf_.clone();
     ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
 
-    // all the consumers should have same name.
-    config.setConsumerName(conf_.getConsumerName());
-    config.setConsumerType(conf_.getConsumerType());
-    config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
     config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
                                         std::placeholders::_1, std::placeholders::_2));
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 5277ca4..e99f408 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3026,6 +3026,42 @@ TEST(BasicEndToEndTest, testRegexTopicsWithMessageListener) {
     }
 }
 
+TEST(BasicEndToEndTest, testRegexTopicsWithInitialPosition) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string topicName =
+        "persistent://public/default/test-regex-initial-position-" + std::to_string(time(NULL));
+
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    for (int i = 0; i < 10; i++) {
+        producer.send(MessageBuilder().setContent("test-" + std::to_string(i)).build());
+    }
+
+    std::string subsName = "testRegexTopicsWithMessageListener-sub";
+    std::string pattern = topicName + ".*";
+
+    // Subscription gets created after messages are produced but it will start from the beginning of the topic
+    ConsumerConfiguration consumerConf;
+    consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
+
+    Consumer consumer;
+    result = client.subscribeWithRegex(pattern, subsName, consumerConf, consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subsName);
+
+    for (int i = 0; i < 10; i++) {
+        Message msg;
+        Result res = consumer.receive(msg);
+        ASSERT_EQ(ResultOk, result);
+    }
+
+    client.close();
+}
+
 TEST(BasicEndToEndTest, testPartitionedTopicWithOnePartition) {
     ClientConfiguration config;
     Client client(lookupUrl);


[pulsar] 16/17: fix brokerPublisherThrottlingTickTimeMillis in broker.conf (#6877)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit aed2c17f3d8c70d5d66a3e1ff4c442fa0e4cde7e
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Tue May 12 20:53:43 2020 +0800

    fix brokerPublisherThrottlingTickTimeMillis in broker.conf (#6877)
    
    Master Issue: #6876
    
    Motivation
    The `brokerPublisherThrottlingTickTimeMillis` config key was misspelled as `brokerPublisherThrPottlingTickTimeMillis`.
    
    Modifications
    broker.conf
    
      # Tick time to schedule task that checks broker publish rate limiting across all topics
    # Reducing to lower value can give more accuracy while throttling publish but
    # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
    brokerPublisherThrottlingTickTimeMillis=50
    
    * fix brokerPublisherThrottlingTickTimeMillis in broker.conf
    
    * fix brokerPublisherThrottlingTickTimeMillis in standalone.conf
    
    Co-authored-by: dezhiliu <de...@tencent.com>
    (cherry picked from commit 65dc9d99924e5ec00952a81c7091195304baa58e)
---
 conf/broker.conf     | 2 +-
 conf/standalone.conf | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d0c942a..62107cb 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -194,7 +194,7 @@ topicPublisherThrottlingTickTimeMillis=10
 # Tick time to schedule task that checks broker publish rate limiting across all topics
 # Reducing to lower value can give more accuracy while throttling publish but
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
-brokerPublisherThrPottlingTickTimeMillis=50
+brokerPublisherThrottlingTickTimeMillis=50
 
 # Max Rate(in 1 seconds) of Message allowed to publish for a broker if broker publish rate limiting enabled
 # (Disable message rate limit with value 0)
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 3d6c12e..6619e90 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -151,7 +151,7 @@ topicPublisherThrottlingTickTimeMillis=2
 # Tick time to schedule task that checks broker publish rate limiting across all topics
 # Reducing to lower value can give more accuracy while throttling publish but
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
-brokerPublisherThrPottlingTickTimeMillis=50
+brokerPublisherThrottlingTickTimeMillis=50
 
 # Max Rate(in 1 seconds) of Message allowed to publish for a broker if broker publish rate limiting enabled
 # (Disable message rate limit with value 0)


[pulsar] 10/17: Fix message id error if messages were sent to a partitioned topic (#6938)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3421e75e0035063a8eeb2c927f3f23da64c05dff
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue May 12 12:12:43 2020 +0800

    Fix message id error if messages were sent to a partitioned topic (#6938)
    
    ### Motivation
    
    If messages were sent to a partitioned topic, the message id's `partition` field was always -1 because SendReceipt command only contains ledger id and entry id.
    
    ### Modifications
    
    - Add a `partition` field to `ProducerImpl` and set the `MessageId`'s `partition` field with it in `ackReceived` method later.
    - Add a test to check message id in send callback if messages were sent to a partitioned topic.
    
    (cherry picked from commit 15cb920b394874d37039df5e7665092651c28fae)
---
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc |  2 +-
 pulsar-client-cpp/lib/ProducerImpl.cc            |  8 +++-
 pulsar-client-cpp/lib/ProducerImpl.h             |  3 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 54 ++++++++++++++++++++++++
 4 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 0461ee3..628afbc 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -90,7 +90,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
 ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
     using namespace std::placeholders;
     std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
-    auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
+    auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
     producer->getProducerCreatedFuture().addListener(
         std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
                   const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 0095dc8..a2547cd 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -39,13 +39,15 @@ OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& ms
       sequenceId_(sequenceId),
       timeout_(TimeUtils::now() + milliseconds(conf.getSendTimeout())) {}
 
-ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf)
+ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf,
+                           int32_t partition)
     : HandlerBase(
           client, topic,
           Backoff(milliseconds(100), seconds(60), milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
       conf_(conf),
       executor_(client->getIOExecutorProvider()->get()),
       pendingMessagesQueue_(conf_.getMaxPendingMessages()),
+      partition_(partition),
       producerName_(conf_.getProducerName()),
       producerStr_("[" + topic_ + ", " + producerName_ + "] "),
       producerId_(client->newProducerId()),
@@ -627,7 +629,9 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
     }
 }
 
-bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& messageId) {
+bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
+    MessageId messageId(partition_, rawMessageId.ledgerId(), rawMessageId.entryId(),
+                        rawMessageId.batchIndex());
     OpSendMsg op;
     Lock lock(mutex_);
     bool havePendingAck = pendingMessagesQueue_.peek(op);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 80927b1..e4f35d4 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -63,7 +63,7 @@ class ProducerImpl : public HandlerBase,
                      public ProducerImplBase {
    public:
     ProducerImpl(ClientImplPtr client, const std::string& topic,
-                 const ProducerConfiguration& producerConfiguration);
+                 const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
     ~ProducerImpl();
 
     int keepMaxMessageSize_;
@@ -150,6 +150,7 @@ class ProducerImpl : public HandlerBase,
 
     MessageQueue pendingMessagesQueue_;
 
+    int32_t partition_;  // -1 if topic is non-partitioned
     std::string producerName_;
     std::string producerStr_;
     uint64_t producerId_;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 99b7f87..5277ca4 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3207,11 +3207,65 @@ TEST(BasicEndToEndTest, testSendCallback) {
         Message msg;
         ASSERT_EQ(ResultOk, consumer.receive(msg));
         receivedIdSet.emplace(msg.getMessageId());
+        consumer.acknowledge(msg);
+    }
+
+    latch.wait();
+    ASSERT_EQ(sentIdSet, receivedIdSet);
+
+    consumer.close();
+    producer.close();
+
+    const std::string partitionedTopicName = topicName + "-" + std::to_string(time(nullptr));
+    const std::string url = adminUrl + "admin/v2/persistent/" +
+                            partitionedTopicName.substr(partitionedTopicName.find("://") + 3) + "/partitions";
+    const int numPartitions = 3;
+
+    int res = makePutRequest(url, std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(false);
+    producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopicName, producerConfig, producer));
+    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopicName, "SubscriptionName", consumer));
+
+    sentIdSet.clear();
+    receivedIdSet.clear();
+
+    const int numMessages = numPartitions * 2;
+    latch = Latch(numMessages);
+    for (int i = 0; i < numMessages; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId &id) {
+            ASSERT_EQ(ResultOk, result);
+            sentIdSet.emplace(id);
+            latch.countdown();
+        });
+    }
+
+    for (int i = 0; i < numMessages; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        receivedIdSet.emplace(msg.getMessageId());
+        consumer.acknowledge(msg);
     }
 
     latch.wait();
     ASSERT_EQ(sentIdSet, receivedIdSet);
 
+    std::set<int> partitionIndexSet;
+    for (const auto &id : sentIdSet) {
+        partitionIndexSet.emplace(id.partition());
+    }
+    std::set<int> expectedPartitionIndexSet;
+    for (int i = 0; i < numPartitions; i++) {
+        expectedPartitionIndexSet.emplace(i);
+    }
+    ASSERT_EQ(sentIdSet, receivedIdSet);
+
     consumer.close();
     producer.close();
     client.close();


[pulsar] 15/17: [Issue 6887][pulsar-broker] ttlDurationDefaultInSeconds not applying (#6920)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f428d84b2448ac83607124a7794ba2bdc25028fb
Author: Jiechuan Chen <65...@qq.com>
AuthorDate: Tue May 12 18:53:06 2020 +0800

    [Issue 6887][pulsar-broker] ttlDurationDefaultInSeconds not applying (#6920)
    
    Fixes #6887
    
    ### Motivation
    The TTL for a namespace should be retrieved from the broker's configuration if it is not configured in the namespace policies. However, the current code returns the value stored in the namespace policies directly, without checking whether it has been configured.
    
    ### Modifications
    
    Added a condition to check whether the TTL is configured in the namespace policies. If it is not, the value stored in the broker's configuration is retrieved and returned instead.
    
    * fixes "ttlDurationDefaultInSeconds is not applied"
    
    * testcase added for getTtlDurationDefaultInSeconds
    (cherry picked from commit a24203cc140a97940bca99ca7a761c7e5e78c49c)
---
 .../main/java/org/apache/pulsar/broker/admin/AdminResource.java    | 4 ++++
 .../src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java | 7 +++++++
 2 files changed, 11 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index c49acbe..f18d964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -478,6 +478,10 @@ public abstract class AdminResource extends PulsarWebResource {
         if (policies.clusterSubscribeRate.isEmpty()) {
             policies.clusterSubscribeRate.put(cluster, subscribeRate());
         }
+
+        if (policies.message_ttl_in_seconds <= 0) {
+            policies.message_ttl_in_seconds = config.getTtlDurationDefaultInSeconds();
+        }
     }
 
     protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 0ab6311..82808b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2295,4 +2295,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         TopicStats topicStats = admin.topics().getStats(topic);
         assertEquals(topicStats.backlogSize, 0);
     }
+
+    @Test
+    public void testGetTtlDurationDefaultInSeconds() throws Exception {
+        conf.setTtlDurationDefaultInSeconds(3600);
+        int seconds = admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds;
+        assertEquals(seconds, 3600);
+    }
 }


[pulsar] 11/17: add keystore doc (#6922)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9d9af98161ff421a82e8b8dad852890912ec2c58
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Tue May 12 13:58:26 2020 +0800

    add keystore doc (#6922)
    
    related with #6853
    add keystore tls config doc
    (cherry picked from commit fd6f772d934725e4a38f019da90016e30166ada7)
---
 conf/client.conf                    |   2 +-
 site2/docs/security-tls-keystore.md | 286 ++++++++++++++++++++++++++++++++++++
 site2/website/sidebars.json         |   1 +
 3 files changed, 288 insertions(+), 1 deletion(-)

diff --git a/conf/client.conf b/conf/client.conf
index 597478e..8e93e1f 100644
--- a/conf/client.conf
+++ b/conf/client.conf
@@ -57,7 +57,7 @@ tlsEnableHostnameVerification=false
 tlsTrustCertsFilePath=
 
 # Enable TLS with KeyStore type configuration in broker.
-useKeyStoreTls=false;
+useKeyStoreTls=false
 
 # TLS KeyStore type configuration: JKS, PKCS12
 tlsTrustStoreType=JKS
diff --git a/site2/docs/security-tls-keystore.md b/site2/docs/security-tls-keystore.md
new file mode 100644
index 0000000..befe23c
--- /dev/null
+++ b/site2/docs/security-tls-keystore.md
@@ -0,0 +1,286 @@
+---
+id: security-tls-keystore
+title: Using TLS with KeyStore configure
+sidebar_label: Using TLS with KeyStore configure
+---
+
+## Overview
+
+Apache Pulsar supports [TLS encryption](security-tls-transport.md) and [TLS authentication](security-tls-authentication.md) between clients and Apache Pulsar service. 
+By default it uses PEM format file configuration. This page describes how to use [KeyStore](https://en.wikipedia.org/wiki/Java_KeyStore) type configuration for TLS.
+
+
+## TLS encryption with KeyStore configure
+ 
+### Generate TLS key and certificate
+
+The first step of deploying TLS is to generate the key and the certificate for each machine in the cluster.
+You can use Java’s `keytool` utility to accomplish this task. We will generate the key into a temporary keystore
+initially for broker, so that we can export and sign it later with CA.
+
+```shell
+keytool -keystore broker.keystore.jks -alias localhost -validity {validity} -genkey
+```
+
+You need to specify two parameters in the above command:
+
+1. `keystore`: the keystore file that stores the certificate. The *keystore* file contains the private key of
+    the certificate; hence, it needs to be kept safely.
+2. `validity`: the valid time of the certificate in days.
+
+> Ensure that common name (CN) matches exactly with the fully qualified domain name (FQDN) of the server.
+The client compares the CN with the DNS domain name to ensure that it is indeed connecting to the desired server, not a malicious one.
+
+### Creating your own CA
+
+After the first step, each broker in the cluster has a public-private key pair, and a certificate to identify the machine.
+The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.
+
+Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster.
+A `certificate authority (CA)` is responsible for signing certificates. A CA works like a government that issues passports —
+the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps
+to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed
+certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have
+high assurance that they are connecting to the authentic machines.
+
+```shell
+openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
+```
+
+The generated CA is simply a *public-private* key pair and certificate, and it is intended to sign other certificates.
+
+The next step is to add the generated CA to the clients' truststore so that the clients can trust this CA:
+
+```shell
+keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
+```
+
+NOTE: If you configure the brokers to require client authentication by setting `tlsRequireTrustedClientCertOnConnect` to `true` on the
+broker configuration, then you must also provide a truststore for the brokers and it should have all the CA certificates that clients keys were signed by.
+
+```shell
+keytool -keystore broker.truststore.jks -alias CARoot -import -file ca-cert
+```
+
+In contrast to the keystore, which stores each machine’s own identity, the truststore of a client stores all the certificates
+that the client should trust. Importing a certificate into one’s truststore also means trusting all certificates that are signed
+by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that
+it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying TLS on a large Pulsar cluster.
+You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA.
+That way all machines can authenticate all other machines.
+
+
+### Signing the certificate
+
+The next step is to sign all certificates in the keystore with the CA we generated. First, you need to export the certificate from the keystore:
+
+```shell
+keytool -keystore broker.keystore.jks -alias localhost -certreq -file cert-file
+```
+
+Then sign it with the CA:
+
+```shell
+openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
+```
+
+Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
+
+```shell
+keytool -keystore broker.keystore.jks -alias CARoot -import -file ca-cert
+keytool -keystore broker.keystore.jks -alias localhost -import -file cert-signed
+```
+
+The definitions of the parameters are the following:
+
+1. `keystore`: the location of the keystore
+2. `ca-cert`: the certificate of the CA
+3. `ca-key`: the private key of the CA
+4. `ca-password`: the passphrase of the CA
+5. `cert-file`: the exported, unsigned certificate of the broker
+6. `cert-signed`: the signed certificate of the broker
+
+### Configuring brokers
+
+Brokers enable TLS by providing valid `brokerServicePortTls` and `webServicePortTls` values, and also need to set `tlsEnabledWithKeyStore` to `true` to use KeyStore type configuration.
+Besides this, the KeyStore path, KeyStore password, TrustStore path, and TrustStore password need to be provided.
+Since the broker creates an internal client/admin client to communicate with other brokers, users also need to provide configuration for them; this is similar to how users configure the outside client/admin client.
+If `tlsRequireTrustedClientCertOnConnect` is `true`, broker will reject the Connection if the Client Certificate is not trusted. 
+
+The following TLS configs are needed on the broker side:
+
+```properties
+tlsEnabledWithKeyStore=true
+# key store
+tlsKeyStoreType=JKS
+tlsKeyStore=/var/private/tls/broker.keystore.jks
+tlsKeyStorePassword=brokerpw
+
+# trust store
+tlsTrustStoreType=JKS
+tlsTrustStore=/var/private/tls/broker.truststore.jks
+tlsTrustStorePassword=brokerpw
+
+# internal client/admin-client config
+brokerClientTlsEnabled=true
+brokerClientTlsEnabledWithKeyStore=true
+brokerClientTlsTrustStoreType=JKS
+brokerClientTlsTrustStore=/var/private/tls/client.truststore.jks
+brokerClientTlsTrustStorePassword=clientpw
+```
+
+NOTE: it is important to restrict access to the store files via filesystem permissions.
+
+Optional settings that may be worth considering:
+
+1. tlsClientAuthentication=false: Enable/Disable using TLS for authentication. This config when enabled will authenticate the other end
+    of the communication channel. It should be enabled on both brokers and clients for mutual TLS.
+2. tlsCiphers=[TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256], A cipher suite is a named combination of authentication, encryption, MAC and key exchange
+    algorithm used to negotiate the security settings for a network connection using TLS network protocol. By default,
+    it is null. [OpenSSL Ciphers](https://www.openssl.org/docs/man1.0.2/apps/ciphers.html)
+    [JDK Ciphers](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites)
+3. tlsProtocols=[TLSv1.2,TLSv1.1,TLSv1] (list out the TLS protocols that you are going to accept from clients).
+    By default, it is not set.
+
+### Configuring Clients
+
+This is similar to [TLS encryption configuration for clients with PEM type](security-tls-transport.md#client-configuration).
+For a minimal configuration, the user needs to provide the TrustStore information.
+
+e.g. 
+1. for [Command-line tools](reference-cli-tools.md) like [`pulsar-admin`](reference-cli-tools#pulsar-admin), [`pulsar-perf`](reference-cli-tools#pulsar-perf), and [`pulsar-client`](reference-cli-tools#pulsar-client) use the `conf/client.conf` config file in a Pulsar installation.
+
+    ```properties
+    webServiceUrl=https://broker.example.com:8443/
+    brokerServiceUrl=pulsar+ssl://broker.example.com:6651/
+    useKeyStoreTls=true
+    tlsTrustStoreType=JKS
+    tlsTrustStorePath=/var/private/tls/client.truststore.jks
+    tlsTrustStorePassword=clientpw
+    ```
+
+1. for java client
+    ```java
+    import org.apache.pulsar.client.api.PulsarClient;
+    
+    PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar+ssl://broker.example.com:6651/")
+        .enableTls(true)
+        .useKeyStoreTls(true)
+        .tlsTrustStorePath("/var/private/tls/client.truststore.jks")
+        .tlsTrustStorePassword("clientpw")
+        .allowTlsInsecureConnection(false)
+        .build();
+    ```
+
+1. for java admin client
+```java
+    PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl("https://broker.example.com:8443")
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath("/var/private/tls/client.truststore.jks")
+                .tlsTrustStorePassword("clientpw")
+                .allowTlsInsecureConnection(false)
+                .build();
+```
+
+## TLS authentication with KeyStore configure
+
+This is similar to [TLS authentication with PEM type](security-tls-authentication.md).
+
+### broker authentication config
+
+`broker.conf`
+
+```properties
+# Configuration to enable authentication
+authenticationEnabled=true
+authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls
+
+# this should be the CN of one of the client keystores.
+superUserRoles=admin
+
+# Enable KeyStore type
+tlsEnabledWithKeyStore=true
+tlsRequireTrustedClientCertOnConnect=true
+
+# key store
+tlsKeyStoreType=JKS
+tlsKeyStore=/var/private/tls/broker.keystore.jks
+tlsKeyStorePassword=brokerpw
+
+# trust store
+tlsTrustStoreType=JKS
+tlsTrustStore=/var/private/tls/broker.truststore.jks
+tlsTrustStorePassword=brokerpw
+
+# internal client/admin-client config
+brokerClientTlsEnabled=true
+brokerClientTlsEnabledWithKeyStore=true
+brokerClientTlsTrustStoreType=JKS
+brokerClientTlsTrustStore=/var/private/tls/client.truststore.jks
+brokerClientTlsTrustStorePassword=clientpw
+# internal auth config
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls
+brokerClientAuthenticationParameters=keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw
+# WebSocket currently does not support the KeyStore type
+webSocketServiceEnabled=false
+```
+
+### client authentication configuring
+
+Besides the TLS encryption configuration, the main work is configuring the client's KeyStore, which contains a valid CN as the client role.
+
+e.g. 
+1. for [Command-line tools](reference-cli-tools.md) like [`pulsar-admin`](reference-cli-tools#pulsar-admin), [`pulsar-perf`](reference-cli-tools#pulsar-perf), and [`pulsar-client`](reference-cli-tools#pulsar-client) use the `conf/client.conf` config file in a Pulsar installation.
+
+    ```properties
+    webServiceUrl=https://broker.example.com:8443/
+    brokerServiceUrl=pulsar+ssl://broker.example.com:6651/
+    useKeyStoreTls=true
+    tlsTrustStoreType=JKS
+    tlsTrustStorePath=/var/private/tls/client.truststore.jks
+    tlsTrustStorePassword=clientpw
+    authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls
+    authParams=keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw
+    ```
+
+1. for java client
+    ```java
+    import org.apache.pulsar.client.api.PulsarClient;
+    
+    PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar+ssl://broker.example.com:6651/")
+        .enableTls(true)
+        .useKeyStoreTls(true)
+        .tlsTrustStorePath("/var/private/tls/client.truststore.jks")
+        .tlsTrustStorePassword("clientpw")
+        .allowTlsInsecureConnection(false)
+        .authentication(
+                "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
+                "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw")
+        .build();
+    ```
+
+1. for java admin client
+    ```java
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl("https://broker.example.com:8443")
+            .useKeyStoreTls(true)
+            .tlsTrustStorePath("/var/private/tls/client.truststore.jks")
+            .tlsTrustStorePassword("clientpw")
+            .allowTlsInsecureConnection(false)
+            .authentication(
+                   "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
+                   "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw")
+            .build();
+    ```
+
+## Enabling TLS Logging
+
+You can enable TLS debug logging at the JVM level by starting the brokers and/or clients with `javax.net.debug` system property. For example:
+
+```shell
+-Djavax.net.debug=all
+```
+
+You can find more details on this in [Oracle documentation](http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/ReadDebug.html) on
+[debugging SSL/TLS connections](http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/ReadDebug.html).
diff --git a/site2/website/sidebars.json b/site2/website/sidebars.json
index b3b6473..8fc9d49 100644
--- a/site2/website/sidebars.json
+++ b/site2/website/sidebars.json
@@ -75,6 +75,7 @@
       "security-overview",
       "security-tls-transport",
       "security-tls-authentication",
+      "security-tls-keystore",
       "security-jwt",
       "security-athenz",
       "security-kerberos",


[pulsar] 13/17: [ISSUE 6563][Broker] Invalidate managed ledgers zookeeper cache instead of reloading on watcher triggered (#6659)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 926d6871436ba463d64440933a3b8a4aa85d15bf
Author: Pavel <rf...@gmail.com>
AuthorDate: Tue May 12 10:14:10 2020 +0300

    [ISSUE 6563][Broker] Invalidate managed ledgers zookeeper cache instead of reloading on watcher triggered (#6659)
    
    Fixes #6563
    
    ### Motivation
    Frequent topics creation/deletion triggers zookeeper children cache reloading for z-nodes **/managed-ledgers/<tenant_name>/<cluster_name>/<namespace_name>/persistent** more than needed.
    This creates additional load on zookeeper and broker, slows down brokers and makes them less stable. Also this causes scalability issues - adding more brokers increases operations duration even more.
    
    * [ISSUE 6563][Broker] Invalidate managed ledgers zookeeper cache instead of reloading on watcher triggered
    
    * [ISSUE 6563][Broker] Adding licence header to the new class
    
    * [ISSUE 6563][Broker] Invalidate correct zk cache path
    
    * [ISSUE 6563][Broker] Fix mocking issue
    
    Co-authored-by: Pavel Tishkevich <pa...@onde.app>
    Co-authored-by: penghui <pe...@apache.org>(cherry picked from commit f4fc7994071383748d1c5def92da2a652419c2c0)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  3 +-
 .../broker/cache/LocalZooKeeperCacheService.java   |  8 +--
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  6 +-
 .../pulsar/zookeeper/ZooKeeperChildrenCache.java   |  4 --
 .../zookeeper/ZooKeeperManagedLedgerCache.java     | 71 ++++++++++++++++++++++
 5 files changed, 80 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 42e7a7b..c49acbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -72,6 +72,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
@@ -526,7 +527,7 @@ public abstract class AdminResource extends PulsarWebResource {
         return pulsar().getConfigurationCache().clustersCache();
     }
 
-    protected ZooKeeperChildrenCache managedLedgerListCache() {
+    protected ZooKeeperManagedLedgerCache managedLedgerListCache() {
         return pulsar().getLocalZkCacheService().managedLedgerListCache();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 625f805..48b3f88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -54,7 +54,7 @@ public class LocalZooKeeperCacheService {
     private final ZooKeeperCache cache;
 
     private ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache;
-    private ZooKeeperChildrenCache managedLedgerListCache;
+    private ZooKeeperManagedLedgerCache managedLedgerListCache;
     private ResourceQuotaCache resourceQuotaCache;
     private ZooKeeperDataCache<LocalPolicies> policiesCache;
 
@@ -118,7 +118,7 @@ public class LocalZooKeeperCacheService {
             }
         };
 
-        this.managedLedgerListCache = new ZooKeeperChildrenCache(cache, MANAGED_LEDGER_ROOT);
+        this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
         this.resourceQuotaCache = new ResourceQuotaCache(cache);
         this.resourceQuotaCache.initZK();
     }
@@ -244,7 +244,7 @@ public class LocalZooKeeperCacheService {
         return this.policiesCache;
     }
 
-    public ZooKeeperChildrenCache managedLedgerListCache() {
+    public ZooKeeperManagedLedgerCache managedLedgerListCache() {
         return this.managedLedgerListCache;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fe1ad87..bec3101 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.zookeeper.KeeperException;
 import org.mockito.ArgumentCaptor;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -254,7 +254,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String nonPartitionTopicName2 = "special-topic-partition-123";
         final String partitionedTopicName = "special-topic";
         LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class);
-        ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
+        ZooKeeperManagedLedgerCache mockZooKeeperChildrenCache = mock(ZooKeeperManagedLedgerCache.class);
         doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
@@ -272,7 +272,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String nonPartitionTopicName2 = "special-topic-partition-10";
         final String partitionedTopicName = "special-topic";
         LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class);
-        ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
+        ZooKeeperManagedLedgerCache mockZooKeeperChildrenCache = mock(ZooKeeperManagedLedgerCache.class);
         doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index 0bb6f46..ce64480 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -75,10 +75,6 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
         cache.invalidateChildren(path);
     }
 
-    public void clearTree() {
-        cache.invalidateRoot(path);
-    }
-
     @Override
     public void reloadCache(final String path) {
         cache.invalidate(path);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java
new file mode 100644
index 0000000..0b2ab14
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+public class ZooKeeperManagedLedgerCache implements Watcher {
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperManagedLedgerCache.class);
+
+    private final ZooKeeperCache cache;
+    private final String path;
+
+    public ZooKeeperManagedLedgerCache(ZooKeeperCache cache, String path) {
+        this.cache = cache;
+        this.path = path;
+    }
+
+    public Set<String> get(String path) throws KeeperException, InterruptedException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getChildren called at: {}", path);
+        }
+
+        Set<String> children = cache.getChildrenAsync(path, this).join();
+        if (children == null) {
+            throw KeeperException.create(KeeperException.Code.NONODE);
+        }
+
+        return children;
+    }
+
+    public CompletableFuture<Set<String>> getAsync(String path) {
+        return cache.getChildrenAsync(path, this);
+    }
+
+    public void clearTree() {
+        cache.invalidateRoot(path);
+    }
+
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+        LOG.info("[{}] Received ZooKeeper watch event: {}", cache.zkSession.get(), watchedEvent);
+        String watchedEventPath = watchedEvent.getPath();
+        if (watchedEventPath != null) {
+            LOG.info("invalidate called in zookeeperChildrenCache for path {}", watchedEventPath);
+            cache.invalidate(watchedEventPath);
+        }
+    }
+}


[pulsar] 05/17: fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize (#6862)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c6bbfc6d59c83ee78688b4b7dd177277aa8e4c72
Author: hangc0276 <ha...@163.com>
AuthorDate: Mon May 11 08:41:11 2020 +0800

    fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize (#6862)
    
    Fix #6854
    
    ### Bug description
    The consumer stuck due to `hasEnoughMessagesForBatchReceive` checking:
    ```
    protected boolean hasEnoughMessagesForBatchReceive() {
            if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
                return false;
            }
            return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
                    || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes());
        }
    ```
    
    ### Changes
    When batchReceivePolicy maxNumMessages > maxReceiverQueueSize, we force batch receivePolicy maxNumMessages to maxReceiverQueueSize to avoid consumer stuck when checking `hasEnoughMessagesForBatchReceive`
    
    * fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize
    
    * throw log warn and add test case
    (cherry picked from commit 561868d4c441d654ab795a9386d8e9a5e28f03dd)
---
 .../client/api/ConsumerBatchReceiveTest.java       | 124 ++++++++++++++++-----
 .../pulsar/client/api/BatchReceivePolicy.java      |   2 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  29 ++++-
 .../client/impl/MultiTopicsConsumerImpl.java       |   2 +-
 4 files changed, 128 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 19ec983..54a6e52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -55,96 +55,166 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         return new Object[][] {
 
                 // Default batch receive policy.
-                { BatchReceivePolicy.DEFAULT_POLICY, true },
+                { BatchReceivePolicy.DEFAULT_POLICY, true, 1000},
                 // Only receive timeout limitation.
                 { BatchReceivePolicy.builder()
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), true
+                        .build(), true, 1000
                 },
                 // Only number of messages in a single batch receive limitation.
                 { BatchReceivePolicy.builder()
                         .maxNumMessages(10)
-                        .build(), true
+                        .build(), true, 1000
                 },
                 // Number of messages and timeout limitation
                 { BatchReceivePolicy.builder()
                         .maxNumMessages(13)
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), true
+                        .build(), true, 1000
                 },
                 // Size of messages and timeout limitation
                 { BatchReceivePolicy.builder()
                         .maxNumBytes(64)
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), true
+                        .build(), true, 1000
                 },
                 // Default batch receive policy.
-                { BatchReceivePolicy.DEFAULT_POLICY, false },
+                { BatchReceivePolicy.DEFAULT_POLICY, false, 1000 },
                 // Only receive timeout limitation.
                 { BatchReceivePolicy.builder()
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), false
+                        .build(), false, 1000
                 },
                 // Only number of messages in a single batch receive limitation.
                 { BatchReceivePolicy.builder()
                         .maxNumMessages(10)
-                        .build(), false
+                        .build(), false, 1000
                 },
                 // Number of messages and timeout limitation
                 { BatchReceivePolicy.builder()
                         .maxNumMessages(13)
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), false
+                        .build(), false, 1000
                 },
                 // Size of messages and timeout limitation
                 { BatchReceivePolicy.builder()
                         .maxNumBytes(64)
                         .timeout(50, TimeUnit.MILLISECONDS)
-                        .build(), false
+                        .build(), false, 1000
+                },
+                // Number of message limitation exceed receiverQueue size
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(70)
+                        .build(), true, 50
+                },
+                // Number of message limitation exceed receiverQueue size and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(50)
+                        .timeout(10, TimeUnit.MILLISECONDS)
+                        .build(), true, 30
+                },
+                // Number of message limitation is negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(-10)
+                        .timeout(10, TimeUnit.MILLISECONDS)
+                        .build(), true, 10
+                },
+                // Size of message limitation is negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumBytes(-100)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), true, 30
+                },
+                // Number of message limitation and size of message limitation are both negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(-10)
+                        .maxNumBytes(-100)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), true, 30
+                },
+                // Number of message limitation exceed receiverQueue size
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(70)
+                        .build(), false, 50
+                },
+                // Number of message limitation exceed receiverQueue size and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(50)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), false, 30
+                },
+                // Number of message limitation is negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(-10)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), false, 30
+                },
+                // Size of message limitation is negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumBytes(-100)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), false, 30
+                },
+                // Number of message limitation and size of message limitation are both negative and timeout limitation
+                {
+                    BatchReceivePolicy.builder()
+                        .maxNumMessages(-10)
+                        .maxNumBytes(-100)
+                        .timeout(50, TimeUnit.MILLISECONDS)
+                        .build(), false, 30
                 }
         };
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-non-partition-" + UUID.randomUUID();
-        testBatchReceive(topic, batchReceivePolicy, batchProduce);
+        testBatchReceive(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID();
         admin.topics().createPartitionedTopic(topic, 3);
-        testBatchReceive(topic, batchReceivePolicy, batchProduce);
+        testBatchReceive(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-non-partition-async-" + UUID.randomUUID();
-        testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce);
+        testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-async-" + UUID.randomUUID();
         admin.topics().createPartitionedTopic(topic, 3);
-        testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce);
+        testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-and-redelivery-non-partition-" + UUID.randomUUID();
-        testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce);
+        testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
     @Test(dataProvider = "batchReceivePolicy")
-    public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         final String topic = "persistent://my-property/my-ns/batch-receive-and-redelivery-" + UUID.randomUUID();
         admin.topics().createPartitionedTopic(topic, 3);
-        testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce);
+        testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
-    private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic);
         if (!batchProduce) {
             producerBuilder.enableBatching(false);
@@ -155,14 +225,14 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("s1")
+                .receiverQueueSize(receiverQueueSize)
                 .batchReceivePolicy(batchReceivePolicy)
                 .subscribe();
         sendMessagesAsyncAndWait(producer, 100);
         batchReceiveAndCheck(consumer, 100);
     }
 
-    private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
-
+    private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         if (batchReceivePolicy.getTimeoutMs() <= 0) {
             return;
         }
@@ -178,6 +248,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("s1")
+                .receiverQueueSize(receiverQueueSize)
                 .batchReceivePolicy(batchReceivePolicy)
                 .subscribe();
 
@@ -187,7 +258,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         latch.await();
     }
 
-    private void testBatchReceiveAndRedelivery(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception {
+    private void testBatchReceiveAndRedelivery(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic);
         if (!batchProduce) {
             producerBuilder.enableBatching(false);
@@ -198,6 +269,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("s1")
+                .receiverQueueSize(receiverQueueSize)
                 .batchReceivePolicy(batchReceivePolicy)
                 .ackTimeout(1, TimeUnit.SECONDS)
                 .subscribe();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
index f331b1b..3bce4c8 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
@@ -97,7 +97,7 @@ public class BatchReceivePolicy implements Serializable {
         return maxNumMessages;
     }
 
-    public long getMaxNumBytes() {
+    public int getMaxNumBytes() {
         return maxNumBytes;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 59d4041..e2b3329 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -50,6 +50,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
 
@@ -95,10 +97,33 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
-            this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy();
+            if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) {
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(this.maxReceiverQueueSize)
+                        .maxNumBytes(userBatchReceivePolicy.getMaxNumBytes())
+                        .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
+                        .build();
+                log.warn("BatchReceivePolicy maxNumMessages: {} is greater than maxReceiverQueueSize: {}, " +
+                        "reset to maxReceiverQueueSize. batchReceivePolicy: {}",
+                        userBatchReceivePolicy.getMaxNumMessages(), this.maxReceiverQueueSize,
+                        this.batchReceivePolicy.toString());
+            } else if (userBatchReceivePolicy.getMaxNumMessages() <= 0 && userBatchReceivePolicy.getMaxNumBytes() <= 0) {
+                this.batchReceivePolicy = BatchReceivePolicy.builder()
+                        .maxNumMessages(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumMessages())
+                        .maxNumBytes(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumBytes())
+                        .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
+                        .build();
+                log.warn("BatchReceivePolicy maxNumMessages: {} or maxNumBytes: {} is less than 0. " +
+                        "Reset to DEFAULT_POLICY. batchReceivePolicy: {}", userBatchReceivePolicy.getMaxNumMessages(),
+                        userBatchReceivePolicy.getMaxNumBytes(), this.batchReceivePolicy.toString());
+            } else {
+                this.batchReceivePolicy = conf.getBatchReceivePolicy();
+            }
         } else {
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
+
         if (batchReceivePolicy.getTimeoutMs() > 0) {
             batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
         }
@@ -594,4 +619,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 970e134..4972b99 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -626,6 +626,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     @Override
     protected void completeOpBatchReceive(OpBatchReceive<T> op) {
         notifyPendingBatchReceivedCallBack(op);
+        resumeReceivingFromPausedConsumersIfNeeded();
     }
 
     @Override
@@ -642,7 +643,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         try {
             seekAsync(timestamp).get();
         } catch (Exception e) {
-            throw PulsarClientException.unwrap(e);
         }
     }
 


[pulsar] 02/17: use originalAuthMethod on originalAuthChecker (#6870)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 450cc152277fe4dd64c68097dfc9feee75ecce27
Author: Alexandre DUVAL <ka...@gmail.com>
AuthorDate: Sat May 9 02:46:00 2020 +0200

    use originalAuthMethod on originalAuthChecker (#6870)
    
    Fixes #6873
    
    Use originalAuthMethod on originalAuthProvider check.
    (cherry picked from commit 8381371a3cdae52521c8d50ca053d7510989fb33)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index db61f8d..e72b599 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -649,9 +649,23 @@ public class ServerCnx extends PulsarHandler {
             //  2. we require to validate the original credentials
             //  3. no credentials were passed
             if (connect.hasOriginalPrincipal() && service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
+                // init authentication
+                String originalAuthMethod;
+                if (connect.hasOriginalAuthMethod()) {
+                    originalAuthMethod = connect.getOriginalAuthMethod();
+                } else {
+                    originalAuthMethod = "none";
+                }
+
                 AuthenticationProvider originalAuthenticationProvider = getBrokerService()
                         .getAuthenticationService()
-                        .getAuthenticationProvider(authMethod);
+                        .getAuthenticationProvider(originalAuthMethod);
+
+                if (originalAuthenticationProvider == null) {
+                    throw new AuthenticationException(String.format("Can't find AuthenticationProvider for original role" +
+                            " using auth method [%s] is not available", originalAuthMethod));
+                }
+
                 originalAuthState = originalAuthenticationProvider.newAuthState(
                         AuthData.of(connect.getOriginalAuthData().getBytes()),
                         remoteAddress,


[pulsar] 08/17: [Docs] Fix the Create subscribtion swagger of PersistentTopic (#6776)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit be913edefa37b77ecf5780caebc9873de34d7b30
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Tue May 12 12:10:50 2020 +0800

    [Docs] Fix the Create subscribtion swagger of PersistentTopic (#6776)
    
    * [Docs] Fix the Create subscribtion swagger of PersistentTopic
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix a little
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix a little
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    (cherry picked from commit ce29135cfa15d9f20477a137c497848320f1b737)
---
 .../org/apache/pulsar/broker/admin/v1/PersistentTopics.java |  5 +++--
 .../org/apache/pulsar/broker/admin/v2/PersistentTopics.java | 13 ++++++++-----
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index fbe378e..027e7bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -470,8 +470,9 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @PUT
     @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subscriptionName}")
-    @ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "Creates a subscription on the topic at the specified message id")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+    @ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void createSubscription(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index b7b780b..6f187ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -752,7 +752,7 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @PUT
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subscriptionName}")
-    @ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "Creates a subscription on the topic at the specified message id")
+    @ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id")
     @ApiResponses(value = {
             @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
                     "subscriber is not authorized to access this operation"),
@@ -771,14 +771,17 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String topic,
             @ApiParam(value = "Subscription to create position on", required = true)
             @PathParam("subscriptionName") String encodedSubName,
-            @ApiParam(value = "messageId where to create the subscription. " +
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(name = "messageId", value = "messageId where to create the subscription. " +
                     "It can be 'latest', 'earliest' or (ledgerId:entryId)",
                     defaultValue = "latest",
                     allowableValues = "latest,earliest,ledgerId:entryId"
             )
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId,
-            @ApiParam(value = "Is authentication required to perform this operation")
-            @QueryParam("replicated") boolean replicated) {
+            MessageIdImpl messageId,
+            @ApiParam(value = "Is replicated required to perform this operation")
+            @QueryParam("replicated") boolean replicated
+            ) {
         try {
             validateTopicName(tenant, namespace, topic);
             internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);


[pulsar] 07/17: Close producer when the topic does not exists. (#6879)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2b1a1ed8add1d2f2de025af5fa1a7ed514f0ebcf
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon May 11 17:50:39 2020 +0800

    Close producer when the topic does not exists. (#6879)
    
    Fixes #6838
    
    ### Motivation
    
    Close the producer when the topic does not exist.
    
    ### Modifications
    
    1. Fix the exception handling for the case where the topic does not exist.
    2. Change the producer state to Closed when the producer gets a TopicDoesNotExistException, so that the producer closes the cnx and no longer adds send-timeout tasks to the HashedWheelTimer.
    (cherry picked from commit 6eed21739e5c33e5064cd34cf0da0d93a991b1e5)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  1 -
 .../apache/pulsar/broker/service/ServerCnx.java    |  6 ++
 .../pulsar/client/impl/TopicDoesNotExistsTest.java | 70 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 15 ++++-
 4 files changed, 90 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a1e22ad..0322820 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -62,7 +62,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e72b599..c1a90be 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -37,6 +37,7 @@ import io.netty.handler.ssl.SslHandler;
 import java.net.SocketAddress;
 
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
@@ -1103,6 +1104,11 @@ public class ServerCnx extends PulsarHandler {
                             });
                         }).exceptionally(exception -> {
                             Throwable cause = exception.getCause();
+
+                            if (cause instanceof NoSuchElementException) {
+                                cause = new TopicNotFoundException("Topic Not Found.");
+                            }
+
                             if (!(cause instanceof ServiceUnitNotReadyException)) {
                                 // Do not print stack traces for expected exceptions
                                 log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
new file mode 100644
index 0000000..adaa831
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import io.netty.util.HashedWheelTimer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for not exists topic.
+ */
+public class TopicDoesNotExistsTest extends ProducerConsumerBase {
+
+    @Override
+    @BeforeClass
+    public void setup() throws Exception {
+        conf.setAllowAutoTopicCreation(false);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
+    public void testCreateProducerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
+        pulsarClient.newProducer()
+                .topic("persistent://public/default/" + UUID.randomUUID().toString())
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .create();
+        Thread.sleep(2000);
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+        Assert.assertEquals(timer.pendingTimeouts(), 0);
+    }
+
+    @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
+    public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException {
+        pulsarClient.newConsumer()
+                .topic("persistent://public/default/" + UUID.randomUUID().toString())
+                .subscriptionName("test")
+                .subscribe();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index f1f48a2..e947f97 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1168,7 +1168,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                         return null;
                     }
                     log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage());
-
+                    // Close the producer since topic does not exists.
+                    if (getState() == State.Failed
+                            && cause instanceof PulsarClientException.TopicDoesNotExistException) {
+                        closeAsync().whenComplete((v, ex) -> {
+                            if (ex != null) {
+                                log.error("Failed to close producer on TopicDoesNotExistException.", ex);
+                            }
+                            producerCreatedFuture.completeExceptionally(cause);
+                            if (getState() == State.Closing || getState() == State.Closed) {
+                                cnx.channel().close();
+                            }
+                        });
+                        return null;
+                    }
                     if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
                         synchronized (this) {
                             log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", topic,


[pulsar] 09/17: [C++] Auto update topic partitions (#6732)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1ccecf139a34778994acb03ce28db42f88c8067f
Author: BewareMyPower <xy...@163.com>
AuthorDate: Sun May 10 13:45:46 2020 +0800

    [C++] Auto update topic partitions (#6732)
    
    ### Motivation
    
    We need to add producers or consumers when the partitions of a topic are updated.
    
    The Java client has already implemented this feature, see [#3513](https://github.com/apache/pulsar/pull/3513). This PR tries to implement the same feature in the C++ client.
    
    ### Modifications
    
    - Add a `boost::asio::deadline_timer` to `PartitionedConsumerImpl` and `PartitionedProducerImpl` to register a lookup task that periodically detects partition changes;
    - Add an `unsigned int` configuration parameter that sets the interval, in seconds, between checks for partition changes (default: 60 seconds);
    - Unlock the `mutex_` in `PartitionedConsumerImpl::receive` after `state_` has been checked.
    > Explanation: When new consumers are created, `handleSinglePartitionConsumerCreated` is eventually called, and it tries to lock the `mutex_`. It may happen that `receive` acquires the lock again and again, so that `handleSinglePartitionConsumerCreated`
    is blocked in `lock.lock()` for a long time.
    
    * auto update topic partitions extend for consumer and producer in c++ client
    
    * add c++ unit test for partitions update
    
    * format code with clang-format-5.0
    
    * stop partitions update timer after producer/consumer called closeAsync()
    
    * fix bugs when running gtest-parallel
    
    * fix bug: Producer::flush() may cause deadlock
    
    * use getters to read `numPartitions` with or without lock
    (cherry picked from commit 30934e16eacb83c34a6691db3d9ad294d260599e)
---
 .../include/pulsar/ClientConfiguration.h           |  16 ++
 pulsar-client-cpp/lib/ClientConfiguration.cc       |   9 ++
 pulsar-client-cpp/lib/ClientConfigurationImpl.h    |   5 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   | 152 ++++++++++++++----
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |  13 ++
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   | 125 ++++++++++++---
 pulsar-client-cpp/lib/PartitionedProducerImpl.h    |  17 ++
 pulsar-client-cpp/tests/HttpHelper.cc              |   6 +-
 pulsar-client-cpp/tests/HttpHelper.h               |   1 +
 pulsar-client-cpp/tests/PartitionsUpdateTest.cc    | 176 +++++++++++++++++++++
 10 files changed, 470 insertions(+), 50 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index fd21a3e..056f7a7 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -149,6 +149,22 @@ class PULSAR_PUBLIC ClientConfiguration {
      */
     const unsigned int& getStatsIntervalInSeconds() const;
 
+    /**
+     * Set partitions update interval in seconds.
+     * If a partitioned topic is produced or subscribed and `intervalInSeconds` is not 0, every
+     * `intervalInSeconds` seconds the partition number will be retrieved by sending lookup requests. If
+     * partition number has been increased, more producer/consumer of increased partitions will be created.
+     * Default is 60 seconds.
+     *
+     * @param intervalInSeconds the seconds between two lookup request for partitioned topic's metadata
+     */
+    ClientConfiguration& setPartititionsUpdateInterval(unsigned int intervalInSeconds);
+
+    /**
+     * Get partitions update interval in seconds.
+     */
+    unsigned int getPartitionsUpdateInterval() const;
+
     friend class ClientImpl;
     friend class PulsarWrapper;
 
diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc
index aac7296..920b168 100644
--- a/pulsar-client-cpp/lib/ClientConfiguration.cc
+++ b/pulsar-client-cpp/lib/ClientConfiguration.cc
@@ -119,4 +119,13 @@ ClientConfiguration& ClientConfiguration::setStatsIntervalInSeconds(
 const unsigned int& ClientConfiguration::getStatsIntervalInSeconds() const {
     return impl_->statsIntervalInSeconds;
 }
+
+ClientConfiguration& ClientConfiguration::setPartititionsUpdateInterval(unsigned int intervalInSeconds) {
+    impl_->partitionsUpdateInterval = intervalInSeconds;
+    return *this;
+}
+
+unsigned int ClientConfiguration::getPartitionsUpdateInterval() const {
+    return impl_->partitionsUpdateInterval;
+}
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
index 60e4ae6..e2f72fb 100644
--- a/pulsar-client-cpp/lib/ClientConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
@@ -36,6 +36,7 @@ struct ClientConfigurationImpl {
     unsigned int statsIntervalInSeconds;
     LoggerFactoryPtr loggerFactory;
     bool validateHostName;
+    unsigned int partitionsUpdateInterval;
 
     ClientConfigurationImpl()
         : authenticationPtr(AuthFactory::Disabled()),
@@ -48,7 +49,9 @@ struct ClientConfigurationImpl {
           tlsAllowInsecureConnection(false),
           statsIntervalInSeconds(600),  // 10 minutes
           loggerFactory(),
-          validateHostName(false) {}
+          validateHostName(false),
+          partitionsUpdateInterval(60)  // 1 minute
+    {}
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 0241a54..89fe1b9 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -53,6 +53,12 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
+    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+    if (partitionsUpdateInterval > 0) {
+        partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
+        partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
+        lookupServicePtr_ = client_->getLookup();
+    }
 }
 
 PartitionedConsumerImpl::~PartitionedConsumerImpl() {}
@@ -70,6 +76,8 @@ Result PartitionedConsumerImpl::receive(Message& msg) {
         lock.unlock();
         return ResultAlreadyClosed;
     }
+    // See comments in `receive(Message&, int)`
+    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -87,6 +95,10 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
         lock.unlock();
         return ResultAlreadyClosed;
     }
+    // We unlocked `mutex_` here to avoid starvation of methods which are trying to acquire `mutex_`.
+    // In addition, `messageListener_` won't change once constructed, `BlockingQueue::pop` and
+    // `UnAckedMessageTracker::add` are thread-safe, so they don't need `mutex_` to achieve thread-safety.
+    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -162,14 +174,15 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int
         callback(ResultUnknownError);
         return;
     }
-    assert(unsubscribedSoFar_ <= numPartitions_);
-    assert(consumerIndex <= numPartitions_);
+    const auto numPartitions = getNumPartitionsWithLock();
+    assert(unsubscribedSoFar_ <= numPartitions);
+    assert(consumerIndex <= numPartitions);
     // this means we have successfully closed this partition consumer and no unsubscribe has failed so far
     LOG_INFO("Successfully Unsubscribed Consumer - " << consumerIndex << " for Subscription - "
                                                      << subscriptionName_ << " for Topic - "
                                                      << topicName_->toString());
     unsubscribedSoFar_++;
-    if (unsubscribedSoFar_ == numPartitions_) {
+    if (unsubscribedSoFar_ == numPartitions) {
         LOG_DEBUG("Unsubscribed all of the partition consumer for subscription - " << subscriptionName_);
         setState(Closed);
         callback(ResultOk);
@@ -179,7 +192,11 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int
 
 void PartitionedConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
     int32_t partition = msgId.partition();
-    assert(partition < numPartitions_ && partition >= 0 && consumers_.size() > partition);
+#ifndef NDEBUG
+    Lock consumersLock(consumersMutex_);
+    assert(partition < getNumPartitions() && partition >= 0 && consumers_.size() > partition);
+    consumersLock.unlock();
+#endif
     unAckedMessageTrackerPtr_->remove(msgId);
     consumers_[partition]->acknowledgeAsync(msgId, callback);
 }
@@ -194,35 +211,62 @@ void PartitionedConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
     consumers_[partition]->negativeAcknowledge(msgId);
 }
 
-void PartitionedConsumerImpl::start() {
-    ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
-    std::shared_ptr<ConsumerImpl> consumer;
+unsigned int PartitionedConsumerImpl::getNumPartitions() const { return numPartitions_; }
+
+unsigned int PartitionedConsumerImpl::getNumPartitionsWithLock() const {
+    Lock consumersLock(consumersMutex_);
+    return getNumPartitions();
+}
+
+ConsumerConfiguration PartitionedConsumerImpl::getSinglePartitionConsumerConfig() const {
+    using namespace std::placeholders;
+
     ConsumerConfiguration config = conf_.clone();
     // all the partitioned-consumer belonging to one partitioned topic should have same name
     config.setConsumerName(conf_.getConsumerName());
     config.setConsumerType(conf_.getConsumerType());
     config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
-    config.setMessageListener(std::bind(&PartitionedConsumerImpl::messageReceived, shared_from_this(),
-                                        std::placeholders::_1, std::placeholders::_2));
+
+    const auto shared_this = const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
+    config.setMessageListener(std::bind(&PartitionedConsumerImpl::messageReceived, shared_this, _1, _2));
 
     // Apply total limit of receiver queue size across partitions
+    // NOTE: if it's called by handleGetPartitions(), the queue size of new internal consumers may be smaller
+    // than previous created internal consumers.
     config.setReceiverQueueSize(
         std::min(conf_.getReceiverQueueSize(),
-                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions_)));
+                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / getNumPartitions())));
+
+    return config;
+}
+
+ConsumerImplPtr PartitionedConsumerImpl::newInternalConsumer(unsigned int partition,
+                                                             const ConsumerConfiguration& config) const {
+    using namespace std::placeholders;
+
+    std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
+    auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+                                                   internalListenerExecutor_, Partitioned);
+
+    const auto shared_this = const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
+    consumer->getConsumerCreatedFuture().addListener(std::bind(
+        &PartitionedConsumerImpl::handleSinglePartitionConsumerCreated, shared_this, _1, _2, partition));
+    consumer->setPartitionIndex(partition);
+
+    LOG_DEBUG("Creating Consumer for single Partition - " << topicPartitionName << "SubName - "
+                                                          << subscriptionName_);
+    return consumer;
+}
+
+void PartitionedConsumerImpl::start() {
+    internalListenerExecutor_ = client_->getPartitionListenerExecutorProvider()->get();
+    const auto config = getSinglePartitionConsumerConfig();
 
     // create consumer on each partition
-    for (unsigned int i = 0; i < numPartitions_; i++) {
-        std::string topicPartitionName = topicName_->getTopicPartitionName(i);
-        consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
-                                                  internalListenerExecutor, Partitioned);
-        consumer->getConsumerCreatedFuture().addListener(
-            std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerCreated, shared_from_this(),
-                      std::placeholders::_1, std::placeholders::_2, i));
-        consumer->setPartitionIndex(i);
-        consumers_.push_back(consumer);
-
-        LOG_DEBUG("Creating Consumer for single Partition - " << topicPartitionName << "SubName - "
-                                                              << subscriptionName_);
+    // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
+    // when `state_` is Ready
+    for (unsigned int i = 0; i < getNumPartitions(); i++) {
+        consumers_.push_back(newInternalConsumer(i, config));
     }
     for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
          consumer++) {
@@ -238,7 +282,8 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
         // one of the consumer creation failed, and we are cleaning up
         return;
     }
-    assert(numConsumersCreated_ < numPartitions_);
+    const auto numPartitions = getNumPartitionsWithLock();
+    assert(numConsumersCreated_ < numPartitions);
 
     if (result != ResultOk) {
         state_ = Failed;
@@ -250,13 +295,16 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
         return;
     }
 
-    assert(partitionIndex < numPartitions_ && partitionIndex >= 0);
+    assert(partitionIndex < numPartitions && partitionIndex >= 0);
     numConsumersCreated_++;
-    if (numConsumersCreated_ == numPartitions_) {
+    if (numConsumersCreated_ == numPartitions) {
         LOG_INFO("Successfully Subscribed to Partitioned Topic - " << topicName_->toString() << " with - "
-                                                                   << numPartitions_ << " Partitions.");
+                                                                   << numPartitions << " Partitions.");
         state_ = Ready;
         lock.unlock();
+        if (partitionsUpdateTimer_) {
+            runPartitionUpdateTask();
+        }
         receiveMessages();
         partitionedConsumerCreatedPromise_.setValue(shared_from_this());
         return;
@@ -280,7 +328,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
         }
         return;
     }
-    assert(partitionIndex < numPartitions_ && partitionIndex >= 0);
+    assert(partitionIndex < getNumPartitionsWithLock() && partitionIndex >= 0);
     if (numConsumersCreated_ > 0) {
         numConsumersCreated_--;
     }
@@ -305,6 +353,8 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
     int consumerIndex = 0;
     unsigned int consumerAlreadyClosed = 0;
     // close successfully subscribed consumers
+    // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
+    // when `state_` is Ready
     for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
         ConsumerImplPtr consumer = *i;
         if (!consumer->isClosed()) {
@@ -459,9 +509,10 @@ void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
+    const auto numPartitions = getNumPartitionsWithLock();
     PartitionedBrokerConsumerStatsPtr statsPtr =
-        std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions_);
-    LatchPtr latchPtr = std::make_shared<Latch>(numPartitions_);
+        std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions);
+    LatchPtr latchPtr = std::make_shared<Latch>(numPartitions);
     ConsumerList consumerList = consumers_;
     lock.unlock();
     for (int i = 0; i < consumerList.size(); i++) {
@@ -498,4 +549,47 @@ void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callb
     callback(ResultOperationNotSupported);
 }
 
+void PartitionedConsumerImpl::runPartitionUpdateTask() {
+    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+    partitionsUpdateTimer_->async_wait(
+        std::bind(&PartitionedConsumerImpl::getPartitionMetadata, shared_from_this()));
+}
+
+void PartitionedConsumerImpl::getPartitionMetadata() {
+    using namespace std::placeholders;
+    lookupServicePtr_->getPartitionMetadataAsync(topicName_)
+        .addListener(std::bind(&PartitionedConsumerImpl::handleGetPartitions, shared_from_this(), _1, _2));
+}
+
+void PartitionedConsumerImpl::handleGetPartitions(Result result,
+                                                  const LookupDataResultPtr& lookupDataResult) {
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        return;
+    }
+
+    if (!result) {
+        const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
+        Lock consumersLock(consumersMutex_);
+        const auto currentNumPartitions = getNumPartitions();
+        assert(currentNumPartitions == consumers_.size());
+        if (newNumPartitions > currentNumPartitions) {
+            LOG_INFO("new partition count: " << newNumPartitions);
+            numPartitions_ = newNumPartitions;
+            const auto config = getSinglePartitionConsumerConfig();
+            for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
+                auto consumer = newInternalConsumer(i, config);
+                consumer->start();
+                consumers_.push_back(consumer);
+            }
+            // `runPartitionUpdateTask()` will be called in `handleSinglePartitionConsumerCreated()`
+            return;
+        }
+    } else {
+        LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
+    }
+
+    runPartitionUpdateTask();
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index fb4b047..02de7bc 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -84,6 +84,8 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     const ConsumerConfiguration conf_;
     typedef std::vector<ConsumerImplPtr> ConsumerList;
     ConsumerList consumers_;
+    // consumersMutex_ is used to share consumers_ and numPartitions_
+    mutable std::mutex consumersMutex_;
     std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
     PartitionedConsumerState state_;
@@ -94,7 +96,15 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     const std::string topic_;
     const std::string name_;
     const std::string partitionStr_;
+    ExecutorServicePtr internalListenerExecutor_;
+    DeadlineTimerPtr partitionsUpdateTimer_;
+    boost::posix_time::time_duration partitionsUpdateInterval_;
+    LookupServicePtr lookupServicePtr_;
     /* methods */
+    unsigned int getNumPartitions() const;
+    unsigned int getNumPartitionsWithLock() const;
+    ConsumerConfiguration getSinglePartitionConsumerConfig() const;
+    ConsumerImplPtr newInternalConsumer(unsigned int partition, const ConsumerConfiguration& config) const;
     void setState(PartitionedConsumerState state);
     void handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback);
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
@@ -109,6 +119,9 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
     UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
     std::queue<ReceiveCallback> pendingReceives_;
+    void runPartitionUpdateTask();
+    void getPartitionMetadata();
+    void handleGetPartitions(const Result result, const LookupDataResultPtr& lookupDataResult);
 };
 typedef std::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr;
 typedef std::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr;
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index f79b5d3..0461ee3 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -50,6 +50,14 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
         std::min(config.getMaxPendingMessages(),
                  (int)(config.getMaxPendingMessagesAcrossPartitions() / numPartitions));
     conf_.setMaxPendingMessages(maxPendingMessagesPerPartition);
+
+    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+    if (partitionsUpdateInterval > 0) {
+        listenerExecutor_ = client_->getListenerExecutorProvider()->get();
+        partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
+        partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
+        lookupServicePtr_ = client_->getLookup();
+    }
 }
 
 MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
@@ -61,7 +69,7 @@ MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
         case ProducerConfiguration::UseSinglePartition:
         default:
             unsigned int random = rand();
-            return std::make_shared<SinglePartitionMessageRouter>(random % topicMetadata_->getNumPartitions(),
+            return std::make_shared<SinglePartitionMessageRouter>(random % getNumPartitions(),
                                                                   conf_.getHashingScheme());
     }
 }
@@ -70,18 +78,34 @@ PartitionedProducerImpl::~PartitionedProducerImpl() {}
 // override
 const std::string& PartitionedProducerImpl::getTopic() const { return topic_; }
 
+unsigned int PartitionedProducerImpl::getNumPartitions() const {
+    return static_cast<unsigned int>(topicMetadata_->getNumPartitions());
+}
+
+unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
+    Lock lock(producersMutex_);
+    return getNumPartitions();
+}
+
+ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
+    using namespace std::placeholders;
+    std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
+    auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
+    producer->getProducerCreatedFuture().addListener(
+        std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
+                  const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
+
+    LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
+    return producer;
+}
+
 // override
 void PartitionedProducerImpl::start() {
-    std::shared_ptr<ProducerImpl> producer;
     // create producer per partition
-    for (unsigned int i = 0; i < topicMetadata_->getNumPartitions(); i++) {
-        std::string topicPartitionName = topicName_->getTopicPartitionName(i);
-        producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
-        producer->getProducerCreatedFuture().addListener(
-            std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated, shared_from_this(),
-                      std::placeholders::_1, std::placeholders::_2, i));
-        producers_.push_back(producer);
-        LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
+    // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
+    // when `state_` is Ready
+    for (unsigned int i = 0; i < getNumPartitions(); i++) {
+        producers_.push_back(newInternalProducer(i));
     }
 
     for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
@@ -100,7 +124,8 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
         // Ignore, we have already informed client that producer creation failed
         return;
     }
-    assert(numProducersCreated_ <= topicMetadata_->getNumPartitions());
+    const auto numPartitions = getNumPartitionsWithLock();
+    assert(numProducersCreated_ <= numPartitions);
     if (result != ResultOk) {
         state_ = Failed;
         lock.unlock();
@@ -110,10 +135,14 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
         return;
     }
 
-    assert(partitionIndex <= topicMetadata_->getNumPartitions());
+    assert(partitionIndex <= numPartitions);
     numProducersCreated_++;
-    if (numProducersCreated_ == topicMetadata_->getNumPartitions()) {
+    if (numProducersCreated_ == numPartitions) {
+        state_ = Ready;
         lock.unlock();
+        if (partitionsUpdateTimer_) {
+            runPartitionUpdateTask();
+        }
         partitionedProducerCreatedPromise_.setValue(shared_from_this());
     }
 }
@@ -121,8 +150,9 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
 // override
 void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     // get partition for this message from router policy
+    Lock producersLock(producersMutex_);
     short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
-    if (partition >= topicMetadata_->getNumPartitions() || partition >= producers_.size()) {
+    if (partition >= getNumPartitions() || partition >= producers_.size()) {
         LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
         // change me: abort or notify failure in callback?
         //          change to appropriate error if callback
@@ -130,7 +160,8 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
         return;
     }
     // find a producer for that partition, index should start from 0
-    ProducerImplPtr& producer = producers_[partition];
+    ProducerImplPtr producer = producers_[partition];
+    producersLock.unlock();
     // send message on that partition
     producer->sendAsync(msg, callback);
 }
@@ -145,10 +176,12 @@ void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
 }
 
 const std::string& PartitionedProducerImpl::getProducerName() const {
+    Lock producersLock(producersMutex_);
     return producers_[0]->getProducerName();
 }
 
 const std::string& PartitionedProducerImpl::getSchemaVersion() const {
+    Lock producersLock(producersMutex_);
     // Since the schema is atomically assigned on the partitioned-topic,
     // it's guaranteed that all the partitions will have the same schema version.
     return producers_[0]->getSchemaVersion();
@@ -156,6 +189,7 @@ const std::string& PartitionedProducerImpl::getSchemaVersion() const {
 
 int64_t PartitionedProducerImpl::getLastSequenceId() const {
     int64_t currentMax = -1L;
+    Lock producersLock(producersMutex_);
     for (int i = 0; i < producers_.size(); i++) {
         currentMax = std::max(currentMax, producers_[i]->getLastSequenceId());
     }
@@ -168,9 +202,13 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
  * create one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
  */
 void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
+    setState(Closing);
+
     int producerIndex = 0;
     unsigned int producerAlreadyClosed = 0;
 
+    // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
+    // when `state_` is Ready
     for (ProducerList::const_iterator i = producers_.begin(); i != producers_.end(); i++) {
         ProducerImplPtr prod = *i;
         if (!prod->isClosed()) {
@@ -181,6 +219,7 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
             producerAlreadyClosed++;
         }
     }
+    const auto numProducers = producers_.size();
 
     /*
      * No need to set state since:-
@@ -191,7 +230,7 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
      * c. If closeAsync called due to failure in creating just one sub producer then state is set by
      * handleSinglePartitionProducerCreated
      */
-    if (producerAlreadyClosed == producers_.size() && closeCallback) {
+    if (producerAlreadyClosed == numProducers && closeCallback) {
         setState(Closed);
         closeCallback(ResultOk);
     }
@@ -214,7 +253,7 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
         }
         return;
     }
-    assert(partitionIndex < topicMetadata_->getNumPartitions());
+    assert(partitionIndex < getNumPartitionsWithLock());
     if (numProducersCreated_ > 0) {
         numProducersCreated_--;
     }
@@ -244,6 +283,7 @@ Future<Result, ProducerImplBaseWeakPtr> PartitionedProducerImpl::getProducerCrea
 bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
 
 void PartitionedProducerImpl::triggerFlush() {
+    Lock producersLock(producersMutex_);
     for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
         (*prod)->triggerFlush();
     }
@@ -267,9 +307,13 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
         return;
     }
 
-    FlushCallback subFlushCallback = [this, callback](Result result) {
+    Lock producersLock(producersMutex_);
+    const int numProducers = static_cast<int>(producers_.size());
+    FlushCallback subFlushCallback = [this, callback, numProducers](Result result) {
+        // We shouldn't lock `producersMutex_` here because `subFlushCallback` may be called in
+        // `ProducerImpl::flushAsync`, and then deadlock occurs.
         int previous = flushedPartitions_.fetch_add(1);
-        if (previous == producers_.size() - 1) {
+        if (previous == numProducers - 1) {
             flushedPartitions_.store(0);
             flushPromise_->setValue(true);
             callback(result);
@@ -282,4 +326,47 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
     }
 }
 
+void PartitionedProducerImpl::runPartitionUpdateTask() {
+    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+    partitionsUpdateTimer_->async_wait(
+        std::bind(&PartitionedProducerImpl::getPartitionMetadata, shared_from_this()));
+}
+
+void PartitionedProducerImpl::getPartitionMetadata() {
+    using namespace std::placeholders;
+    lookupServicePtr_->getPartitionMetadataAsync(topicName_)
+        .addListener(std::bind(&PartitionedProducerImpl::handleGetPartitions, shared_from_this(), _1, _2));
+}
+
+void PartitionedProducerImpl::handleGetPartitions(Result result,
+                                                  const LookupDataResultPtr& lookupDataResult) {
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        return;
+    }
+
+    if (!result) {
+        const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
+        Lock producersLock(producersMutex_);
+        const auto currentNumPartitions = getNumPartitions();
+        assert(currentNumPartitions == producers_.size());
+        if (newNumPartitions > currentNumPartitions) {
+            LOG_INFO("new partition count: " << newNumPartitions);
+            topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));
+
+            for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
+                auto producer = newInternalProducer(i);
+                producer->start();
+                producers_.push_back(producer);
+            }
+            // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
+            return;
+        }
+    } else {
+        LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
+    }
+
+    runPartitionUpdateTask();
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 72be5fe..e6b511d 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -107,6 +107,14 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     ProducerList producers_;
 
+    // producersMutex_ is used to share producers_ and topicMetadata_
+    mutable std::mutex producersMutex_;
+
+    unsigned int getNumPartitions() const;
+    unsigned int getNumPartitionsWithLock() const;
+
+    ProducerImplPtr newInternalProducer(unsigned int partition) const;
+
     MessageRoutingPolicyPtr routerPolicy_;
 
     // mutex_ is used to share state_, and numProducersCreated_
@@ -121,6 +129,15 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     std::atomic<int> flushedPartitions_;
     std::shared_ptr<Promise<Result, bool_type>> flushPromise_;
+
+    ExecutorServicePtr listenerExecutor_;
+    DeadlineTimerPtr partitionsUpdateTimer_;
+    boost::posix_time::time_duration partitionsUpdateInterval_;
+    LookupServicePtr lookupServicePtr_;
+
+    void runPartitionUpdateTask();
+    void getPartitionMetadata();
+    void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/HttpHelper.cc b/pulsar-client-cpp/tests/HttpHelper.cc
index c3d7333..c4118e6 100644
--- a/pulsar-client-cpp/tests/HttpHelper.cc
+++ b/pulsar-client-cpp/tests/HttpHelper.cc
@@ -30,7 +30,9 @@ static int makeRequest(const std::string& method, const std::string& url, const
     curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
     curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
     curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, method.c_str());
-    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
+    if (!body.empty()) {
+        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
+    }
     int res = curl_easy_perform(curl);
     curl_slist_free_all(list); /* free the list again */
 
@@ -49,3 +51,5 @@ int makePutRequest(const std::string& url, const std::string& body) { return mak
 int makePostRequest(const std::string& url, const std::string& body) {
     return makeRequest("POST", url, body);
 }
+
+int makeDeleteRequest(const std::string& url) { return makeRequest("DELETE", url, ""); }
diff --git a/pulsar-client-cpp/tests/HttpHelper.h b/pulsar-client-cpp/tests/HttpHelper.h
index 31abc2d..68119a7 100644
--- a/pulsar-client-cpp/tests/HttpHelper.h
+++ b/pulsar-client-cpp/tests/HttpHelper.h
@@ -23,5 +23,6 @@
 
 int makePutRequest(const std::string& url, const std::string& body);
 int makePostRequest(const std::string& url, const std::string& body);
+int makeDeleteRequest(const std::string& url);
 
 #endif /* end of include guard: HTTP_HELPER */
diff --git a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
new file mode 100644
index 0000000..e1bf68c
--- /dev/null
+++ b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <set>
+#include <chrono>
+#include <thread>
+#include <memory>
+
+#include "HttpHelper.h"
+
+using namespace pulsar;
+
+static const std::string serviceUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+static const std::string topicNameSuffix = "public/default/partitions-update-test-topic";
+static const std::string topicName = "persistent://" + topicNameSuffix;
+static const std::string topicOperateUrl =
+    adminUrl + "admin/v2/persistent/" + topicNameSuffix + "/partitions";
+
+static ClientConfiguration newClientConfig(bool enablePartitionsUpdate) {
+    ClientConfiguration clientConfig;
+    if (enablePartitionsUpdate) {
+        clientConfig.setPartititionsUpdateInterval(1);  // 1s
+    } else {
+        clientConfig.setPartititionsUpdateInterval(0);  // disable
+    }
+    return clientConfig;
+}
+
+// In round robin routing mode, if N messages were sent to a topic with N partitions, each partition must have
+// received 1 message. So we check whether producer/consumer have increased along with partitions by checking
+// partitions' count of N messages.
+// Use std::set because it doesn't allow repeated elements.
+class PartitionsSet {
+   public:
+    size_t size() const { return names_.size(); }
+
+    Result initProducer(bool enablePartitionsUpdate) {
+        clientForProducer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
+        const auto producerConfig =
+            ProducerConfiguration().setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+        return clientForProducer_->createProducer(topicName, producerConfig, producer_);
+    }
+
+    Result initConsumer(bool enablePartitionsUpdate) {
+        clientForConsumer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
+        return clientForConsumer_->subscribe(topicName, "SubscriptionName", consumer_);
+    }
+
+    void close() {
+        producer_.close();
+        clientForProducer_->close();
+        consumer_.close();
+        clientForConsumer_->close();
+    }
+
+    void doSendAndReceive(int numMessagesSend, int numMessagesReceive) {
+        names_.clear();
+        for (int i = 0; i < numMessagesSend; i++) {
+            producer_.send(MessageBuilder().setContent("a").build());
+        }
+        while (numMessagesReceive > 0) {
+            Message msg;
+            if (consumer_.receive(msg, 100) == ResultOk) {
+                names_.emplace(msg.getTopicName());
+                consumer_.acknowledge(msg);
+                numMessagesReceive--;
+            }
+        }
+    }
+
+   private:
+    std::set<std::string> names_;
+
+    std::unique_ptr<Client> clientForProducer_;
+    Producer producer_;
+
+    std::unique_ptr<Client> clientForConsumer_;
+    Consumer consumer_;
+};
+
+static void waitForPartitionsUpdated() {
+    // Assume producer and consumer have updated partitions in 3 seconds if enabled
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+}
+
+TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) {
+    ClientConfiguration clientConfig;
+    ASSERT_EQ(60, clientConfig.getPartitionsUpdateInterval());
+
+    clientConfig.setPartititionsUpdateInterval(0);
+    ASSERT_EQ(0, clientConfig.getPartitionsUpdateInterval());
+
+    clientConfig.setPartititionsUpdateInterval(1);
+    ASSERT_EQ(1, clientConfig.getPartitionsUpdateInterval());
+
+    clientConfig.setPartititionsUpdateInterval(-1);
+    ASSERT_EQ(static_cast<unsigned int>(-1), clientConfig.getPartitionsUpdateInterval());
+}
+
+TEST(PartitionsUpdateTest, testPartitionsUpdate) {
+    // Ensure `topicName` doesn't exist before created
+    makeDeleteRequest(topicOperateUrl);
+    // Create a 2 partitions topic
+    int res = makePutRequest(topicOperateUrl, "2");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    PartitionsSet partitionsSet;
+
+    // 1. Both producer and consumer enable partitions update
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(true));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true));
+
+    res = makePostRequest(topicOperateUrl, "3");  // update partitions to 3
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    waitForPartitionsUpdated();
+
+    partitionsSet.doSendAndReceive(3, 3);
+    ASSERT_EQ(3, partitionsSet.size());
+    partitionsSet.close();
+
+    // 2. Only producer enables partitions update
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(true));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false));
+
+    res = makePostRequest(topicOperateUrl, "5");  // update partitions to 5
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    waitForPartitionsUpdated();
+
+    partitionsSet.doSendAndReceive(5, 3);  // can't consume partition-3,4
+    ASSERT_EQ(3, partitionsSet.size());
+    partitionsSet.close();
+
+    // 3. Only consumer enables partitions update
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true));
+
+    res = makePostRequest(topicOperateUrl, "7");  // update partitions to 7
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    waitForPartitionsUpdated();
+
+    partitionsSet.doSendAndReceive(7, 7);
+    ASSERT_EQ(5, partitionsSet.size());
+    partitionsSet.close();
+
+    // 4. Both producer and consumer disable partitions update
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false));
+
+    res = makePostRequest(topicOperateUrl, "10");  // update partitions to 10
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    waitForPartitionsUpdated();
+
+    partitionsSet.doSendAndReceive(10, 10);
+    ASSERT_EQ(7, partitionsSet.size());
+    partitionsSet.close();
+}


[pulsar] 01/17: [functions] Fix typos in exceptions related to functions (#6910)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7a71328a597fe04a942b7d105391bb30297534e6
Author: Sergii Zhevzhyk <vz...@users.noreply.github.com>
AuthorDate: Fri May 8 21:25:12 2020 +0200

    [functions] Fix typos in exceptions related to functions (#6910)
    
    Fix typos in exceptions related to functions. The tests were updated as well.
    (cherry picked from commit 20216d1cb80826ebb246f4ed75aa7187a1488f94)
---
 .../pulsar/functions/utils/FunctionConfigUtils.java      | 16 ++++++++++------
 .../apache/pulsar/functions/utils/SinkConfigUtils.java   |  4 ++--
 .../apache/pulsar/functions/utils/SourceConfigUtils.java |  3 +--
 .../pulsar/functions/utils/FunctionConfigUtilsTest.java  |  8 ++++----
 .../pulsar/functions/utils/SinkConfigUtilsTest.java      |  8 ++++----
 .../pulsar/functions/utils/SourceConfigUtilsTest.java    |  4 ++--
 6 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 4876a82..5062e2a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -34,7 +34,11 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import java.io.File;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
@@ -425,7 +429,7 @@ public class FunctionConfigUtils {
                 // Need to make sure that one and only one of schema/serde is set
                 if (!isEmpty(conf.getSchemaType()) && !isEmpty(conf.getSerdeClassName())) {
                     throw new IllegalArgumentException(
-                            String.format("Only one of schemaType or serdeClassName should be set in inputSpec"));
+                        "Only one of schemaType or serdeClassName should be set in inputSpec");
                 }
                 if (!isEmpty(conf.getSerdeClassName())) {
                     ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0], clsLoader, true);
@@ -443,7 +447,7 @@ public class FunctionConfigUtils {
         // One and only one of outputSchemaType and outputSerdeClassName should be set
         if (!isEmpty(functionConfig.getOutputSerdeClassName()) && !isEmpty(functionConfig.getOutputSchemaType())) {
             throw new IllegalArgumentException(
-                    String.format("Only one of outputSchemaType or outputSerdeClassName should be set"));
+                "Only one of outputSchemaType or outputSerdeClassName should be set");
         }
 
         if (!isEmpty(functionConfig.getOutputSchemaType())) {
@@ -600,7 +604,7 @@ public class FunctionConfigUtils {
                 // receiver queue size should be >= 0
                 if (conf.getReceiverQueueSize() != null && conf.getReceiverQueueSize() < 0) {
                     throw new IllegalArgumentException(
-                            String.format("Receiver queue size should be >= zero"));
+                        "Receiver queue size should be >= zero");
                 }
             });
         }
@@ -737,10 +741,10 @@ public class FunctionConfigUtils {
             mergedConfig.setLogTopic(newConfig.getLogTopic());
         }
         if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
-            throw new IllegalArgumentException("Processing Guarantess cannot be altered");
+            throw new IllegalArgumentException("Processing Guarantees cannot be altered");
         }
         if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
-            throw new IllegalArgumentException("Retain Orderning cannot be altered");
+            throw new IllegalArgumentException("Retain Ordering cannot be altered");
         }
         if (!StringUtils.isEmpty(newConfig.getOutput())) {
             mergedConfig.setOutput(newConfig.getOutput());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 8e9fd13..0c50265 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -546,7 +546,7 @@ public class SinkConfigUtils {
             });
         }
         if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
-            throw new IllegalArgumentException("Processing Guarantess cannot be altered");
+            throw new IllegalArgumentException("Processing Guarantees cannot be altered");
         }
         if (newConfig.getConfigs() != null) {
             mergedConfig.setConfigs(newConfig.getConfigs());
@@ -558,7 +558,7 @@ public class SinkConfigUtils {
             mergedConfig.setParallelism(newConfig.getParallelism());
         }
         if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
-            throw new IllegalArgumentException("Retain Orderning cannot be altered");
+            throw new IllegalArgumentException("Retain Ordering cannot be altered");
         }
         if (newConfig.getAutoAck() != null && !newConfig.getAutoAck().equals(existingConfig.getAutoAck())) {
             throw new IllegalArgumentException("AutoAck cannot be altered");
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index b50b828..c3d6d0a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -86,7 +86,6 @@ public class SourceConfigUtils {
             functionDetailsBuilder.setProcessingGuarantees(
                     convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
         }
-
         // set source spec
         Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
         if (sourceDetails.getSourceClassName() != null) {
@@ -366,7 +365,7 @@ public class SourceConfigUtils {
             mergedConfig.setSecrets(newConfig.getSecrets());
         }
         if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
-            throw new IllegalArgumentException("Processing Guarantess cannot be altered");
+            throw new IllegalArgumentException("Processing Guarantees cannot be altered");
         }
         if (newConfig.getParallelism() != null) {
             mergedConfig.setParallelism(newConfig.getParallelism());
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index e0f1cea..82608b0 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -215,18 +215,18 @@ public class FunctionConfigUtilsTest {
         assertTrue(mergedConfig.getCleanupSubscription());
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantees cannot be altered")
     public void testMergeDifferentProcessingGuarantees() {
         FunctionConfig functionConfig = createFunctionConfig();
         FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("processingGuarantees", EFFECTIVELY_ONCE);
-        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Ordering cannot be altered")
     public void testMergeDifferentRetainOrdering() {
         FunctionConfig functionConfig = createFunctionConfig();
         FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("retainOrdering", true);
-        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
     }
 
     @Test
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index eb6ab39..47515d6 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -148,18 +148,18 @@ public class SinkConfigUtilsTest {
         assertEquals(mergedConfig.getInputSpecs().get("test-input"), newSinkConfig.getInputSpecs().get("test-input"));
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantees cannot be altered")
     public void testMergeDifferentProcessingGuarantees() {
         SinkConfig sinkConfig = createSinkConfig();
         SinkConfig newSinkConfig = createUpdatedSinkConfig("processingGuarantees", EFFECTIVELY_ONCE);
-        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Ordering cannot be altered")
     public void testMergeDifferentRetainOrdering() {
         SinkConfig sinkConfig = createSinkConfig();
         SinkConfig newSinkConfig = createUpdatedSinkConfig("retainOrdering", true);
-        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
     }
 
     @Test
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 2005902..3c6ee73 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -119,11 +119,11 @@ public class SourceConfigUtilsTest {
         );
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantees cannot be altered")
     public void testMergeDifferentProcessingGuarantees() {
         SourceConfig sourceConfig = createSourceConfig();
         SourceConfig newSourceConfig = createUpdatedSourceConfig("processingGuarantees", EFFECTIVELY_ONCE);
-        SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
     }
 
     @Test


[pulsar] 17/17: fix autoSkipConf (#6863)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2dae2b5d2ea4eb689f8f181b9a5326398ce1687b
Author: pheecian <ph...@gmail.com>
AuthorDate: Tue May 12 20:56:13 2020 +0800

    fix autoSkipConf (#6863)
    
    Fixes #6841
    Co-authored-by: Xiaopeng Zhang <xi...@Xiaopengs-MacBook-Pro.local>
    Co-authored-by: Jia Zhai <zh...@apache.org>
    (cherry picked from commit 3198f4ffab051b237aa7692b1451cd933eb80f98)
---
 .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ce040ed..bf10d5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1049,7 +1049,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
             managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
             managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-
+            managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
             OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
 
             if (offloadPolicies == null) {


[pulsar] 06/17: Pulsar SQL Support Avro Schema `ByteBuffer` Type (#6925)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e7b8e3d5efcb2f7f84ff8f89960ffb40cc67d3ff
Author: ran <ga...@126.com>
AuthorDate: Mon May 11 15:23:38 2020 +0800

    Pulsar SQL Support Avro Schema `ByteBuffer` Type (#6925)
    
    Fixes #6749
    
    ### Motivation
    
    Currently, Pulsar SQL does not support an AvroSchema that uses `ByteBuffer` as a field type. For example, consider the POJO class below.
    
    ```
    @Data
    public static class LogFile {
        int id;
        String name;
        ByteBuffer data;
    }
    
    Producer<LogFile> producer = pulsarClient.newProducer(Schema.AVRO(LogFile.class)).topic(topic).create();
    ```
    
    Error Log
    ```
    2020-05-08T23:34:47.079+0800	ERROR	SplitRunner-5-101	com.facebook.presto.execution.executor.TaskExecutor	Error processing Split 20200508_153445_00006_nxngm.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='bytes-sql-test4', schemaName='public/default', tableName='bytes-sql-test4', splitSize=4, schema='{"type":"record","name":"LogFile","namespace":"com.ran.schema.KeyValueSchemaTest$","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"],"defaul [...]
    java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to [B
    	at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSlice(PulsarRecordCursor.java:516)
    	at com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:117)
    	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:242)
    	at com.facebook.presto.operator.Driver.processInternal(Driver.java:373)
    	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:282)
    	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:672)
    	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
    	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:973)
    	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
    	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:477)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    ```
    
    ### Modifications
    
    When the Presto field type is `VarbinaryType.VARBINARY`, check whether the record is a `ByteBuffer`, a `byte[]`, a `ByteBuf`, or something else, and convert the field record to bytes according to its type.
    
    * pulsar sql support avro schema `ByteBuffer` type
    
    * add ByteBuf check and unit tests.
    (cherry picked from commit 3aaed249f5c200431f5e8dacad2de2cbd64ee1ad)
---
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 30 ++++++++++++-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  | 51 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 6899f97..6ac16b3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -42,6 +42,7 @@ import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -513,12 +514,39 @@ public class PulsarRecordCursor implements RecordCursor {
         if (type == VarcharType.VARCHAR) {
             return Slices.utf8Slice(record.toString());
         } else if (type == VarbinaryType.VARBINARY) {
-            return Slices.wrappedBuffer((byte[]) record);
+            return Slices.wrappedBuffer(toBytes(record));
         } else {
             throw new PrestoException(NOT_SUPPORTED, "Unsupported type " + type);
         }
     }
 
+    private byte[] toBytes(Object record) {
+        if (record instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) record;
+            if (byteBuffer.hasArray()) {
+                return byteBuffer.array();
+            }
+            byte[] bytes = new byte[byteBuffer.position()];
+            byteBuffer.flip();
+            byteBuffer.get(bytes);
+            return bytes;
+        } else if (record instanceof ByteBuf) {
+            ByteBuf byteBuf = (ByteBuf) record;
+            if (byteBuf.hasArray()) {
+                return byteBuf.array();
+            }
+            byte[] bytes = new byte[byteBuf.readableBytes()];
+            byteBuf.readBytes(bytes);
+            return bytes;
+        } else {
+            try {
+                return (byte[]) record;
+            } catch (Exception e) {
+                throw new PrestoException(NOT_SUPPORTED, "Unsupported type " + record.getClass().getName());
+            }
+        }
+    }
+
     @Override
     public Object getObject(int field) {
         throw new UnsupportedOperationException();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index c81fc40..c2a3371 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -19,14 +19,24 @@
 package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.pulsar.common.naming.TopicName;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 @Test(singleThreaded = true)
 public class TestPulsarRecordCursor extends TestPulsarConnector {
@@ -129,4 +139,45 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
             pulsarRecordCursor.close();
         }
     }
+
+    @Test
+    public void testRecordToBytes() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        PulsarRecordCursor pulsarRecordCursor = Mockito.mock(PulsarRecordCursor.class);
+        Method method = PulsarRecordCursor.class.getDeclaredMethod("toBytes", Object.class);
+        method.setAccessible(true);
+
+        final String msg = "Hello!";
+
+        byte[] bytes = msg.getBytes();
+        Object obj = method.invoke(pulsarRecordCursor, bytes);
+        assertNotNull(obj);
+        assertEquals(new String((byte[]) obj), msg);
+
+        ByteBuffer byteBuffer1 = ByteBuffer.wrap(msg.getBytes());
+        assertTrue(byteBuffer1.hasArray());
+        obj = method.invoke(pulsarRecordCursor, byteBuffer1);
+        assertNotNull(obj);
+        assertEquals(new String((byte[]) obj), msg);
+
+        ByteBuffer byteBuffer2 = ByteBuffer.allocateDirect(msg.getBytes().length);
+        byteBuffer2.put(msg.getBytes());
+        assertFalse(byteBuffer2.hasArray());
+        obj = method.invoke(pulsarRecordCursor, byteBuffer2);
+        assertNotNull(obj);
+        assertEquals(new String((byte[]) obj), msg);
+
+        ByteBuf byteBuf1 = Unpooled.wrappedBuffer(msg.getBytes());
+        assertTrue(byteBuf1.hasArray());
+        obj = method.invoke(pulsarRecordCursor, byteBuf1);
+        assertNotNull(obj);
+        assertEquals(new String((byte[]) obj), msg);
+
+        ByteBuf byteBuf2 = Unpooled.directBuffer();
+        byteBuf2.writeBytes(msg.getBytes());
+        assertFalse(byteBuf2.hasArray());
+        obj = method.invoke(pulsarRecordCursor, byteBuf2);
+        assertNotNull(obj);
+        assertEquals(new String((byte[]) obj), msg);
+    }
+
 }


[pulsar] 03/17: Expose pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer. (#6918)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c5644b4b5bac05b8c3f82b7eda89ee750e5c725
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat May 9 03:21:28 2020 +0800

    Expose pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer. (#6918)
    
    Fixes #6891
    Related to #5802
    
    Add pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer.
    
    New unit test added.
    (cherry picked from commit 204f3271914f5fbdeb7bf0a7ff5a71e77b8cccbf)
---
 .../org/apache/pulsar/broker/service/Consumer.java |   9 ++
 .../nonpersistent/NonPersistentSubscription.java   |   2 +
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +
 .../service/persistent/PersistentSubscription.java |   2 +
 .../broker/service/persistent/PersistentTopic.java |   2 +
 .../stats/prometheus/AggregatedConsumerStats.java  |   4 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  10 ++
 .../prometheus/AggregatedSubscriptionStats.java    |   4 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  77 +++++----
 .../pulsar/broker/stats/prometheus/TopicStats.java |   8 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 172 ++++++++++++++++++++-
 .../pulsar/common/policies/data/ConsumerStats.java |   8 +
 .../common/policies/data/SubscriptionStats.java    |  10 ++
 .../pulsar/common/policies/data/TopicStats.java    |  19 ++-
 site2/docs/reference-metrics.md                    |   2 +
 15 files changed, 294 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c5f5bbf..31227db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.mledger.Entry;
@@ -78,6 +79,8 @@ public class Consumer {
     private final String consumerName;
     private final Rate msgOut;
     private final Rate msgRedeliver;
+    private final LongAdder msgOutCounter;
+    private final LongAdder bytesOutCounter;
 
     private long lastConsumedTimestamp;
     private long lastAckedTimestamp;
@@ -129,6 +132,8 @@ public class Consumer {
         this.cnx = cnx;
         this.msgOut = new Rate();
         this.msgRedeliver = new Rate();
+        this.bytesOutCounter = new LongAdder();
+        this.msgOutCounter = new LongAdder();
         this.appId = appId;
         this.authenticationData = cnx.authenticationData;
         PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
@@ -222,6 +227,8 @@ public class Consumer {
         MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
         incrementUnackedMessages(totalMessages);
         msgOut.recordMultipleEvents(totalMessages, totalBytes);
+        msgOutCounter.add(totalMessages);
+        bytesOutCounter.add(totalBytes);
 
         ctx.channel().eventLoop().execute(() -> {
             for (int i = 0; i < entries.size(); i++) {
@@ -457,6 +464,8 @@ public class Consumer {
     }
 
     public ConsumerStats getStats() {
+        stats.msgOutCounter = msgOutCounter.longValue();
+        stats.bytesOutCounter = bytesOutCounter.longValue();
         stats.lastAckedTimestamp = lastAckedTimestamp;
         stats.lastConsumedTimestamp = lastConsumedTimestamp;
         stats.availablePermits = getAvailablePermits();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index f653ee5..316024a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -384,6 +384,8 @@ public class NonPersistentSubscription implements Subscription {
                 subStats.consumers.add(consumerStats);
                 subStats.msgRateOut += consumerStats.msgRateOut;
                 subStats.msgThroughputOut += consumerStats.msgThroughputOut;
+                subStats.bytesOutCounter += consumerStats.bytesOutCounter;
+                subStats.msgOutCounter += consumerStats.msgOutCounter;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
             });
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 332abfb..0a99a64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -778,6 +778,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
             stats.msgRateOut += subStats.msgRateOut;
             stats.msgThroughputOut += subStats.msgThroughputOut;
+            stats.bytesOutCounter += subStats.bytesOutCounter;
+            stats.msgOutCounter += subStats.msgOutCounter;
             stats.getSubscriptions().put(name, subStats);
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 5e6a213..25633f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -946,6 +946,8 @@ public class PersistentSubscription implements Subscription {
                 subStats.consumers.add(consumerStats);
                 subStats.msgRateOut += consumerStats.msgRateOut;
                 subStats.msgThroughputOut += consumerStats.msgThroughputOut;
+                subStats.bytesOutCounter += consumerStats.bytesOutCounter;
+                subStats.msgOutCounter += consumerStats.msgOutCounter;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
                 subStats.unackedMessages += consumerStats.unackedMessages;
                 subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b7470fa..28911bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1510,6 +1510,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
             stats.msgRateOut += subStats.msgRateOut;
             stats.msgThroughputOut += subStats.msgThroughputOut;
+            stats.bytesOutCounter += subStats.bytesOutCounter;
+            stats.msgOutCounter += subStats.msgOutCounter;
             stats.subscriptions.put(name, subStats);
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
index 0fadf3e..8b6bf7d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -31,4 +31,8 @@ public class AggregatedConsumerStats {
     public double msgThroughputOut;
 
     public long availablePermits;
+
+    long msgOutCounter;
+
+    long bytesOutCounter;
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index ea05ed0..1100523 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -34,6 +34,11 @@ public class AggregatedNamespaceStats {
     public double throughputIn;
     public double throughputOut;
 
+    public long bytesInCounter;
+    public long msgInCounter;
+    public long bytesOutCounter;
+    public long msgOutCounter;
+
     public long storageSize;
     public long msgBacklog;
     public long msgDelayed;
@@ -65,6 +70,11 @@ public class AggregatedNamespaceStats {
         throughputIn += stats.throughputIn;
         throughputOut += stats.throughputOut;
 
+        bytesInCounter += stats.bytesInCounter;
+        msgInCounter += stats.msgInCounter;
+        bytesOutCounter += stats.bytesOutCounter;
+        msgOutCounter += stats.msgOutCounter;
+
         storageSize += stats.storageSize;
         backlogSize += stats.backlogSize;
         offloadedStorageUsed += stats.offloadedStorageUsed;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index d5b5353..1f1e879 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -41,5 +41,9 @@ public class AggregatedSubscriptionStats {
 
     public long msgDelayed;
 
+    long msgOutCounter;
+
+    long bytesOutCounter;
+
     public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a7b35b8..f032ea1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
@@ -104,8 +105,11 @@ public class NamespaceStatsAggregator {
             stats.storageReadRate = mlStats.getReadEntriesRate();
         }
 
-        stats.msgInCounter = topic.getStats(getPreciseBacklog).msgInCounter;
-        stats.bytesInCounter = topic.getStats(getPreciseBacklog).bytesInCounter;
+        org.apache.pulsar.common.policies.data.TopicStats tStatus = topic.getStats(getPreciseBacklog);
+        stats.msgInCounter = tStatus.msgInCounter;
+        stats.bytesInCounter = tStatus.bytesInCounter;
+        stats.msgOutCounter = tStatus.msgOutCounter;
+        stats.bytesOutCounter = tStatus.bytesOutCounter;
 
         stats.producersCount = 0;
         topic.getProducers().values().forEach(producer -> {
@@ -123,43 +127,53 @@ public class NamespaceStatsAggregator {
             }
         });
 
-        topic.getSubscriptions().forEach((name, subscription) -> {
+        tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
             stats.subscriptionsCount++;
-            stats.msgBacklog += subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
+            stats.msgBacklog += subscriptionStats.msgBacklog;
 
             AggregatedSubscriptionStats subsStats = stats.subscriptionStats
-                    .computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
-            subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
-            subsStats.msgDelayed = subscription.getNumberOfEntriesDelayed();
+                    .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
+            subsStats.msgBacklog = subscriptionStats.msgBacklog;
+            subsStats.msgDelayed = subscriptionStats.msgDelayed;
             subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
+            stats.rateOut += subsStats.msgRateOut;
+            stats.throughputOut += subsStats.msgThroughputOut;
+            subscriptionStats.consumers.forEach(cStats -> {
+                stats.consumersCount++;
+                subsStats.unackedMessages += cStats.unackedMessages;
+                subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
+                subsStats.msgRateOut += cStats.msgRateOut;
+                subsStats.msgThroughputOut += cStats.msgThroughputOut;
+                subsStats.bytesOutCounter += cStats.bytesOutCounter;
+                subsStats.msgOutCounter += cStats.msgOutCounter;
+                if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
+                    subsStats.blockedSubscriptionOnUnackedMsgs = true;
+                }
+            });
+        });
 
-            subscription.getConsumers().forEach(consumer -> {
+        // Consumer stats can be a lot if a subscription has many consumers
+        if (includeConsumerMetrics) {
+            topic.getSubscriptions().forEach((name, subscription) -> {
+                AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+                        .computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
+                subscription.getConsumers().forEach(consumer -> {
+                    ConsumerStats conStats = consumer.getStats();
 
-                // Consumer stats can be a lot if a subscription has many consumers
-                if (includeConsumerMetrics) {
                     AggregatedConsumerStats consumerStats = subsStats.consumerStat
                             .computeIfAbsent(consumer, k -> new AggregatedConsumerStats());
-                    consumerStats.unackedMessages = consumer.getStats().unackedMessages;
-                    consumerStats.msgRateRedeliver = consumer.getStats().msgRateRedeliver;
-                    consumerStats.msgRateOut = consumer.getStats().msgRateOut;
-                    consumerStats.msgThroughputOut = consumer.getStats().msgThroughputOut;
-                    consumerStats.availablePermits = consumer.getStats().availablePermits;
-                    consumerStats.blockedSubscriptionOnUnackedMsgs = consumer.getStats().blockedConsumerOnUnackedMsgs;
-                }
-
-                subsStats.unackedMessages += consumer.getStats().unackedMessages;
-                subsStats.msgRateRedeliver += consumer.getStats().msgRateRedeliver;
-                subsStats.msgRateOut += consumer.getStats().msgRateOut;
-                subsStats.msgThroughputOut += consumer.getStats().msgThroughputOut;
-                if (!subsStats.blockedSubscriptionOnUnackedMsgs && consumer.getStats().blockedConsumerOnUnackedMsgs) {
-                    subsStats.blockedSubscriptionOnUnackedMsgs = true;
-                }
 
-                stats.consumersCount++;
-                stats.rateOut += consumer.getStats().msgRateOut;
-                stats.throughputOut += consumer.getStats().msgThroughputOut;
+                    consumerStats.unackedMessages = conStats.unackedMessages;
+                    consumerStats.msgRateRedeliver = conStats.msgRateRedeliver;
+                    consumerStats.msgRateOut = conStats.msgRateOut;
+                    consumerStats.msgThroughputOut = conStats.msgThroughputOut;
+                    consumerStats.bytesOutCounter = conStats.bytesOutCounter;
+                    consumerStats.msgOutCounter = conStats.msgOutCounter;
+                    consumerStats.availablePermits = conStats.availablePermits;
+                    consumerStats.blockedSubscriptionOnUnackedMsgs = conStats.blockedConsumerOnUnackedMsgs;
+                });
             });
-        });
+        }
 
         topic.getReplicators().forEach((cluster, replicator) -> {
             AggregatedReplicationStats aggReplStats = stats.replicationStats.computeIfAbsent(cluster,
@@ -206,6 +220,11 @@ public class NamespaceStatsAggregator {
         metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
         metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
 
+        metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
+        metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
+        metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
+        metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
+
         metric(stream, cluster, namespace, "pulsar_storage_size", stats.storageSize);
         metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.backlogSize);
         metric(stream, cluster, namespace, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index caf0ce8..0510d34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -36,6 +36,8 @@ class TopicStats {
     double throughputOut;
     long msgInCounter;
     long bytesInCounter;
+    long msgOutCounter;
+    long bytesOutCounter;
 
     long storageSize;
     public long msgBacklog;
@@ -67,6 +69,8 @@ class TopicStats {
         throughputOut = 0;
         bytesInCounter = 0;
         msgInCounter = 0;
+        bytesOutCounter = 0;
+        msgOutCounter = 0;
 
         storageSize = 0;
         msgBacklog = 0;
@@ -141,6 +145,8 @@ class TopicStats {
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
+            metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", subsStats.bytesOutCounter);
+            metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", subsStats.msgOutCounter);
             subsStats.consumerStat.forEach((c, consumerStats) -> {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_messages", consumerStats.unackedMessages);
@@ -148,6 +154,8 @@ class TopicStats {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_available_permits", consumerStats.availablePermits);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_bytes_total", consumerStats.bytesOutCounter);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_messages_total", consumerStats.msgOutCounter);
             });
         });
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index e569caf..92abb5c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -34,9 +34,12 @@ import java.util.regex.Pattern;
 
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.base.MoreObjects;
@@ -45,13 +48,14 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
 public class PrometheusMetricsTest extends BrokerTestBase {
-    @BeforeClass
+
+    @BeforeMethod
     @Override
     protected void setup() throws Exception {
         super.baseSetup();
     }
 
-    @AfterClass
+    @AfterMethod
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -61,12 +65,30 @@ public class PrometheusMetricsTest extends BrokerTestBase {
     public void testPerTopicStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
         Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
-        for (int i = 0; i < 10; i++) {
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
             String message = "my-message-" + i;
             p1.send(message.getBytes());
             p2.send(message.getBytes());
         }
 
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+        }
+
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
         String metricsStr = new String(statsOut.toByteArray());
@@ -109,20 +131,58 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
+        cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(0).tags.get("subscription"), "test");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("subscription"), "test");
+
+        cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(0).tags.get("subscription"), "test");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("subscription"), "test");
+
         p1.close();
         p2.close();
+        c1.close();
+        c2.close();
     }
 
     @Test
     public void testPerNamespaceStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
         Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
-        for (int i = 0; i < 10; i++) {
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
             String message = "my-message-" + i;
             p1.send(message.getBytes());
             p2.send(message.getBytes());
         }
 
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+        }
+
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
         String metricsStr = new String(statsOut.toByteArray());
@@ -141,12 +201,114 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         cm = (List<Metric>) metrics.get("pulsar_producers_count");
         assertEquals(cm.size(), 2);
-        assertEquals(cm.get(1).value, 2.0);
         assertNull(cm.get(1).tags.get("topic"));
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
+        cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        p1.close();
+        p2.close();
+        c1.close();
+        c2.close();
+    }
+
+    @Test
+    public void testPerConsumerStats() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, true, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        metrics.entries().forEach(e -> {
+            System.out.println(e.getKey() + ": " + e.getValue());
+        });
+
+        // With consumer metrics enabled, each topic should report one subscription-level and one consumer-level sample
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
+        assertEquals(cm.size(), 4);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("subscription"), "test");
+
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(1).tags.get("subscription"), "test");
+        assertEquals(cm.get(1).tags.get("consumer_id"), "1");
+
+        assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(2).tags.get("subscription"), "test");
+
+        assertEquals(cm.get(3).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(3).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(3).tags.get("subscription"), "test");
+        assertEquals(cm.get(3).tags.get("consumer_id"), "0");
+
+        cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
+        assertEquals(cm.size(), 4);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("subscription"), "test");
+
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(1).tags.get("subscription"), "test");
+        assertEquals(cm.get(1).tags.get("consumer_id"), "1");
+
+        assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(2).tags.get("subscription"), "test");
+
+        assertEquals(cm.get(3).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(3).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(3).tags.get("subscription"), "test");
+        assertEquals(cm.get(3).tags.get("consumer_id"), "0");
+
         p1.close();
         p2.close();
+        c1.close();
+        c2.close();
     }
 
     /** Checks for duplicate type definitions for a metric in the Prometheus metrics output. If the Prometheus parser
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 7411f03..0ecb944 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -32,6 +32,12 @@ public class ConsumerStats {
     /** Total throughput delivered to the consumer (bytes/s). */
     public double msgThroughputOut;
 
+    /** Total bytes delivered to consumer (bytes). */
+    public long bytesOutCounter;
+
+    /** Total messages delivered to consumer (msg). */
+    public long msgOutCounter;
+
     /** Total rate of messages redelivered by this consumer (msg/s). */
     public double msgRateRedeliver;
 
@@ -75,6 +81,8 @@ public class ConsumerStats {
         checkNotNull(stats);
         this.msgRateOut += stats.msgRateOut;
         this.msgThroughputOut += stats.msgThroughputOut;
+        this.bytesOutCounter += stats.bytesOutCounter;
+        this.msgOutCounter += stats.msgOutCounter;
         this.msgRateRedeliver += stats.msgRateRedeliver;
         this.availablePermits += stats.availablePermits;
         this.unackedMessages += stats.unackedMessages;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 7064883..df8fc72 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -34,6 +34,12 @@ public class SubscriptionStats {
     /** Total throughput delivered on this subscription (bytes/s). */
     public double msgThroughputOut;
 
+    /** Total bytes delivered to consumer (bytes). */
+    public long bytesOutCounter;
+
+    /** Total messages delivered to consumer (msg). */
+    public long msgOutCounter;
+
     /** Total rate of messages redelivered on this subscription (msg/s). */
     public double msgRateRedeliver;
 
@@ -86,6 +92,8 @@ public class SubscriptionStats {
     public void reset() {
         msgRateOut = 0;
         msgThroughputOut = 0;
+        bytesOutCounter = 0;
+        msgOutCounter = 0;
         msgRateRedeliver = 0;
         msgBacklog = 0;
         msgBacklogNoDelayed = 0;
@@ -101,6 +109,8 @@ public class SubscriptionStats {
         checkNotNull(stats);
         this.msgRateOut += stats.msgRateOut;
         this.msgThroughputOut += stats.msgThroughputOut;
+        this.bytesOutCounter += stats.bytesOutCounter;
+        this.msgOutCounter += stats.msgOutCounter;
         this.msgRateRedeliver += stats.msgRateRedeliver;
         this.msgBacklog += stats.msgBacklog;
         this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index 9962fa9..be471e3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -43,6 +43,18 @@ public class TopicStats {
     /** Total throughput of messages dispatched for the topic (byte/s). */
     public double msgThroughputOut;
 
+    /** Total bytes published to the topic (bytes). */
+    public long bytesInCounter;
+
+    /** Total messages published to the topic (msg). */
+    public long msgInCounter;
+
+    /** Total bytes dispatched for the topic (bytes). */
+    public long bytesOutCounter;
+
+    /** Total messages dispatched for the topic (msg). */
+    public long msgOutCounter;
+
     /** Average size of published messages (bytes). */
     public double averageMsgSize;
 
@@ -63,9 +75,6 @@ public class TopicStats {
 
     public String deduplicationStatus;
 
-    public long bytesInCounter;
-    public long msgInCounter;
-
     public TopicStats() {
         this.publishers = Lists.newArrayList();
         this.subscriptions = Maps.newHashMap();
@@ -83,6 +92,8 @@ public class TopicStats {
         this.backlogSize = 0;
         this.bytesInCounter = 0;
         this.msgInCounter = 0;
+        this.bytesOutCounter = 0;
+        this.msgOutCounter = 0;
         this.publishers.clear();
         this.subscriptions.clear();
         this.replication.clear();
@@ -100,6 +111,8 @@ public class TopicStats {
         this.msgThroughputOut += stats.msgThroughputOut;
         this.bytesInCounter += stats.bytesInCounter;
         this.msgInCounter += stats.msgInCounter;
+        this.bytesOutCounter += stats.bytesOutCounter;
+        this.msgOutCounter += stats.msgOutCounter;
         double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count;
         this.averageMsgSize = newAverageMsgSize;
         this.storageSize += stats.storageSize;
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 3983dcd..b57406c 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -174,6 +174,8 @@ All the topic metrics are labelled with the following labels:
 | pulsar_entry_size_le_* | Histogram | The entry rate of a topic that the entry size is smaller with a given threshold.<br> Available thresholds: <br><ul><li>pulsar_entry_size_le_128: <= 128 bytes </li><li>pulsar_entry_size_le_512: <= 512 bytes</li><li>pulsar_entry_size_le_1_kb: <= 1 KB</li><li>pulsar_entry_size_le_2_kb: <= 2 KB</li><li>pulsar_entry_size_le_4_kb: <= 4 KB</li><li>pulsar_entry_size_le_16_kb: <= 16 KB</li><li>pulsar_entry_size_le_100_kb: <= 100 KB</li><li>pulsar_entry_size_ [...]
 | pulsar_in_bytes_total | Counter | The total number of bytes received for this topic |
 | pulsar_producers_count | Counter | The total number of messages received for this topic |
+| pulsar_out_bytes_total | Counter | The total number of bytes read from this topic |
+| pulsar_out_messages_total | Counter | The total number of messages read from this topic |
 
 #### Replication metrics
 


[pulsar] 12/17: Fix pulsar client admin thread number explode (#6940)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 71e78783b0f7241603d0518b6ac0f368ef624f60
Author: hangc0276 <ha...@163.com>
AuthorDate: Tue May 12 14:45:26 2020 +0800

    Fix pulsar client admin thread number explode (#6940)
    
    (cherry picked from commit 90dba138c79b5e8a9d5c576ec75c31e24f778914)
---
 .../pulsar/client/admin/internal/http/AsyncHttpConnector.java       | 1 +
 .../client/admin/internal/http/AsyncHttpConnectorProvider.java      | 6 +++++-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 42e9c49..000746d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -93,6 +93,7 @@ public class AsyncHttpConnector implements Connector {
         confBuilder.setReadTimeout(readTimeoutMs);
         confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
         confBuilder.setRequestTimeout(requestTimeoutMs);
+        confBuilder.setIoThreadsCount(conf.getNumIoThreads());
         confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
             @Override
             public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse response) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
index 3ea4757..6c58ce9 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
@@ -28,6 +28,7 @@ import org.glassfish.jersey.client.spi.ConnectorProvider;
 public class AsyncHttpConnectorProvider implements ConnectorProvider {
 
     private final ClientConfigurationData conf;
+    private Connector connector;
 
     public AsyncHttpConnectorProvider(ClientConfigurationData conf) {
         this.conf = conf;
@@ -35,7 +36,10 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider {
 
     @Override
     public Connector getConnector(Client client, Configuration runtimeConfig) {
-        return new AsyncHttpConnector(client, conf);
+        if (connector == null) {
+            connector = new AsyncHttpConnector(client, conf);
+        }
+        return connector;
     }