You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:09 UTC

[pulsar] branch branch-2.6 updated (375fc00 -> a1f56e3)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 375fc00  fix typo
     new fcea0f4  Fixes #7512 handle exception when policy is updated. (#7514)
     new 460abac  Ensure the create subscription can be completed when the operation timeout happens (#7522)
     new af654c4  Fix: function BC issue introduced in 2.6 (#7528)
     new 602735e  [Issue 7489] Remove timestamp from metrics (#7539)
     new 56ea8a8  [PROTOBUF] Fix protobuf generation on handling repeated long number … (#7540)
     new df3cd42  Add more logging to the auth operations on failure (#7567)
     new bcb90d2  [CPP] Fix segment crashes that caused by race condition of timer in cpp client (#7572)
     new 7df08ca  Improve security setting of Pulsar Functions (#7578)
     new 56a0d69  Use Consume/Produce/Lookup interfaces for specific operations in allowTopicOperation (#7587)
     new f58aea8  [C++] Fix multitopic consumer segfault on connect error (#7588)
     new ad6edd5  Fix race condition on close consumer while reconnect to broker. (#7589)
     new 45d2c8f  fix validation never return false (#7593)
     new b246da9  fix the command for starting bookies in the foreground (#7596)
     new d4a1ca5  Support configuring DeleteInactiveTopic setting in namespace policy (#7598)
     new 8268665  [oauth2 cpp] add support to read credentials from file (#7606)
     new 0825f86  Make OAuth2 auth plugin to use AsyncHttpClient (#7615)
     new 53ee16d  fix NPE when using advertisedListeners (#7620)
     new 180e5f2  fix batchReceiveAsync not completed exceptionally when closing Consumer (#7661)
     new 12cca8a  Fix backward compatibility issues with batch index acknowledgment. (#7655)
     new 304924c  fix:apache#7669 stats recorder time unit error (#7670)
     new 0be16ea  Fix batch index filter issue in Consumer. (#7654)
     new 50493d0  [docs] Fix wrong required properties for HDFS2 sink (#7643)
     new 85c5979  [pulsar-perf] Supports `tlsAllowInsecureConnection` in pulsar-perf produce/consume/read. (#7300)
     new 1842f7c  Support to set listener name for client cli (#7621)
     new a1f56e3  [doc] add cpp client document for oauth2 authentication

The 25 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci-go-functions-style.yaml       |   8 +-
 .github/workflows/ci-go-functions-test.yaml        |   6 +-
 pom.xml                                            |   2 +
 .../authorization/AuthorizationProvider.java       |  29 +-
 .../validator/MultipleListenerValidator.java       |   8 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  48 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  38 +-
 .../pulsar/broker/namespace/NamespaceService.java  |   6 +-
 .../pulsar/broker/service/AbstractTopic.java       |  28 +-
 .../pulsar/broker/service/BrokerService.java       |   8 +-
 .../org/apache/pulsar/broker/service/Topic.java    |   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  23 +-
 .../broker/service/persistent/PersistentTopic.java |  23 +-
 .../broker/service/persistent/SystemTopic.java     |   2 +-
 .../pulsar/broker/service/BrokerTestBase.java      |   7 +-
 .../broker/service/InactiveTopicDeleteTest.java    | 165 ++++++
 .../service/PersistentTopicConcurrentTest.java     |   4 +-
 .../client/api/SimpleProducerConsumerTest.java     |  74 +++
 .../client/impl/BatchMessageIndexAckTest.java      |  26 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  57 ++
 ...entials_file.json => cpp_credentials_file.json} |   0
 .../org/apache/pulsar/client/admin/Namespaces.java |  48 ++
 .../client/admin/internal/NamespacesImpl.java      |  81 +++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |   6 +
 pulsar-client-cpp/lib/ClientConnection.cc          |  40 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  13 +-
 pulsar-client-cpp/lib/auth/AuthOauth2.cc           |  45 +-
 pulsar-client-cpp/lib/auth/AuthOauth2.h            |   2 +
 pulsar-client-cpp/tests/AuthPluginTest.cc          |  19 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |  13 +
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  14 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  73 ++-
 .../apache/pulsar/client/cli/PulsarClientTool.java |   6 +
 .../apache/pulsar/client/impl/ConsumerBase.java    |  26 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   6 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  60 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |  25 +-
 .../client/impl/ProducerStatsRecorderImpl.java     |   8 +-
 .../impl/auth/oauth2/AuthenticationOAuth2.java     |   6 +-
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |   5 +-
 .../pulsar/client/impl/auth/oauth2/Flow.java       |   4 +-
 .../protocol/ClientCredentialsExchanger.java       |   7 +-
 .../impl/auth/oauth2/protocol/TokenClient.java     | 143 +++--
 .../impl/conf/ConsumerConfigurationData.java       |   2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  18 +
 ...eryPolicies.java => InactiveTopicPolicies.java} |  13 +-
 .../pulsar/common/policies/data/Policies.java      |   6 +-
 .../pulsar/common/policies/data/PolicyName.java    |   1 +
 .../util/protobuf/ByteBufCodedInputStream.java     |  30 +
 .../apache/pulsar/common/api/proto/TestApi.java    | 641 +++++++++++++++++++++
 .../common/protocol/RepeatedLongNonPackedTest.java |  65 +++
 .../common/protocol/RepeatedLongPackedTest.java    |  65 +++
 .../src/test/proto/TestApi.proto                   |  23 +-
 pulsar-function-go/pf/stats.go                     |  15 +-
 .../instance/stats/ComponentStatsManager.java      |   3 +-
 .../instance/stats/FunctionStatsManager.java       |  11 +-
 .../functions/instance/stats/SinkStatsManager.java |  11 +-
 .../instance/stats/SourceStatsManager.java         |  11 +-
 .../instance/src/main/python/function_stats.py     |  16 +-
 .../functions/worker/FunctionWorkerStarter.java    |   7 +-
 .../pulsar/functions/worker/WorkerUtils.java       |  14 +-
 .../pulsar/testclient/PerformanceConsumer.java     |  22 +-
 .../pulsar/testclient/PerformanceProducer.java     |  25 +-
 .../pulsar/testclient/PerformanceReader.java       |  20 +
 site2/docs/client-libraries-java.md                |   4 +-
 site2/docs/deploy-bare-metal.md                    |   2 +-
 site2/docs/functions-worker.md                     |  20 +-
 site2/docs/io-hdfs2-sink.md                        |   6 +-
 site2/docs/reference-cli-tools.md                  |   8 +-
 .../docs/{security-oauth.md => security-oauth2.md} |  30 +-
 site2/website/sidebars.json                        |   2 +-
 ...e2_3.java => PulsarStandaloneTestSuite2_5.java} |   4 +-
 .../backwardscompatibility/SmokeTest2_2.java       |   4 +
 .../backwardscompatibility/SmokeTest2_3.java       |   4 +
 .../backwardscompatibility/SmokeTest2_4.java       |   4 +
 .../{SmokeTest2_3.java => SmokeTest2_5.java}       |   6 +-
 .../integration/containers/PulsarContainer.java    |   1 +
 .../integration/topologies/PulsarTestBase.java     |  46 ++
 78 files changed, 2072 insertions(+), 302 deletions(-)
 copy pulsar-broker/src/test/resources/authentication/token/{credentials_file.json => cpp_credentials_file.json} (100%)
 copy pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/{DelayedDeliveryPolicies.java => InactiveTopicPolicies.java} (81%)
 create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java
 create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java
 create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java
 copy pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/Category.java => pulsar-common/src/test/proto/TestApi.proto (68%)
 rename site2/docs/{security-oauth.md => security-oauth2.md} (81%)
 copy tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/{PulsarStandaloneTestSuite2_3.java => PulsarStandaloneTestSuite2_5.java} (91%)
 copy tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/{SmokeTest2_3.java => SmokeTest2_5.java} (83%)


[pulsar] 15/25: [oauth2 cpp] add support to read credentials from file (#7606)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8268665f78c2266725836dc0aa12c7f260a34950
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Tue Jul 28 23:49:20 2020 +0800

    [oauth2 cpp] add support to read credentials from file (#7606)
    
    ### Motivation
    
    Add support for reading credentials from a file, to align the C++ client with the Java client.
    
    ### Modifications
    
    - Add support for reading credentials from a file, aligning the C++ client with the Java client.
    - Add a test for it.
    
    Co-authored-by: xiaolong.ran <rx...@apache.org>
    (cherry picked from commit 42d17c85fc9460ec0aa678cf3e3663de2aa61524)
---
 .../authentication/token/cpp_credentials_file.json |  4 ++
 pulsar-client-cpp/lib/auth/AuthOauth2.cc           | 45 +++++++++++++++++++++-
 pulsar-client-cpp/lib/auth/AuthOauth2.h            |  2 +
 pulsar-client-cpp/tests/AuthPluginTest.cc          | 19 +++++++++
 4 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/resources/authentication/token/cpp_credentials_file.json b/pulsar-broker/src/test/resources/authentication/token/cpp_credentials_file.json
new file mode 100644
index 0000000..db1eccd
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/token/cpp_credentials_file.json
@@ -0,0 +1,4 @@
+{
+  "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+  "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb"
+}
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
index 3104c4d..72cdf97 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.cc
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
@@ -119,6 +119,12 @@ Oauth2Flow::Oauth2Flow() {}
 Oauth2Flow::~Oauth2Flow() {}
 
 // ClientCredentialFlow
+static std::string readFromFile(const std::string& credentialsFilePath) {
+    std::ifstream input(credentialsFilePath);
+    std::stringstream buffer;
+    buffer << input.rdbuf();
+    return buffer.str();
+}
 
 ClientCredentialFlow::ClientCredentialFlow(const std::string& issuerUrl, const std::string& clientId,
                                            const std::string& clientSecret, const std::string& audience) {
@@ -128,6 +134,33 @@ ClientCredentialFlow::ClientCredentialFlow(const std::string& issuerUrl, const s
     audience_ = audience;
 }
 
+// read clientId/clientSecret from passed in `credentialsFilePath`
+ClientCredentialFlow::ClientCredentialFlow(const std::string& issuerUrl,
+                                           const std::string& credentialsFilePath,
+                                           const std::string& audience) {
+    issuerUrl_ = issuerUrl;
+    audience_ = audience;
+
+    boost::property_tree::ptree loadPtreeRoot;
+    try {
+        boost::property_tree::read_json(credentialsFilePath, loadPtreeRoot);
+    } catch (boost::property_tree::json_parser_error& e) {
+        LOG_ERROR("Failed to parse json input file for credentialsFilePath: " << credentialsFilePath
+                                                                              << "with error:" << e.what());
+        return;
+    }
+
+    const std::string defaultNotFoundString = "Client Id / Secret Not Found";
+
+    clientId_ = loadPtreeRoot.get<std::string>("client_id", defaultNotFoundString);
+    clientSecret_ = loadPtreeRoot.get<std::string>("client_secret", defaultNotFoundString);
+
+    if (clientId_ == defaultNotFoundString || clientSecret_ == defaultNotFoundString) {
+        LOG_ERROR("Not get valid clientId / clientSecret: " << clientId_ << "/" << clientSecret_);
+        return;
+    }
+}
+
 void ClientCredentialFlow::initialize() {}
 void ClientCredentialFlow::close() {}
 
@@ -225,8 +258,16 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() {
 // AuthOauth2
 
 AuthOauth2::AuthOauth2(ParamMap& params) {
-    flowPtr_ = FlowPtr(new ClientCredentialFlow(params["issuer_url"], params["client_id"],
-                                                params["client_secret"], params["audience"]));
+    std::map<std::string, std::string>::iterator it;
+    it = params.find("private_key");
+
+    if (it != params.end()) {
+        flowPtr_ = FlowPtr(
+            new ClientCredentialFlow(params["issuer_url"], params["private_key"], params["audience"]));
+    } else {
+        flowPtr_ = FlowPtr(new ClientCredentialFlow(params["issuer_url"], params["client_id"],
+                                                    params["client_secret"], params["audience"]));
+    }
 }
 
 AuthOauth2::~AuthOauth2() {}
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.h b/pulsar-client-cpp/lib/auth/AuthOauth2.h
index 0090976..874dc39 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.h
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.h
@@ -33,6 +33,8 @@ class ClientCredentialFlow : public Oauth2Flow {
    public:
     ClientCredentialFlow(const std::string& issuerUrl, const std::string& clientId,
                          const std::string& clientSecret, const std::string& audience);
+    ClientCredentialFlow(const std::string& issuerUrl, const std::string& credentialsFilePath,
+                         const std::string& audience);
     void initialize();
     Oauth2TokenResultPtr authenticate();
     void close();
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc
index 183c880..6430fd8 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -381,3 +381,22 @@ TEST(AuthPluginTest, testOauth2WrongSecret) {
         // expected
     }
 }
+
+TEST(AuthPluginTest, testOauth2CredentialFile) {
+    // test success get token from oauth2 server.
+    pulsar::AuthenticationDataPtr data;
+    std::string params = R"({
+        "type": "client_credentials",
+        "issuer_url": "https://dev-kt-aa9ne.us.auth0.com/oauth/token",
+        "private_key": "../../pulsar-broker/src/test/resources/authentication/token/cpp_credentials_file.json",
+        "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
+
+    int expectedTokenLength = 3379;
+    LOG_INFO("PARAMS: " << params);
+    pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
+    ASSERT_EQ(auth->getAuthMethodName(), "token");
+    ASSERT_EQ(auth->getAuthData(data), pulsar::ResultOk);
+    ASSERT_EQ(data->hasDataForHttp(), true);
+    ASSERT_EQ(data->hasDataFromCommand(), true);
+    ASSERT_EQ(data->getCommandData().length(), expectedTokenLength);
+}


[pulsar] 10/25: [C++] Fix multitopic consumer segfault on connect error (#7588)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f58aea85607f8be011412e646dbe6b4c8732a0e3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jul 24 10:08:24 2020 -0700

    [C++] Fix multitopic consumer segfault on connect error (#7588)
    
    
    (cherry picked from commit 09caa92565f2a0eb862becabde6ff2028fc47da1)
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 13 ++++++++-----
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 13 +++++++++++++
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index b9b4635..85a9868 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -104,8 +104,7 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned consumers
-            ResultCallback nullCallbackForCleanup = NULL;
-            closeAsync(nullCallbackForCleanup);
+            closeAsync(nullptr);
             multiTopicsConsumerCreatedPromise_.setFailed(result);
             return;
         }
@@ -372,17 +371,21 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
     if (state_ == Closing || state_ == Closed) {
         LOG_ERROR("TopicsConsumer already closed "
                   << " topic" << topic_ << " consumer - " << consumerStr_);
-        callback(ResultAlreadyClosed);
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
         return;
     }
 
     setState(Closing);
 
     if (consumers_.empty()) {
-        LOG_ERROR("TopicsConsumer have no consumers to close "
+        LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << subscriptionName_);
         setState(Closed);
-        callback(ResultAlreadyClosed);
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
         return;
     }
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 564aba2..cc71a35 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1820,6 +1820,19 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerTopicNameInvalid) {
     client.shutdown();
 }
 
+TEST(BasicEndToEndTest, testMultiTopicsConsumerConnectError) {
+    Client client("pulsar://invalid-hostname:6650");
+    std::vector<std::string> topicNames;
+    topicNames.push_back("topic-1");
+    topicNames.push_back("topic-2");
+
+    Consumer consumer;
+    Result res = client.subscribe(topicNames, "sub", consumer);
+    ASSERT_EQ(ResultConnectError, res);
+
+    client.shutdown();
+}
+
 TEST(BasicEndToEndTest, testMultiTopicsConsumerDifferentNamespace) {
     Client client(lookupUrl);
     std::vector<std::string> topicNames;


[pulsar] 01/25: Fixes #7512 handle exception when policy is updated. (#7514)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fcea0f4122090921bfc69df52d0e334d567d9337
Author: mikecaat <35...@users.noreply.github.com>
AuthorDate: Sun Jul 12 00:51:26 2020 +0900

    Fixes #7512 handle exception when policy is updated. (#7514)
    
    
    (cherry picked from commit a55a405c794741ec10094ddd174fccf0cb4840c7)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ca7fbd4..2b2e14b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1810,7 +1810,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             }
         });
         replicators.forEach((name, replicator) ->
-            replicator.getRateLimiter().get().onPoliciesUpdate(data)
+            replicator.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data))
         );
         checkMessageExpiry();
         CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();


[pulsar] 23/25: [pulsar-perf] Supports `tlsAllowInsecureConnection` in pulsar-perf produce/consume/read. (#7300)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 85c5979dc079ac285767050664c25b39577943d5
Author: Yang Yang <yy...@streamnative.io>
AuthorDate: Thu Jul 2 12:58:00 2020 +0800

    [pulsar-perf] Supports `tlsAllowInsecureConnection` in pulsar-perf produce/consume/read. (#7300)
    
    ### Motivation
    
    Add support for the `tlsAllowInsecureConnection` setting to the command-line tool `pulsar-perf`, so that `produce/consume/read` performance tests can run against clusters with insecure TLS connections.
    
    ### Modifications
    
    Parse the `tlsAllowInsecureConnection` option from program arguments or the config file when executing `produce/consume/read` performance tests.
    
    (cherry picked from commit 9b178c8160cb2d0819ad09c758c3a27adafaf17f)
---
 .../apache/pulsar/testclient/PerformanceConsumer.java  | 15 ++++++++++++++-
 .../apache/pulsar/testclient/PerformanceProducer.java  | 18 +++++++++++++++---
 .../apache/pulsar/testclient/PerformanceReader.java    | 13 +++++++++++++
 site2/docs/reference-cli-tools.md                      |  4 +++-
 4 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 61b027d..196c557 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -141,6 +141,10 @@ public class PerformanceConsumer {
                 "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
         public String tlsTrustCertsFilePath = "";
 
+        @Parameter(names = {
+                "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
+        public Boolean tlsAllowInsecureConnection = null;
+
         @Parameter(names = { "-k", "--encryption-key-name" }, description = "The private key name to decrypt payload")
         public String encKeyName = null;
 
@@ -205,6 +209,11 @@ public class PerformanceConsumer {
             if (isBlank(arguments.tlsTrustCertsFilePath)) {
                 arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
             }
+
+            if (arguments.tlsAllowInsecureConnection == null) {
+                arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop
+                        .getProperty("tlsAllowInsecureConnection", ""));
+            }
         }
 
         // Dump config variables
@@ -254,6 +263,10 @@ public class PerformanceConsumer {
             clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
         }
 
+        if (arguments.tlsAllowInsecureConnection != null) {
+            clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
+        }
+
         PulsarClient pulsarClient = clientBuilder.build();
 
         class EncKeyReader implements CryptoKeyReader {
@@ -390,4 +403,4 @@ public class PerformanceConsumer {
     }
 
     private static final Logger log = LoggerFactory.getLogger(PerformanceConsumer.class);
-}
\ No newline at end of file
+}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 9e9d0e8..d24b9b9 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -164,7 +164,7 @@ public class PerformanceProducer {
         @Parameter(names = { "-b",
                 "--batch-time-window" }, description = "Batch messages in 'x' ms window (Default: 1ms)")
         public double batchTimeMillis = 1.0;
-        
+
         @Parameter(names = {
             "-bm", "--batch-max-messages"
         }, description = "Maximum number of messages per batch")
@@ -186,6 +186,10 @@ public class PerformanceProducer {
                 "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
         public String tlsTrustCertsFilePath = "";
 
+        @Parameter(names = {
+                "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
+        public Boolean tlsAllowInsecureConnection = null;
+
         @Parameter(names = { "-k", "--encryption-key-name" }, description = "The public key name to encrypt payload")
         public String encKeyName = null;
 
@@ -196,7 +200,7 @@ public class PerformanceProducer {
         @Parameter(names = { "-d",
                 "--delay" }, description = "Mark messages with a given delay in seconds")
         public long delay = 0;
-        
+
         @Parameter(names = { "-ef",
                 "--exit-on-failure" }, description = "Exit from the process on publish failure (default: disable)")
         public boolean exitOnFailure = false;
@@ -281,6 +285,10 @@ public class PerformanceProducer {
             if (isBlank(arguments.tlsTrustCertsFilePath)) {
                arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
             }
+            if (arguments.tlsAllowInsecureConnection == null) {
+                arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop
+                        .getProperty("tlsAllowInsecureConnection", ""));
+            }
         }
 
         // Dump config variables
@@ -416,6 +424,10 @@ public class PerformanceProducer {
                 clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
             }
 
+            if (arguments.tlsAllowInsecureConnection != null) {
+                clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
+            }
+
             client = clientBuilder.build();
             ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
                     .sendTimeout(0, TimeUnit.SECONDS) //
@@ -587,4 +599,4 @@ public class PerformanceProducer {
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
     static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
-}
\ No newline at end of file
+}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 9e5480e..218ea30 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -107,6 +107,10 @@ public class PerformanceReader {
                 "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
         public String tlsTrustCertsFilePath = "";
 
+        @Parameter(names = {
+                "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
+        public Boolean tlsAllowInsecureConnection = null;
+
         @Parameter(names = { "-time",
                 "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
         public long testTime = 0;
@@ -168,6 +172,11 @@ public class PerformanceReader {
             if (isBlank(arguments.tlsTrustCertsFilePath)) {
                 arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
             }
+
+            if (arguments.tlsAllowInsecureConnection == null) {
+                arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop
+                        .getProperty("tlsAllowInsecureConnection", ""));
+            }
         }
 
         // Dump config variables
@@ -207,6 +216,10 @@ public class PerformanceReader {
             clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
         }
 
+        if (arguments.tlsAllowInsecureConnection != null) {
+            clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
+        }
+
         PulsarClient pulsarClient = clientBuilder.build();
 
         List<CompletableFuture<Reader<byte[]>>> futures = Lists.newArrayList();
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index 9a87c8c..eb3a989 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -431,6 +431,7 @@ Options
 |`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
 |`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
 |`--trust-cert-file`|Path for the trusted TLS certificate file||
+|`--tls-allow-insecure`|Allow insecure TLS connection||
 
 
 ### `produce`
@@ -468,6 +469,7 @@ Options
 |`-time`, `--test-duration`|Test duration in secs. If set to 0, it will keep publishing.|0|
 |`--trust-cert-file`|Path for the trusted TLS certificate file||
 |`--warmup-time`|Warm-up time in seconds|1|
+|`--tls-allow-insecure`|Allow insecure TLS connection||
 
 
 ### `read`
@@ -494,7 +496,7 @@ Options
 |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
 |`--trust-cert-file`|Path for the trusted TLS certificate file||
 |`--use-tls`|Use TLS encryption on the connection|false|
-
+|`--tls-allow-insecure`|Allow insecure TLS connection||
 
 ### `websocket-producer`
 Run a websocket producer


[pulsar] 16/25: Make OAuth2 auth plugin to use AsyncHttpClient (#7615)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0825f86ada22cdd6429add175fed9b04410213ec
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jul 24 14:49:16 2020 -0700

    Make OAuth2 auth plugin to use AsyncHttpClient (#7615)
    
    
    (cherry picked from commit 396ecebe3e2bdb88b1599535a8d477f7a198f062)
---
 .../impl/auth/oauth2/AuthenticationOAuth2.java     |   6 +-
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |   5 +-
 .../pulsar/client/impl/auth/oauth2/Flow.java       |   4 +-
 .../protocol/ClientCredentialsExchanger.java       |   7 +-
 .../impl/auth/oauth2/protocol/TokenClient.java     | 143 ++++++++++-----------
 5 files changed, 81 insertions(+), 84 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
index f7f41d0..be48efe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -107,7 +107,11 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati
 
     @Override
     public void close() throws IOException {
-        flow.close();
+        try {
+            flow.close();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
     }
 
     @Data
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index 13bf0f5..a7b30ab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -34,7 +34,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.http.entity.ContentType;
 import org.apache.pulsar.client.api.PulsarClientException;
 
 /**
@@ -98,7 +97,7 @@ class ClientCredentialsFlow extends FlowBase {
     }
 
     @Override
-    public void close() {
+    public void close() throws Exception {
         exchanger.close();
     }
 
@@ -130,7 +129,7 @@ class ClientCredentialsFlow extends FlowBase {
 
             String protocol = urlConnection.getURL().getProtocol();
             String contentType = urlConnection.getContentType();
-            if ("data".equals(protocol) && !ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) {
+            if ("data".equals(protocol) && !"application/json".equals(contentType)) {
                 throw new IllegalArgumentException(
                         "Unsupported media type or encoding format: " + urlConnection.getContentType());
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
index b572325..0e7b864 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 /**
  * An OAuth 2.0 authorization flow.
  */
-interface Flow extends Serializable {
+interface Flow extends Serializable, AutoCloseable {
 
     /**
      * Initializes the authorization flow.
@@ -43,5 +43,5 @@ interface Flow extends Serializable {
     /**
      * Closes the authorization flow.
      */
-    void close();
+    void close() throws Exception;
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
index e6a956a..7d004c7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 /**
  * An interface for exchanging client credentials for an access token.
  */
-public interface ClientCredentialsExchanger {
+public interface ClientCredentialsExchanger extends AutoCloseable {
     /**
      * Requests an exchange of client credentials for an access token.
      * @param req the request details.
@@ -33,9 +33,4 @@ public interface ClientCredentialsExchanger {
      */
     TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
             throws TokenExchangeException, IOException;
-
-    /**
-     * Closes the exchanger.
-     */
-    void close();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index 715579d..0718073 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -18,54 +18,49 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2.protocol;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.HttpURLConnection;
+import java.io.UnsupportedEncodingException;
 import java.net.URL;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.http.Consts;
-import org.apache.http.HttpEntity;
-import org.apache.http.NameValuePair;
-import org.apache.http.StatusLine;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.HttpResponseException;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
+import java.net.URLEncoder;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncHttpClientConfig;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Response;
 
 /**
  * A client for an OAuth 2.0 token endpoint.
  */
-public class TokenClient implements AutoCloseable, ClientCredentialsExchanger {
-
-    private static final ObjectReader resultReader;
-    private static final ObjectReader errorReader;
+public class TokenClient implements ClientCredentialsExchanger {
 
-    static {
-        resultReader = new ObjectMapper().readerFor(TokenResult.class);
-        errorReader = new ObjectMapper().readerFor(TokenError.class);
-    }
+    protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
+    protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
 
     private final URL tokenUrl;
-    private final CloseableHttpClient httpclient;
+    private final AsyncHttpClient httpClient;
 
     public TokenClient(URL tokenUrl) {
         this.tokenUrl = tokenUrl;
-        this.httpclient = HttpClientBuilder.create().useSystemProperties().disableCookieManagement().build();
+
+        DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+        confBuilder.setFollowRedirect(true);
+        confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
+        confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
+        confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
+        AsyncHttpClientConfig config = confBuilder.build();
+        httpClient = new DefaultAsyncHttpClient(config);
     }
 
-    public void close() {
+    @Override
+    public void close() throws Exception {
+        httpClient.close();
     }
 
     /**
@@ -76,46 +71,50 @@ public class TokenClient implements AutoCloseable, ClientCredentialsExchanger {
      */
     public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
             throws TokenExchangeException, IOException {
-        List<NameValuePair> params = new ArrayList<>(4);
-        params.add(new BasicNameValuePair("grant_type", "client_credentials"));
-        params.add(new BasicNameValuePair("client_id", req.getClientId()));
-        params.add(new BasicNameValuePair("client_secret", req.getClientSecret()));
-        params.add(new BasicNameValuePair("audience", req.getAudience()));
-        HttpPost post = new HttpPost(tokenUrl.toString());
-        post.setHeader("Accept", ContentType.APPLICATION_JSON.getMimeType());
-        post.setEntity(new UrlEncodedFormEntity(params, Consts.UTF_8));
-
-        try (CloseableHttpResponse response = httpclient.execute(post)) {
-            StatusLine status = response.getStatusLine();
-            HttpEntity entity = response.getEntity();
-            try {
-                switch(status.getStatusCode()) {
-                    case HttpURLConnection.HTTP_OK:
-                        return readResponse(entity, resultReader);
-                    case HttpURLConnection.HTTP_BAD_REQUEST:
-                    case HttpURLConnection.HTTP_UNAUTHORIZED:
-                        throw new TokenExchangeException(readResponse(entity, errorReader));
-                    default:
-                        throw new HttpResponseException(status.getStatusCode(), status.getReasonPhrase());
-                }
-            } finally {
-                EntityUtils.consume(entity);
+        Map<String, String> bodyMap = new TreeMap<>();
+        bodyMap.put("grant_type", "client_credentials");
+        bodyMap.put("client_id", req.getClientId());
+        bodyMap.put("client_secret", req.getClientSecret());
+        bodyMap.put("audience", req.getAudience());
+        String body = bodyMap.entrySet().stream()
+                .map(e -> {
+                    try {
+                        return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
+                    } catch (UnsupportedEncodingException e1) {
+                        throw new RuntimeException(e1);
+                    }
+                })
+                .collect(Collectors.joining("&"));
+
+        try {
+
+            Response res = httpClient.preparePost(tokenUrl.toString())
+                    .setHeader("Accept", "application/json")
+                    .setHeader("Content-Type", "application/x-www-form-urlencoded")
+                    .setBody(body)
+                    .execute()
+                    .get();
+
+            switch (res.getStatusCode()) {
+            case 200:
+                return ObjectMapperFactory.getThreadLocal().reader().readValue(res.getResponseBodyAsBytes(),
+                        TokenResult.class);
+
+            case 400: // Bad request
+            case 401: // Unauthorized
+                throw new TokenExchangeException(
+                        ObjectMapperFactory.getThreadLocal().reader().readValue(res.getResponseBodyAsBytes(),
+                                TokenError.class));
+
+            default:
+                throw new IOException(
+                        "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText());
             }
-        }
-    }
 
-    private static <T> T readResponse(HttpEntity entity, ObjectReader objectReader) throws IOException {
-        ContentType contentType = ContentType.getOrDefault(entity);
-        if (!ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) {
-            throw new ClientProtocolException("Unsupported content type: " + contentType.getMimeType());
-        }
-        Charset charset = contentType.getCharset();
-        if (charset == null) {
-            charset = StandardCharsets.UTF_8;
-        }
-        try (Reader reader = new InputStreamReader(entity.getContent(), charset)) {
-            @SuppressWarnings("unchecked") T obj = (T) objectReader.readValue(reader);
-            return obj;
+
+
+        } catch (InterruptedException | ExecutionException e1) {
+            throw new IOException(e1);
         }
     }
 }


[pulsar] 04/25: [Issue 7489] Remove timestamp from metrics (#7539)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 602735e8496ea93a68553dccdd876425fa42dd3a
Author: Sergii Zhevzhyk <vz...@users.noreply.github.com>
AuthorDate: Sat Jul 25 16:33:08 2020 +0200

    [Issue 7489] Remove timestamp from metrics (#7539)
    
    * [Issue 7489] Remove timestamp from exception metrics for functions and connectors
    
    * [Issue 7489] Remove timestamp from exception metrics for Go functions
    
    * [Issue 7489] Remove unused import in go stats
    
    * [Issue 7489] Remove timestamp from metrics in python stats
    
    * Change to v2 of go github actions
    
    * Update go github actions
    
    * Remove the version from the go test command
    
    * Rename github jobs for go
    
    Co-authored-by: Matteo Merli <mm...@splunk.com>
    (cherry picked from commit 777ed1651221449da61c3e3cb8a7eb8157a8239f)
---
 .github/workflows/ci-go-functions-style.yaml             |  8 +++++---
 .github/workflows/ci-go-functions-test.yaml              |  6 +++---
 pulsar-function-go/pf/stats.go                           | 15 +++++++--------
 .../functions/instance/stats/ComponentStatsManager.java  |  3 +--
 .../functions/instance/stats/FunctionStatsManager.java   | 11 +++++------
 .../functions/instance/stats/SinkStatsManager.java       | 11 +++++------
 .../functions/instance/stats/SourceStatsManager.java     | 11 +++++------
 .../instance/src/main/python/function_stats.py           | 16 ++++++++--------
 8 files changed, 39 insertions(+), 42 deletions(-)

diff --git a/.github/workflows/ci-go-functions-style.yaml b/.github/workflows/ci-go-functions-style.yaml
index 628e437..bea77c7 100644
--- a/.github/workflows/ci-go-functions-style.yaml
+++ b/.github/workflows/ci-go-functions-style.yaml
@@ -27,12 +27,14 @@ on:
       - 'pulsar-function-go/**'
 
 jobs:
-  build:
-    name: Build
+  check-style:
+
+    name: Go ${{ matrix.go-version }} Functions style check
     runs-on: ubuntu-latest
     strategy:
       matrix:
         go-version: [1.11, 1.12, 1.13, 1.14]
+
     steps:
       - name: Check out code into the Go module directory
         uses: actions/checkout@v2
@@ -47,7 +49,7 @@ jobs:
           args: site2 .github deployment .asf.yaml .ci ct.yaml
 
       - name: Set up Go
-        uses: actions/setup-go@v1
+        uses: actions/setup-go@v2
         if: steps.docs.outputs.changed_only == 'no'
         with:
           go-version: ${{ matrix.go-version }}
diff --git a/.github/workflows/ci-go-functions-test.yaml b/.github/workflows/ci-go-functions-test.yaml
index 5b6f942..8fb7d95 100644
--- a/.github/workflows/ci-go-functions-test.yaml
+++ b/.github/workflows/ci-go-functions-test.yaml
@@ -37,7 +37,7 @@ jobs:
     timeout-minutes: 120
 
     steps:
-      - name: checkout
+      - name: Check out code into the Go module directory
         uses: actions/checkout@v2
         with:
           fetch-depth: 0
@@ -50,13 +50,13 @@ jobs:
           args: site2 .github deployment .asf.yaml .ci ct.yaml
 
       - name: Set up Go
-        uses: actions/setup-go@v1
+        uses: actions/setup-go@v2
         if: steps.docs.outputs.changed_only == 'no'
         with:
           go-version: ${{ matrix.go-version }}
         id: go
 
-      - name: run tests
+      - name: Run tests
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           cd pulsar-function-go
diff --git a/pulsar-function-go/pf/stats.go b/pulsar-function-go/pf/stats.go
index 2d8f15b..f7952fb 100644
--- a/pulsar-function-go/pf/stats.go
+++ b/pulsar-function-go/pf/stats.go
@@ -20,7 +20,6 @@
 package pf
 
 import (
-	"strconv"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -30,7 +29,7 @@ import (
 
 var (
 	metricsLabelNames          = []string{"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"}
-	exceptionLabelNames        = []string{"error", "ts"}
+	exceptionLabelNames        = []string{"error"}
 	exceptionMetricsLabelNames = append(metricsLabelNames, exceptionLabelNames...)
 )
 
@@ -254,12 +253,12 @@ func (stat *StatWithLabelValues) addUserException(err error) {
 		stat.latestUserException = stat.latestUserException[1:]
 	}
 	// report exception via prometheus
-	stat.reportUserExceptionPrometheus(err, ts)
+	stat.reportUserExceptionPrometheus(err)
 }
 
 //@limits(calls=5, period=60)
-func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error, ts int64) {
-	errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
+func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error) {
+	errorTs := []string{exception.Error()}
 	exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
 	userExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
 }
@@ -284,12 +283,12 @@ func (stat *StatWithLabelValues) addSysException(exception error) {
 		stat.latestSysException = stat.latestSysException[1:]
 	}
 	// report exception via prometheus
-	stat.reportSystemExceptionPrometheus(exception, ts)
+	stat.reportSystemExceptionPrometheus(exception)
 }
 
 //@limits(calls=5, period=60)
-func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error, ts int64) {
-	errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
+func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error) {
+	errorTs := []string{exception.Error()}
 	exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
 	systemExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index daa51b7..cbdcc0f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -49,9 +49,8 @@ public abstract class ComponentStatsManager implements AutoCloseable {
     protected static final String[] exceptionMetricsLabelNames;
 
     static {
-        exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 2);
+        exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 1);
         exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
-        exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
     }
 
     public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index fdedb74..f02b850 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -243,7 +243,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
 
         // report exception throw prometheus
         if (userExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             userExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
@@ -255,15 +255,14 @@ public class FunctionStatsManager extends ComponentStatsManager{
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
-    private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
-        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+    private String[] getExceptionMetricsLabels(Throwable ex) {
+        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
         return exceptionMetricsLabels;
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index c913225..401aa34 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -213,7 +213,7 @@ public class SinkStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
@@ -236,15 +236,14 @@ public class SinkStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sinkExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
-    private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
-        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+    private String[] getExceptionMetricsLabels(Throwable ex) {
+        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
         return exceptionMetricsLabels;
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 0ec7352..287240c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -212,7 +212,7 @@ public class SourceStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
@@ -230,15 +230,14 @@ public class SourceStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sourceExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
-    private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
-        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+    private String[] getExceptionMetricsLabels(Throwable ex) {
+        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
         return exceptionMetricsLabels;
     }
 
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index 63089b6..dd236c5 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -29,7 +29,7 @@ from ratelimit import limits, RateLimitException
 class Stats(object):
   metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id', 'cluster', 'fqfn']
 
-  exception_metrics_label_names = metrics_label_names + ['error', 'ts']
+  exception_metrics_label_names = metrics_label_names + ['error']
 
   PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
   USER_METRIC_PREFIX = "user_metric_";
@@ -185,13 +185,13 @@ class Stats(object):
 
     # report exception via prometheus
     try:
-      self.report_user_exception_prometheus(exception, ts)
+      self.report_user_exception_prometheus(exception)
     except RateLimitException:
       pass
 
   @limits(calls=5, period=60)
-  def report_user_exception_prometheus(self, exception, ts):
-    exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
+  def report_user_exception_prometheus(self, exception):
+    exception_metric_labels = self.metrics_labels + [str(exception)]
     self.user_exceptions.labels(*exception_metric_labels).set(1.0)
 
   def add_sys_exception(self, exception):
@@ -203,13 +203,13 @@ class Stats(object):
 
     # report exception via prometheus
     try:
-      self.report_system_exception_prometheus(exception, ts)
+      self.report_system_exception_prometheus(exception)
     except RateLimitException:
       pass
 
   @limits(calls=5, period=60)
-  def report_system_exception_prometheus(self, exception, ts):
-    exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
+  def report_system_exception_prometheus(self, exception):
+    exception_metric_labels = self.metrics_labels + [str(exception)]
     self.system_exceptions.labels(*exception_metric_labels).set(1.0)
 
   def reset(self):
@@ -218,4 +218,4 @@ class Stats(object):
     self._stat_total_sys_exceptions_1min._value.set(0.0)
     self._stat_process_latency_ms_1min._sum.set(0.0)
     self._stat_process_latency_ms_1min._count.set(0.0)
-    self._stat_total_received_1min._value.set(0.0)
\ No newline at end of file
+    self._stat_total_received_1min._value.set(0.0)


[pulsar] 03/25: Fix: function BC issue introduced in 2.6 (#7528)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit af654c419d7c75028c1045dccaee1dcc4e2c26bc
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Jul 14 14:02:29 2020 -0700

    Fix: function BC issue introduced in 2.6 (#7528)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
    (cherry picked from commit eef63deeeee897d67b2f3a781c4bbf8dbbe17683)
---
 .../pulsar/functions/worker/FunctionWorkerStarter.java     |  7 ++++---
 .../org/apache/pulsar/functions/worker/WorkerUtils.java    | 14 +++++++++++---
 2 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
index 90ee762..e33787f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
@@ -51,7 +51,7 @@ public class FunctionWorkerStarter {
 
         if (workerArguments.help) {
             commander.usage();
-            System.exit(-1);
+            System.exit(1);
             return;
         }
 
@@ -66,11 +66,12 @@ public class FunctionWorkerStarter {
         try {
             worker.start();
         } catch (Throwable th) {
+            log.error("Encountered error in function worker.", th);
             worker.stop();
-            System.exit(-1);
+            Runtime.getRuntime().halt(1);
         }
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            log.info("Stopping function worker service ..");
+            log.info("Stopping function worker service...");
             worker.stop();
         }));
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 5cad320..fc4d5f8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -154,9 +154,17 @@ public final class WorkerUtils {
 
     public static URI initializeDlogNamespace(InternalConfigurationData internalConf) throws IOException {
         String zookeeperServers = internalConf.getZookeeperServers();
-        URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
-        String ledgersStoreServers = metadataServiceUri.getAuthority().replace(";", ",");
-        String ledgersRootPath = metadataServiceUri.getPath();
+        String ledgersRootPath;
+        String ledgersStoreServers;
+        // for BC purposes
+        if (internalConf.getBookkeeperMetadataServiceUri() == null) {
+            ledgersRootPath = internalConf.getLedgersRootPath();
+            ledgersStoreServers = zookeeperServers;
+        } else {
+            URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
+            ledgersStoreServers = metadataServiceUri.getAuthority().replace(";", ",");
+            ledgersRootPath = metadataServiceUri.getPath();
+        }
         BKDLConfig dlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
         DLMetadata dlMetadata = DLMetadata.create(dlConfig);
         URI dlogUri = URI.create(String.format("distributedlog://%s/pulsar/functions", zookeeperServers));


[pulsar] 02/25: Ensure the create subscription can be completed when the operation timeout happens (#7522)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 460abace09ddb12344e7c3235450accf02553689
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jul 16 09:25:49 2020 +0800

    Ensure the create subscription can be completed when the operation timeout happens (#7522)
    
    
    (cherry picked from commit 6eaf8a68d00879813ac6948a30fe5990e9193dc9)
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 35 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  6 ++--
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 06cae44..debc250 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -31,8 +31,10 @@ import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -42,6 +44,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -76,6 +79,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
     @BeforeMethod
     public void setup() throws Exception {
         super.internalSetup();
+        super.producerBaseSetup();
     }
 
     @Override
@@ -1075,4 +1079,35 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         producer2.close();
     }
 
+    @Test(timeOut = testTimeout)
+    public void testSubscriptionMustCompleteWhenOperationTimeoutOnMultipleTopics() throws PulsarClientException {
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .ioThreads(2)
+                .listenerThreads(3)
+                .operationTimeout(2, TimeUnit.MILLISECONDS) // Set this very small so the operation timeout can be triggered
+                .build();
+
+        String topic0 = "public/default/topic0";
+        String topic1 = "public/default/topic1";
+
+        for (int i = 0; i < 10; i++) {
+            try {
+                client.newConsumer(Schema.STRING)
+                        .subscriptionName("subName")
+                        .topics(Lists.<String>newArrayList(topic0, topic1))
+                        .receiverQueueSize(2)
+                        .subscriptionType(SubscriptionType.Shared)
+                        .ackTimeout(365, TimeUnit.DAYS)
+                        .ackTimeoutTickTime(36, TimeUnit.DAYS)
+                        .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                        .subscribe();
+                Thread.sleep(3000);
+            } catch (Exception ex) {
+                Assert.assertTrue(ex instanceof PulsarClientException.TimeoutException);
+            }
+        }
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index c79b437..4e2a6be 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -750,7 +750,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
         client.getPartitionedTopicMetadata(topicName)
                 .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions,
-                        createTopicIfDoesNotExist))
+                    createTopicIfDoesNotExist))
                 .exceptionally(ex1 -> {
                     log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
                     subscribeResult.completeExceptionally(ex1);
@@ -904,12 +904,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     // handling failure during subscribe new topic, unsubscribe success created partitions
     private void handleSubscribeOneTopicError(String topicName, Throwable error, CompletableFuture<Void> subscribeFuture) {
         log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage());
-
         client.externalExecutorProvider().getExecutor().submit(() -> {
             AtomicInteger toCloseNum = new AtomicInteger(0);
             consumers.values().stream().filter(consumer1 -> {
                 String consumerTopicName = consumer1.getTopic();
-                if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
+                if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(TopicName.get(topicName).getPartitionedTopicName())) {
                     toCloseNum.incrementAndGet();
                     return true;
                 } else {
@@ -924,7 +923,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}",
                             topic, topicName, error.getMessage());
                         topics.remove(topicName);
-                        checkState(allTopicPartitionsNumber.get() == consumers.values().size());
                         subscribeFuture.completeExceptionally(error);
                     }
                     return;


[pulsar] 07/25: [CPP] Fix segment crashes that caused by race condition of timer in cpp client (#7572)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bcb90d2751203301e89a5adbf2a29d71a9073cfa
Author: Isaiah Rairdon <ir...@gmail.com>
AuthorDate: Tue Jul 28 08:22:08 2020 -0600

    [CPP] Fix segment crashes that caused by race condition of timer in cpp client (#7572)
    
    * Kevin Wilson changes to fix segment crashes in pulsar
    
    * after merge with master, change following comments: ptr, unnecessary change
    
    * add lock to avoid concurrent access
    
    * Remove comments
    
    Co-authored-by: Isaiah Rairdon <is...@microfocus.com>
    Co-authored-by: Jia Zhai <zh...@apache.org>
    Co-authored-by: Sijie Guo <si...@apache.org>
    Co-authored-by: xiaolong.ran <rx...@apache.org>
    (cherry picked from commit 15d5254e49b96719638f9efec391a6beeed00bb9)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 40 +++++++++++++++++++++----------
 1 file changed, 28 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 3628dd5..ebb7268 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -256,10 +256,14 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
 
     if (serverProtocolVersion_ >= v1) {
         // Only send keep-alive probes if the broker supports it
-        DeadlineTimerPtr keepAliveTimer = executor_->createDeadlineTimer();
-        keepAliveTimer->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-        keepAliveTimer->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
-        keepAliveTimer_ = keepAliveTimer;
+        keepAliveTimer_ = executor_->createDeadlineTimer();
+        Lock lock(mutex_);
+        if (keepAliveTimer_) {
+            keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+            keepAliveTimer_->async_wait(
+                std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        }
+        lock.unlock();
     }
 
     if (serverProtocolVersion_ >= v8) {
@@ -289,13 +293,14 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
         consumerStatsRequests.push_back(it->first);
     }
 
-    DeadlineTimerPtr timer = consumerStatsRequestTimer_;
-    if (timer) {
-        timer->expires_from_now(operationsTimeout_);
-        timer->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout, shared_from_this(),
-                                    std::placeholders::_1, consumerStatsRequests));
+    // If close() has already reset consumerStatsRequestTimer_, the pointer is empty (null).
+    // Check that we still have a timer before re-arming it for the next stats timeout.
+    if (consumerStatsRequestTimer_) {
+        consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
+        consumerStatsRequestTimer_->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout,
+                                                         shared_from_this(), std::placeholders::_1,
+                                                         consumerStatsRequests));
     }
-
     lock.unlock();
     // Complex logic since promises need to be fulfilled outside the lock
     for (int i = 0; i < consumerStatsPromises.size(); i++) {
@@ -1344,8 +1349,15 @@ void ClientConnection::handleKeepAliveTimeout() {
         havePendingPingRequest_ = true;
         sendCommand(Commands::newPing());
 
-        keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-        keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        // If close() has already called keepAliveTimer_.reset(), the pointer is empty (its use_count
+        // is zero), so we must not attempt to dereference it.
+        Lock lock(mutex_);
+        if (keepAliveTimer_) {
+            keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+            keepAliveTimer_->async_wait(
+                std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        }
+        lock.unlock();
     }
 }
 
@@ -1375,13 +1387,17 @@ void ClientConnection::close() {
     LOG_INFO(cnxString_ << "Connection closed");
 
     if (keepAliveTimer_) {
+        lock.lock();
         keepAliveTimer_->cancel();
         keepAliveTimer_.reset();
+        lock.unlock();
     }
 
     if (consumerStatsRequestTimer_) {
+        lock.lock();
         consumerStatsRequestTimer_->cancel();
         consumerStatsRequestTimer_.reset();
+        lock.unlock();
     }
 
     for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {


[pulsar] 19/25: Fix backward compatibility issues with batch index acknowledgment. (#7655)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 12cca8a5c9cef45eba0cd72c3df9d30c95931af1
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 28 23:49:42 2020 +0800

    Fix backward compatibility issues with batch index acknowledgment. (#7655)
    
    ### Motivation
    
    Fix backward compatibility issues with batch index acknowledgment.
    
    ### Modifications
    
    Disable batch index acknowledgment by default at the consumer side.
    
    (cherry picked from commit fffd9f144bb14a220d17e951fea29b16ad2db103)
---
 .../client/impl/BatchMessageIndexAckTest.java      |  3 ++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  6 +++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  6 +++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  8 ++--
 .../impl/conf/ConsumerConfigurationData.java       |  2 +
 ...t2_2.java => PulsarStandaloneTestSuite2_5.java} | 26 +++++++-----
 .../backwardscompatibility/SmokeTest2_2.java       |  4 ++
 .../backwardscompatibility/SmokeTest2_3.java       |  4 ++
 .../backwardscompatibility/SmokeTest2_4.java       |  4 ++
 .../{SmokeTest2_2.java => SmokeTest2_5.java}       |  6 ++-
 .../integration/containers/PulsarContainer.java    |  1 +
 .../integration/topologies/PulsarTestBase.java     | 46 ++++++++++++++++++++++
 12 files changed, 102 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 3150f10..582d461 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -67,6 +67,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
             .subscriptionName("sub")
             .receiverQueueSize(100)
             .subscriptionType(SubscriptionType.Shared)
+            .enableBatchIndexAcknowledgment(true)
             .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS)
             .subscribe();
 
@@ -125,6 +126,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
             .topic(topic)
             .subscriptionName("sub")
             .receiverQueueSize(100)
+            .enableBatchIndexAcknowledgment(true)
             .subscribe();
 
         @Cleanup
@@ -194,6 +196,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS)
                 .topic(topic)
+                .enableBatchIndexAcknowledgment(true)
                 .subscriptionName("test")
                 .subscribe();
 
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index def1807..9bb8ce0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -610,6 +610,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> enableRetry(boolean retryEnable);
 
     /**
+     * Enable or disable batch index acknowledgment. To enable this feature, ensure that batch index
+     * acknowledgment is also enabled on the broker side.
+     */
+    ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgmentEnabled);
+
+    /**
      * Consumer buffers chunk messages into memory until it receives all the chunks of the original message. While
      * consuming chunk-messages, chunks from same message might not be contiguous in the stream and they might be mixed
      * with other messages' chunks. so, consumer has to maintain multiple buffers to manage chunks coming from different
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 9240ab3..64eadfe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -416,6 +416,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgmentEnabled) {
+        conf.setBatchIndexAckEnabled(batchIndexAcknowledgmentEnabled);
+        return this;
+    }
+
+    @Override
     public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
         conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
         return null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 10320da..28995ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -543,9 +543,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             ackType);
                 }
             } else {
-                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-                acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
-                    batchMessageId.getBatchSize(), ackType, properties);
+                if (conf.isBatchIndexAckEnabled()) {
+                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+                    acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
+                            batchMessageId.getBatchSize(), ackType, properties);
+                }
                 // other messages in batch are still pending ack.
                 return CompletableFuture.completedFuture(null);
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index eb82703..14595e6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -125,6 +125,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private KeySharedPolicy keySharedPolicy;
 
+    private boolean batchIndexAckEnabled = false;
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java
similarity index 56%
copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java
index 7c1c2a1..cf88ca2 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java
@@ -16,21 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pulsar.tests.integration.backwardscompatibility;
 
-import org.testng.annotations.Test;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase;
+import org.testng.ITest;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
 
-public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
+public class PulsarStandaloneTestSuite2_5 extends PulsarStandaloneTestBase implements ITest {
 
-    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
-    public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
-        super.testPublishAndConsume(serviceUrl, isPersistent);
+    @BeforeSuite
+    public void setUpCluster() throws Exception {
+        super.startCluster(PulsarContainer.PULSAR_2_5_IMAGE_NAME);
     }
 
-    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
-    public void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
-        super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
+    @AfterSuite
+    public void tearDownCluster() throws Exception {
+        super.stopCluster();
+    }
+    @Override
+    public String getTestName() {
+        return "pulsar-standalone-suite";
     }
-
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
index 7c1c2a1..20e9926 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
@@ -33,4 +33,8 @@ public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
         super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
     }
 
+    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+    public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception {
+        super.testBatchIndexAckDisabled(serviceUrl);
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
index ab317d0..e1b37e3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
@@ -33,4 +33,8 @@ public class SmokeTest2_3 extends PulsarStandaloneTestSuite2_3 {
         super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
     }
 
+    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+    public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception {
+        super.testBatchIndexAckDisabled(serviceUrl);
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
index d74ad8e..eb77eaa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
@@ -33,4 +33,8 @@ public class SmokeTest2_4 extends PulsarStandaloneTestSuite2_4 {
         super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
     }
 
+    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+    public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception {
+        super.testBatchIndexAckDisabled(serviceUrl);
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java
similarity index 83%
copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java
index 7c1c2a1..2bcf584 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.tests.integration.backwardscompatibility;
 
 import org.testng.annotations.Test;
 
-public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
+public class SmokeTest2_5 extends PulsarStandaloneTestSuite2_5 {
 
     @Test(dataProvider = "StandaloneServiceUrlAndTopics")
     public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
@@ -33,4 +33,8 @@ public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
         super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
     }
 
+    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+    public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception {
+        super.testBatchIndexAckDisabled(serviceUrl);
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 15c99cc..ae5e57a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -41,6 +41,7 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte
     public static final int BROKER_HTTP_PORT = 8080;
 
     public static final String DEFAULT_IMAGE_NAME = "apachepulsar/pulsar-test-latest-version:latest";
+    public static final String PULSAR_2_5_IMAGE_NAME = "apachepulsar/pulsar:2.5.0";
     public static final String PULSAR_2_4_IMAGE_NAME = "apachepulsar/pulsar:2.4.0";
     public static final String PULSAR_2_3_IMAGE_NAME = "apachepulsar/pulsar:2.3.0";
     public static final String PULSAR_2_2_IMAGE_NAME = "apachepulsar/pulsar:2.2.0";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
index 9e75e95..da58713 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
@@ -24,13 +24,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.junit.Assert;
 
 public class PulsarTestBase {
 
@@ -130,4 +134,46 @@ public class PulsarTestBase {
         }
     }
 
+    public void testBatchIndexAckDisabled(String serviceUrl) throws Exception {
+        String topicName = generateTopicName("test-batch-index-ack-disabled", true);
+        final int numMessages = 100;
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(serviceUrl)
+                .build()) {
+
+            try (Consumer<Integer> consumer = client.newConsumer(Schema.INT32)
+                    .topic(topicName)
+                    .subscriptionName("sub")
+                    .receiverQueueSize(100)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .enableBatchIndexAcknowledgment(false)
+                    .ackTimeout(1, TimeUnit.SECONDS)
+                    .subscribe();) {
+
+                try (Producer<Integer> producer = client.newProducer(Schema.INT32)
+                        .topic(topicName)
+                        .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
+                        .create()) {
+
+                    List<CompletableFuture<MessageId>> futures = new ArrayList<>();
+                    for (int i = 0; i < numMessages; i++) {
+                        futures.add(producer.sendAsync(i));
+                    }
+                    // Wait until all messages have been published successfully.
+                    FutureUtil.waitForAll(futures).get();
+                }
+
+                for (int i = 0; i < numMessages; i++) {
+                    Message<Integer> m = consumer.receive();
+                    if (i % 2 == 0) {
+                        consumer.acknowledge(m);
+                    }
+                }
+
+                Message<Integer> redelivery = consumer.receive(3, TimeUnit.SECONDS);
+                Assert.assertNotNull(redelivery);
+            }
+        }
+    }
+
 }


[pulsar] 17/25: fix NPE when using advertisedListeners (#7620)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 53ee16dce2ef5e5011ef1e676be4eba635d8706e
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue Jul 28 16:17:24 2020 +0800

    fix NPE when using advertisedListeners (#7620)
    
    ### Modifications
    
    Correct `BrokerServiceUrlTls` when `tls` is not enabled.
    
    (cherry picked from commit fcb0dc04b3262249c1a8f7e7964ff8f0487f0acc)
---
 .../apache/pulsar/broker/validator/MultipleListenerValidator.java | 8 ++------
 .../java/org/apache/pulsar/broker/namespace/NamespaceService.java | 6 ++++--
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
index 9f44da8..f97f4aa 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
@@ -52,7 +52,7 @@ public final class MultipleListenerValidator {
             throw new IllegalArgumentException("`advertisedListeners` and `advertisedAddress` must not appear together");
         }
         if (StringUtils.isBlank(config.getAdvertisedListeners())) {
-            return Collections.EMPTY_MAP;
+            return Collections.emptyMap();
         }
         Optional<String> firstListenerName = Optional.empty();
         Map<String, List<String>> listeners = Maps.newHashMap();
@@ -101,11 +101,7 @@ public final class MultipleListenerValidator {
                     }
                     String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort());
                     reverseMappings.computeIfAbsent(hostPort, k -> Sets.newTreeSet());
-                    Set<String> sets = reverseMappings.get(hostPort);
-                    if (sets == null) {
-                        sets = Sets.newTreeSet();
-                        reverseMappings.put(hostPort, sets);
-                    }
+                    Set<String> sets = reverseMappings.computeIfAbsent(hostPort, k -> Sets.newTreeSet());
                     sets.add(entry.getKey());
                     if (sets.size() > 1) {
                         throw new IllegalArgumentException("must not specify `" + hostPort + "` to different listener.");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 0fa4957..b50e769 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -395,8 +395,9 @@ public class NamespaceService {
                             future.completeExceptionally(
                                     new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
                         } else {
+                            URI urlTls = listener.getBrokerServiceUrlTls();
                             future.complete(Optional.of(new LookupResult(nsData.get(),
-                                    listener.getBrokerServiceUrl().toString(), listener.getBrokerServiceUrlTls().toString())));
+                                    listener.getBrokerServiceUrl().toString(), urlTls == null ? null : urlTls.toString())));
                         }
                         return;
                     } else {
@@ -496,8 +497,9 @@ public class NamespaceService {
                                         new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
                                 return;
                             } else {
+                                URI urlTls = listener.getBrokerServiceUrlTls();
                                 lookupFuture.complete(Optional.of(new LookupResult(ownerInfo, listener.getBrokerServiceUrl().toString(),
-                                        listener.getBrokerServiceUrlTls().toString())));
+                                        urlTls == null ? null : urlTls.toString())));
                                 return;
                             }
                         } else {


[pulsar] 06/25: Add more logging to the auth operations on failure (#7567)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit df3cd424887f3b6c2deed3b4867a4c0a9f6fa7bd
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Jul 16 20:51:21 2020 -0700

    Add more logging to the auth operations on failure (#7567)
    
    * Added upgrade notes
    
    * Add more logs on failure of auth operations
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
    (cherry picked from commit 2a0cb69e865d650d3eaf5e18329272d38372722a)
---
 .../broker/authorization/AuthorizationProvider.java       | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 4eb5d93..0424c00 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -238,7 +238,10 @@ public interface AuthorizationProvider extends Closeable {
                                                                  String role, NamespaceOperation operation,
                                                                  AuthenticationDataSource authData) {
         return FutureUtil.failedFuture(
-            new IllegalStateException("NamespaceOperation is not supported by the Authorization provider you are using."));
+            new IllegalStateException(
+                    String.format("NamespaceOperation(%s) on namespace(%s) by role(%s) is not supported" +
+                    " by the Authorization provider you are using.",
+                            operation.toString(), namespaceName.toString(), role == null ? "null" : role)));
     }
 
     default Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role,
@@ -265,7 +268,10 @@ public interface AuthorizationProvider extends Closeable {
                                                                           PolicyOperation operation, String originalRole,
                                                                           String role, AuthenticationDataSource authData) {
         return FutureUtil.failedFuture(
-                new IllegalStateException("NamespacePolicyOperation is not supported by the Authorization provider you are using."));
+                new IllegalStateException(
+                        String.format("NamespacePolicyOperation(%s) on namespace(%s) by role(%s) is not supported" +
+                                " by the Authorization provider you are using.", operation.toString(),
+                                namespaceName.toString(), role == null ? "null" : role)));
     }
 
     default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation,
@@ -293,7 +299,10 @@ public interface AuthorizationProvider extends Closeable {
                                                              TopicOperation operation,
                                                              AuthenticationDataSource authData) {
         return FutureUtil.failedFuture(
-            new IllegalStateException("TopicOperation is not supported by the Authorization provider you are using."));
+            new IllegalStateException(
+                    String.format("TopicOperation(%s) on topic(%s) by role(%s) is not supported" +
+                            " by the Authorization provider you are using.",
+                            operation.toString(), topic.toString(), role == null ? "null" : role)));
     }
 
     default Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation,


[pulsar] 25/25: [doc] add cpp client document for oauth2 authentication

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a1f56e38849d79e167807b8f205a1cb649d30f79
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Fri Jul 24 08:37:03 2020 +0800

    [doc] add cpp client document for oauth2 authentication
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 site2/docs/client-libraries-java.md                |  4 +--
 .../docs/{security-oauth.md => security-oauth2.md} | 30 +++++++++++++++++++---
 site2/website/sidebars.json                        |  2 +-
 3 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index bbcf8f2..da6fbc0 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -781,7 +781,7 @@ The following schema formats are currently available for Java:
 
 ## Authentication
 
-Pulsar currently supports three authentication schemes: [TLS](security-tls-authentication.md), [Athenz](security-athenz.md), and [Oauth2](security-oauth.md). You can use the Pulsar Java client with all of them.
+Pulsar currently supports three authentication schemes: [TLS](security-tls-authentication.md), [Athenz](security-athenz.md), and [Oauth2](security-oauth2.md). You can use the Pulsar Java client with all of them.
 
 ### TLS Authentication
 
@@ -843,7 +843,7 @@ PulsarClient client = PulsarClient.builder()
 
 ### Oauth2
 
-The following example shows how to use [Oauth2](security-oauth.md) as an authentication provider for the Pulsar Java client.
+The following example shows how to use [Oauth2](security-oauth2.md) as an authentication provider for the Pulsar Java client.
 
 You can use the factory method to configure authentication for Pulsar Java client.
 
diff --git a/site2/docs/security-oauth.md b/site2/docs/security-oauth2.md
similarity index 81%
rename from site2/docs/security-oauth.md
rename to site2/docs/security-oauth2.md
index b399dea..c847112 100644
--- a/site2/docs/security-oauth.md
+++ b/site2/docs/security-oauth2.md
@@ -1,5 +1,5 @@
 ---
-id: security-oauth
+id: security-oauth2
 title: Client authentication using OAuth 2.0 access tokens
 sidebar_label: Authentication using OAuth 2.0 access tokens
 ---
@@ -74,10 +74,14 @@ You can use the Oauth2 authentication provider with the following Pulsar clients
 You can use the factory method to configure authentication for Pulsar Java client.
 
 ```java
+String issuerUrl = "https://dev-kt-aa9ne.us.auth0.com/oauth/token";
+String credentialsUrl = "file:///path/to/KeyFile.json";
+String audience = "https://dev-kt-aa9ne.us.auth0.com/api/v2/";
+
 PulsarClient client = PulsarClient.builder()
     .serviceUrl("pulsar://broker.example.com:6650/")
     .authentication(
-        AuthenticationFactoryOAuth2.clientCredentials(this.issuerUrl, this.credentialsUrl, this.audience))
+        AuthenticationFactoryOAuth2.clientCredentials(issuerUrl, credentialsUrl, audience))
     .build();
 ```
 
@@ -85,9 +89,27 @@ In addition, you can also use the encoded parameters to configure authentication
 
 ```java
 Authentication auth = AuthenticationFactory
-    .create(AuthenticationOAuth2.class.getName(), "{"type":"client_credentials","privateKey":"...","issuerUrl":"...","audience":"..."}");
+    .create(AuthenticationOAuth2.class.getName(), "{\"type\":\"client_credentials\",\"privateKey\":\"./key/path/..\",\"issuerUrl\":\"...\",\"audience\":\"...\"}");
 PulsarClient client = PulsarClient.builder()
     .serviceUrl("pulsar://broker.example.com:6650/")
     .authentication(auth)
     .build();
-```
\ No newline at end of file
+```
+
+### C++ client
+
+The C++ client is similar to the Java client. You need to provide the `issuer_url`, `private_key` (the credentials file path), and `audience` parameters.
+
+```c++
+#include <pulsar/Client.h>
+
+pulsar::ClientConfiguration config;
+std::string params = R"({
+    "issuer_url": "https://dev-kt-aa9ne.us.auth0.com/oauth/token",
+    "private_key": "../../pulsar-broker/src/test/resources/authentication/token/cpp_credentials_file.json",
+    "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
+    
+config.setAuth(pulsar::AuthOauth2::create(params));
+
+pulsar::Client client("pulsar://broker.example.com:6650/", config);
+```
diff --git a/site2/website/sidebars.json b/site2/website/sidebars.json
index 42356fd..3c10720 100644
--- a/site2/website/sidebars.json
+++ b/site2/website/sidebars.json
@@ -82,7 +82,7 @@
       "security-jwt",
       "security-athenz",
       "security-kerberos",
-      "security-oauth"
+      "security-oauth2",
       "security-authorization",
       "security-encryption",
       "security-extending",


[pulsar] 05/25: [PROTOBUF] Fix protobuf generation on handling repeated long number … (#7540)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 56ea8a887178167f7a437d62a26489fb6ea8e436
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jul 16 05:53:30 2020 -0700

    [PROTOBUF] Fix protobuf generation on handling repeated long number … (#7540)
    
    *Motivation*
    
    The code generation for `repeated long` is not handled properly. (I am not sure how changes were made to PulsarApi.proto)
    
    *Modification*
    
    This pull request adds the code to handle generating code for `repeated long`.
    
    *Test*
    
    Add unit test to ensure `repeated long` is processed. Add test cases to cover both packed and non-packed serialization for `repeated long`.
    
    See more details about packed serialization: https://developers.google.com/protocol-buffers/docs/encoding#optional
    
    (cherry picked from commit 4e358ef9f3fa8c6164286d6e80f6e75f66c31eab)
---
 pom.xml                                            |   2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  18 +
 .../util/protobuf/ByteBufCodedInputStream.java     |  30 +
 .../apache/pulsar/common/api/proto/TestApi.java    | 641 +++++++++++++++++++++
 .../common/protocol/RepeatedLongNonPackedTest.java |  65 +++
 .../common/protocol/RepeatedLongPackedTest.java    |  65 +++
 pulsar-common/src/test/proto/TestApi.proto         |  32 +
 7 files changed, 853 insertions(+)

diff --git a/pom.xml b/pom.xml
index 822dd65..246edd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1272,6 +1272,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/*.java</exclude>
+            <exclude>src/test/java/org/apache/pulsar/common/api/proto/*.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
@@ -1347,6 +1348,7 @@ flexible messaging model and an intuitive client API.</description>
                  and are included in source tree for convenience -->
             <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
+            <exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             <exclude>bin/proto/MLDataFormats_pb2.py</exclude>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 4d1babf..2f822e8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -1578,6 +1578,15 @@ public final class PulsarApi {
               ackSet_.add(input.readInt64());
               break;
             }
+            case 42: {
+              int length = input.readRawVarint32();
+              int limit = input.pushLimit(length);
+              while (input.getBytesUntilLimit() > 0) {
+                addAckSet(input.readInt64());
+              }
+              input.popLimit(limit);
+              break;
+            }
           }
         }
       }
@@ -18860,6 +18869,15 @@ public final class PulsarApi {
               ackSet_.add(input.readInt64());
               break;
             }
+            case 34: {
+              int length = input.readRawVarint32();
+              int limit = input.pushLimit(length);
+              while (input.getBytesUntilLimit() > 0) {
+                addAckSet(input.readInt64());
+              }
+              input.popLimit(limit);
+              break;
+            }
           }
         }
       }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
index e4dad0c..caac6d6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
@@ -346,4 +346,34 @@ public class ByteBufCodedInputStream {
 
         buf.readerIndex(buf.readerIndex() + size);
     }
+
+    public int pushLimit(int byteLimit) throws InvalidProtocolBufferException {
+        if (byteLimit < 0) {
+            throw new InvalidProtocolBufferException("CodedInputStream encountered an embedded string or message"
+                + " which claimed to have negative size.");
+        }
+
+        byteLimit += buf.readerIndex();
+        final int oldLimit = buf.writerIndex();
+        if (byteLimit > oldLimit) {
+            throw new InvalidProtocolBufferException("While parsing a protocol message, the input ended unexpectedly"
+                + " in the middle of a field.  This could mean either than the input has been truncated or that an"
+                + " embedded message misreported its own length.");
+        }
+        buf.writerIndex(byteLimit);
+        return oldLimit;
+    }
+
+    /**
+     * Discards the current limit, returning to the previous limit.
+     *
+     * @param oldLimit The old limit, as returned by {@code pushLimit}.
+     */
+    public void popLimit(final int oldLimit) {
+        buf.writerIndex(oldLimit);
+    }
+
+    public int getBytesUntilLimit() {
+        return buf.readableBytes();
+    }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java
new file mode 100644
index 0000000..f165a94
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java
@@ -0,0 +1,641 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: src/test/proto/TestApi.proto
+
+package org.apache.pulsar.common.api.proto;
+
+public final class TestApi {
+  private TestApi() {}
+  public static void registerAllExtensions(
+      org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite registry) {
+  }
+  public interface MessageIdDataOrBuilder
+      extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    
+    // required uint64 ledgerId = 1;
+    boolean hasLedgerId();
+    long getLedgerId();
+    
+    // required uint64 entryId = 2;
+    boolean hasEntryId();
+    long getEntryId();
+    
+    // optional int32 partition = 3 [default = -1];
+    boolean hasPartition();
+    int getPartition();
+    
+    // optional int32 batch_index = 4 [default = -1];
+    boolean hasBatchIndex();
+    int getBatchIndex();
+    
+    // repeated int64 ack_set = 5 [packed = true];
+    java.util.List<java.lang.Long> getAckSetList();
+    int getAckSetCount();
+    long getAckSet(int index);
+  }
+  public static final class MessageIdData extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use MessageIdData.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle;
+    private MessageIdData(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<MessageIdData> RECYCLER = new io.netty.util.Recycler<MessageIdData>() {
+            protected MessageIdData newObject(Handle handle) {
+              return new MessageIdData(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         
+    private MessageIdData(boolean noInit) {}
+    
+    private static final MessageIdData defaultInstance;
+    public static MessageIdData getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public MessageIdData getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 ledgerId = 1;
+    public static final int LEDGERID_FIELD_NUMBER = 1;
+    private long ledgerId_;
+    public boolean hasLedgerId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getLedgerId() {
+      return ledgerId_;
+    }
+    
+    // required uint64 entryId = 2;
+    public static final int ENTRYID_FIELD_NUMBER = 2;
+    private long entryId_;
+    public boolean hasEntryId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getEntryId() {
+      return entryId_;
+    }
+    
+    // optional int32 partition = 3 [default = -1];
+    public static final int PARTITION_FIELD_NUMBER = 3;
+    private int partition_;
+    public boolean hasPartition() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public int getPartition() {
+      return partition_;
+    }
+    
+    // optional int32 batch_index = 4 [default = -1];
+    public static final int BATCH_INDEX_FIELD_NUMBER = 4;
+    private int batchIndex_;
+    public boolean hasBatchIndex() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public int getBatchIndex() {
+      return batchIndex_;
+    }
+    
+    // repeated int64 ack_set = 5 [packed = true];
+    public static final int ACK_SET_FIELD_NUMBER = 5;
+    private java.util.List<java.lang.Long> ackSet_;
+    public java.util.List<java.lang.Long>
+        getAckSetList() {
+      return ackSet_;
+    }
+    public int getAckSetCount() {
+      return ackSet_.size();
+    }
+    public long getAckSet(int index) {
+      return ackSet_.get(index);
+    }
+    private int ackSetMemoizedSerializedSize = -1;
+    
+    private void initFields() {
+      ledgerId_ = 0L;
+      entryId_ = 0L;
+      partition_ = -1;
+      batchIndex_ = -1;
+      ackSet_ = java.util.Collections.emptyList();;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasLedgerId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasEntryId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, ledgerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt64(2, entryId_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt32(3, partition_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt32(4, batchIndex_);
+      }
+      if (getAckSetList().size() > 0) {
+        output.writeRawVarint32(42);
+        output.writeRawVarint32(ackSetMemoizedSerializedSize);
+      }
+      for (int i = 0; i < ackSet_.size(); i++) {
+        output.writeInt64NoTag(ackSet_.get(i));
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(1, ledgerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(2, entryId_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(3, partition_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(4, batchIndex_);
+      }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < ackSet_.size(); i++) {
+          dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+            .computeInt64SizeNoTag(ackSet_.get(i));
+        }
+        size += dataSize;
+        if (!getAckSetList().isEmpty()) {
+          size += 1;
+          size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+              .computeInt32SizeNoTag(dataSize);
+        }
+        ackSetMemoizedSerializedSize = dataSize;
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(byte[] data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        byte[] data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseDelimitedFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.TestApi.MessageIdData prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.TestApi.MessageIdData, Builder>
+        implements org.apache.pulsar.common.api.proto.TestApi.MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.TestApi.MessageIdData.newBuilder()
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        ledgerId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        entryId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        partition_ = -1;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        batchIndex_ = -1;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        ackSet_ = java.util.Collections.emptyList();;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.TestApi.MessageIdData getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.TestApi.MessageIdData.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.TestApi.MessageIdData build() {
+        org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.TestApi.MessageIdData buildParsed()
+          throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.TestApi.MessageIdData buildPartial() {
+        org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = org.apache.pulsar.common.api.proto.TestApi.MessageIdData.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.ledgerId_ = ledgerId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.entryId_ = entryId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.partition_ = partition_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.batchIndex_ = batchIndex_;
+        if (((bitField0_ & 0x00000010) == 0x00000010)) {
+          ackSet_ = java.util.Collections.unmodifiableList(ackSet_);
+          bitField0_ = (bitField0_ & ~0x00000010);
+        }
+        result.ackSet_ = ackSet_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.TestApi.MessageIdData other) { // overlays the fields set on other onto this builder
+        if (other == org.apache.pulsar.common.api.proto.TestApi.MessageIdData.getDefaultInstance()) return this; // merging the default instance is a no-op
+        if (other.hasLedgerId()) {
+          setLedgerId(other.getLedgerId());
+        }
+        if (other.hasEntryId()) {
+          setEntryId(other.getEntryId());
+        }
+        if (other.hasPartition()) {
+          setPartition(other.getPartition());
+        }
+        if (other.hasBatchIndex()) {
+          setBatchIndex(other.getBatchIndex());
+        }
+        if (!other.ackSet_.isEmpty()) {
+          if (ackSet_.isEmpty()) {
+            ackSet_ = other.ackSet_; // nothing to append to: adopt other's list by reference instead of copying
+            bitField0_ = (bitField0_ & ~0x00000010); // mark it as not owned so a later write copies it first
+          } else {
+            ensureAckSetIsMutable();
+            ackSet_.addAll(other.ackSet_); // repeated fields are concatenated, not replaced
+          }
+          
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasLedgerId()) {
+          
+          return false;
+        }
+        if (!hasEntryId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+                              org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom( // parses wire-format fields from the ByteBuf-backed stream into this builder
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) // not read anywhere in this method
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag(); // protobuf tag = (field number << 3) | wire type
+          switch (tag) {
+            case 0: // tag 0 ends parsing (presumably signals end of input — confirm in ByteBufCodedInputStream)
+              
+              return this;
+            default: { // any tag not listed below: skip its payload
+              if (!input.skipField(tag)) { // a false result from skipField also ends parsing
+                
+                return this;
+              }
+              break;
+            }
+            case 8: { // field 1 (ledgerId), wire type 0 (varint)
+              bitField0_ |= 0x00000001;
+              ledgerId_ = input.readUInt64();
+              break;
+            }
+            case 16: { // field 2 (entryId), varint
+              bitField0_ |= 0x00000002;
+              entryId_ = input.readUInt64();
+              break;
+            }
+            case 24: { // field 3 (partition), varint
+              bitField0_ |= 0x00000004;
+              partition_ = input.readInt32();
+              break;
+            }
+            case 32: { // field 4 (batch_index), varint
+              bitField0_ |= 0x00000008;
+              batchIndex_ = input.readInt32();
+              break;
+            }
+            case 40: { // field 5 (ack_set), wire type 0: unpacked encoding, one element per tag
+              ensureAckSetIsMutable();
+              ackSet_.add(input.readInt64());
+              break;
+            }
+            case 42: { // field 5 (ack_set), wire type 2: packed encoding, a length-prefixed run of varints
+              int length = input.readRawVarint32(); // byte length of the packed run
+              int limit = input.pushLimit(length); // confine reads to the packed run
+              while (input.getBytesUntilLimit() > 0) {
+                addAckSet(input.readInt64());
+              }
+              input.popLimit(limit); // restore the limit that was in force before the run
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 ledgerId = 1;
+      private long ledgerId_ ;
+      public boolean hasLedgerId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getLedgerId() {
+        return ledgerId_;
+      }
+      public Builder setLedgerId(long value) {
+        bitField0_ |= 0x00000001;
+        ledgerId_ = value;
+        
+        return this;
+      }
+      public Builder clearLedgerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        ledgerId_ = 0L;
+        
+        return this;
+      }
+      
+      // required uint64 entryId = 2;
+      private long entryId_ ;
+      public boolean hasEntryId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getEntryId() {
+        return entryId_;
+      }
+      public Builder setEntryId(long value) {
+        bitField0_ |= 0x00000002;
+        entryId_ = value;
+        
+        return this;
+      }
+      public Builder clearEntryId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        entryId_ = 0L;
+        
+        return this;
+      }
+      
+      // optional int32 partition = 3 [default = -1];
+      private int partition_ = -1;
+      public boolean hasPartition() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public int getPartition() {
+        return partition_;
+      }
+      public Builder setPartition(int value) {
+        bitField0_ |= 0x00000004;
+        partition_ = value;
+        
+        return this;
+      }
+      public Builder clearPartition() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        partition_ = -1;
+        
+        return this;
+      }
+      
+      // optional int32 batch_index = 4 [default = -1];
+      private int batchIndex_ = -1;
+      public boolean hasBatchIndex() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public int getBatchIndex() {
+        return batchIndex_;
+      }
+      public Builder setBatchIndex(int value) {
+        bitField0_ |= 0x00000008;
+        batchIndex_ = value;
+        
+        return this;
+      }
+      public Builder clearBatchIndex() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        batchIndex_ = -1;
+        
+        return this;
+      }
+      
+      // repeated int64 ack_set = 5 [packed = true];
+      private java.util.List<java.lang.Long> ackSet_ = java.util.Collections.emptyList();;
+      private void ensureAckSetIsMutable() { // called before every write to ackSet_
+        if (!((bitField0_ & 0x00000010) == 0x00000010)) { // bit 0x10 clear: ackSet_ is the shared empty list or a list shared with a message
+          ackSet_ = new java.util.ArrayList<java.lang.Long>(ackSet_); // take a private, writable copy
+          bitField0_ |= 0x00000010; // remember that this builder now owns the list
+         }
+      }
+      public java.util.List<java.lang.Long>
+          getAckSetList() {
+        return java.util.Collections.unmodifiableList(ackSet_);
+      }
+      public int getAckSetCount() {
+        return ackSet_.size();
+      }
+      public long getAckSet(int index) {
+        return ackSet_.get(index);
+      }
+      public Builder setAckSet(
+          int index, long value) {
+        ensureAckSetIsMutable();
+        ackSet_.set(index, value);
+        
+        return this;
+      }
+      public Builder addAckSet(long value) {
+        ensureAckSetIsMutable();
+        ackSet_.add(value);
+        
+        return this;
+      }
+      public Builder addAllAckSet(
+          java.lang.Iterable<? extends java.lang.Long> values) {
+        ensureAckSetIsMutable();
+        super.addAll(values, ackSet_);
+        
+        return this;
+      }
+      public Builder clearAckSet() {
+        ackSet_ = java.util.Collections.emptyList();;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageIdData)
+    }
+    
+    static {
+      defaultInstance = new MessageIdData(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.MessageIdData)
+  }
+  
+  
+  static {
+  }
+  
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java
new file mode 100644
index 0000000..7c50d13
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.testng.annotations.Test;
+
+public class RepeatedLongNonPackedTest {
+    // Round-trips PulsarApi.MessageIdData (ack_set presumably declared without [packed = true] — confirm in PulsarApi.proto).
+    @Test
+    public void testRepeatedLongNonPacked() throws Exception {
+        MessageIdData messageIdData = MessageIdData.newBuilder()
+            .setLedgerId(0L)
+            .setEntryId(0L)
+            .setPartition(0)
+            .setBatchIndex(0)
+            .addAckSet(1000)
+            .addAckSet(1001)
+            .addAckSet(1003)
+            .build();
+        // Serialize into a buffer sized from getSerializedSize().
+        int cmdSize = messageIdData.getSerializedSize();
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(cmdSize);
+        ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf);
+        messageIdData.writeTo(outputStream);
+        // Recycle the writer-side objects before reading the bytes back.
+        messageIdData.recycle();
+        outputStream.recycle();
+        // Parse the serialized bytes into a fresh message.
+        ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf);
+        MessageIdData newMessageIdData = MessageIdData.newBuilder()
+            .mergeFrom(inputStream, null)
+            .build();
+        inputStream.recycle();
+        buf.release(); // parsing is finished, so the reference-counted buffer can be returned to the allocator
+        assertEquals(newMessageIdData.getAckSetCount(), 3); // TestNG order is (actual, expected)
+        assertEquals(newMessageIdData.getAckSet(0), 1000);
+        assertEquals(newMessageIdData.getAckSet(1), 1001);
+        assertEquals(newMessageIdData.getAckSet(2), 1003);
+        newMessageIdData.recycle();
+    }
+
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java
new file mode 100644
index 0000000..e0569a8
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.TestApi.MessageIdData;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.testng.annotations.Test;
+
+public class RepeatedLongPackedTest {
+    // Round-trips TestApi.MessageIdData, whose ack_set field is declared [packed = true] in TestApi.proto.
+    @Test
+    public void testRepeatedLongPacked() throws Exception {
+        MessageIdData messageIdData = MessageIdData.newBuilder()
+            .setLedgerId(0L)
+            .setEntryId(0L)
+            .setPartition(0)
+            .setBatchIndex(0)
+            .addAckSet(1000)
+            .addAckSet(1001)
+            .addAckSet(1003)
+            .build();
+        // Serialize into a buffer sized from getSerializedSize().
+        int cmdSize = messageIdData.getSerializedSize();
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(cmdSize);
+        ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf);
+        messageIdData.writeTo(outputStream);
+        // Recycle the writer-side objects before reading the bytes back.
+        messageIdData.recycle();
+        outputStream.recycle();
+        // Parse the serialized bytes into a fresh message.
+        ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf);
+        MessageIdData newMessageIdData = MessageIdData.newBuilder()
+            .mergeFrom(inputStream, null)
+            .build();
+        inputStream.recycle();
+        buf.release(); // parsing is finished, so the reference-counted buffer can be returned to the allocator
+        assertEquals(newMessageIdData.getAckSetCount(), 3); // TestNG order is (actual, expected)
+        assertEquals(newMessageIdData.getAckSet(0), 1000);
+        assertEquals(newMessageIdData.getAckSet(1), 1001);
+        assertEquals(newMessageIdData.getAckSet(2), 1003);
+        newMessageIdData.recycle();
+    }
+
+}
diff --git a/pulsar-common/src/test/proto/TestApi.proto b/pulsar-common/src/test/proto/TestApi.proto
new file mode 100644
index 0000000..24c90e4
--- /dev/null
+++ b/pulsar-common/src/test/proto/TestApi.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.proto;
+option java_package = "org.apache.pulsar.common.api.proto";
+option optimize_for = LITE_RUNTIME;
+
+message MessageIdData { // test-only message; presumably mirrors PulsarApi's MessageIdData — confirm
+    required uint64 ledgerId = 1;
+    required uint64 entryId  = 2;
+    optional int32 partition = 3 [default = -1];
+    optional int32 batch_index = 4 [default = -1];
+    repeated int64 ack_set = 5 [packed = true]; // packed encoding: the variant RepeatedLongPackedTest round-trips
+}
+


[pulsar] 08/25: Improve security setting of Pulsar Functions (#7578)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7df08cac3689afe0b5c870256d9cea565b71db1c
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Sat Jul 18 09:38:19 2020 +0800

    Improve security setting of Pulsar Functions (#7578)
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    
    ### Motivation
    
    Improve security setting of Pulsar Functions
    
    ### Modifications
    
    - Add `TLS Authentication` example
    - Improve `TLS Transport` config
    - Add `authorizationProvider` field for **Authorization**
    
    (cherry picked from commit 2374cd9c4260d9528e911878c49dc107508a2067)
---
 site2/docs/functions-worker.md | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/site2/docs/functions-worker.md b/site2/docs/functions-worker.md
index 0a4664f..35bfd9a 100644
--- a/site2/docs/functions-worker.md
+++ b/site2/docs/functions-worker.md
@@ -105,10 +105,17 @@ If you want to enable security on functions workers, you *should*:
 To enable TLS transport encryption, configure the following settings.
 
 ```
+useTLS: true
+pulsarServiceUrl: pulsar+ssl://localhost:6651/
+pulsarWebServiceUrl: https://localhost:8443
+
 tlsEnabled: true
 tlsCertificateFilePath: /path/to/functions-worker.cert.pem
 tlsKeyFilePath:         /path/to/functions-worker.key-pk8.pem
 tlsTrustCertsFilePath:  /path/to/ca.cert.pem
+
+# The path to trusted certificates used by the Pulsar client to authenticate with Pulsar brokers
+brokerClientTrustCertsFilePath: /path/to/ca.cert.pem
 ```
 
 For details on TLS encryption, refer to [Transport Encryption using TLS](security-tls-transport.md).
@@ -124,6 +131,16 @@ authenticationEnabled: true
 authenticationProviders: [ provider1, provider2 ]
 ```
 
+For the *TLS Authentication* provider, add the settings shown in the example below.
+See [TLS Authentication](security-tls-authentication.md) for more details.
+```
+brokerClientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationTls
+brokerClientAuthenticationParameters: tlsCertFile:/path/to/admin.cert.pem,tlsKeyFile:/path/to/admin.key-pk8.pem
+
+authenticationEnabled: true
+authenticationProviders: ['org.apache.pulsar.broker.authentication.AuthenticationProviderTls']
+```
+
 For *SASL Authentication* provider, add `saslJaasClientAllowedIds` and `saslJaasBrokerSectionName`
 under `properties` if needed. 
 
@@ -144,10 +161,11 @@ properties:
 
 ##### Enable Authorization Provider
 
-To enable authorization on Functions Worker, you need to configure `authorizationEnabled` and `configurationStoreServers`. The authentication provider connects to `configurationStoreServers` to receive namespace policies.
+To enable authorization on Functions Worker, you need to configure `authorizationEnabled`, `authorizationProvider`, and `configurationStoreServers`. The authorization provider connects to `configurationStoreServers` to receive namespace policies.
 
 ```yaml
 authorizationEnabled: true
+authorizationProvider: org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
 configurationStoreServers: <configuration-store-servers>
 ```
 


[pulsar] 13/25: fix the command for starting bookies in the foreground (#7596)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b246da98433a34e74f23f7b5a220e58d14d3c132
Author: sijia-w <53...@users.noreply.github.com>
AuthorDate: Sun Jul 19 05:55:23 2020 +0200

    fix the command for starting bookies in the foreground (#7596)
    
    
    (cherry picked from commit 9475b450201869b3ddfd8a94aec198bbb2b5015c)
---
 site2/docs/deploy-bare-metal.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/site2/docs/deploy-bare-metal.md b/site2/docs/deploy-bare-metal.md
index 3c1e4c2..da3682d 100644
--- a/site2/docs/deploy-bare-metal.md
+++ b/site2/docs/deploy-bare-metal.md
@@ -285,7 +285,7 @@ $ bin/pulsar-daemon start bookie
 To start the bookie in the foreground:
 
 ```bash
-$ bin/bookkeeper bookie
+$ bin/pulsar bookie
 ```
 
 You can verify that a bookie works properly by running the `bookiesanity` command on the [BookKeeper shell](reference-cli-tools.md#shell):


[pulsar] 18/25: fix batchReceiveAsync not completed exceptionally when closing Consumer (#7661)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 180e5f26e4afdfda178e7253fb22a8b21b2c7f54
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jul 28 14:02:34 2020 +0800

    fix batchReceiveAsync not completed exceptionally when closing Consumer (#7661)
    
    ### Motivation
    
    The CompletableFuture<Messages<T>> returned by Consumer.batchReceiveAsync() is not completed exceptionally when the Consumer is closed.
    
    ### Modifications
    
    pendingBatchReceives was not cleaned up when the connection was closed, so I added pendingBatchReceives cleanup.
    
    (cherry picked from commit 48156ad9a5c2e0d85813367bcaaf6ea845fffc2c)
---
 .../client/api/SimpleProducerConsumerTest.java     | 74 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 26 +++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 34 +++-------
 .../client/impl/MultiTopicsConsumerImpl.java       | 11 +---
 4 files changed, 111 insertions(+), 34 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index dce6c10..669e259 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -3352,4 +3352,78 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test(timeOut = 5000) // the timeout is what fails the test when a latch below is never released
+    public void testReceiveAsyncCompletedWhenClosing() throws Exception {
+        final String topic = "persistent://my-property/my-ns/testCompletedWhenClosing";
+        final String partitionedTopic = "persistent://my-property/my-ns/testCompletedWhenClosing-partitioned";
+        final String errorMsg = "cleaning and closing the consumers"; // substring expected in the exception message of every failed future
+        BatchReceivePolicy batchReceivePolicy
+                = BatchReceivePolicy.builder().maxNumBytes(10 * 1024).maxNumMessages(10).timeout(-1, TimeUnit.SECONDS).build(); // NOTE(review): negative timeout presumably disables the batch timer — confirm
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic).subscriptionName("my-subscriber-name")
+                .batchReceivePolicy(batchReceivePolicy).subscribe();
+        // 1) Test receiveAsync is interrupted
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                consumer.receiveAsync().get(); // nothing is published to the topic, so this blocks until the future fails
+                Assert.fail("should be interrupted"); // NOTE(review): if Assert is TestNG's, this throws AssertionError, which catch (Exception) does not intercept
+            } catch (Exception e) {
+                Assert.assertTrue(e.getMessage().contains(errorMsg)); // a failure here ends only this thread; the latch then stays closed until the timeout
+                countDownLatch.countDown();
+            }
+        }).start();
+        new Thread(() -> { // closes concurrently; no ordering with the receiveAsync() call above is enforced
+            try {
+                consumer.close();
+            } catch (PulsarClientException ignore) {
+            }
+        }).start();
+        countDownLatch.await();
+
+        // 2) Test batchReceiveAsync is interrupted
+        CountDownLatch countDownLatch2 = new CountDownLatch(1);
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING) // same topic and subscription name as in step 1
+                .topic(topic).subscriptionName("my-subscriber-name")
+                .batchReceivePolicy(batchReceivePolicy).subscribe();
+        new Thread(() -> {
+            try {
+                consumer2.batchReceiveAsync().get();
+                Assert.fail("should be interrupted");
+            } catch (Exception e) {
+                Assert.assertTrue(e.getMessage().contains(errorMsg));
+                countDownLatch2.countDown();
+            }
+        }).start();
+        new Thread(() -> {
+            try {
+                consumer2.close();
+            } catch (PulsarClientException ignore) {
+            }
+        }).start();
+        countDownLatch2.await();
+        // 3) Test partitioned topic batchReceiveAsync is interrupted
+        CountDownLatch countDownLatch3 = new CountDownLatch(1);
+        admin.topics().createPartitionedTopic(partitionedTopic, 3); // three partitions
+        Consumer<String> partitionedTopicConsumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(partitionedTopic).subscriptionName("my-subscriber-name-partitionedTopic")
+                .batchReceivePolicy(batchReceivePolicy).subscribe();
+        new Thread(() -> {
+            try {
+                partitionedTopicConsumer.batchReceiveAsync().get();
+                Assert.fail("should be interrupted");
+            } catch (Exception e) {
+                Assert.assertTrue(e.getMessage().contains(errorMsg));
+                countDownLatch3.countDown();
+            }
+        }).start();
+        new Thread(() -> {
+            try {
+                partitionedTopicConsumer.close();
+            } catch (PulsarClientException ignore) {
+            }
+        }).start();
+        countDownLatch3.await();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 3d45931..5391cb6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -195,6 +195,30 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
+    /**
+     * Drains {@code pendingReceives}, completing every queued future exceptionally with an
+     * {@link PulsarClientException.AlreadyClosedException} that names the topic and subscription.
+     */
+    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+        for (CompletableFuture<Message<T>> pending = pendingReceives.poll(); pending != null; pending = pendingReceives.poll()) {
+            pending.completeExceptionally(
+                    new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                            "was already closed when cleaning and closing the consumers", topic, subscription)));
+        }
+    }
+
+    /** Fails every queued batch receive with AlreadyClosedException; an entry without a future is skipped rather than ending the drain. */
+    protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
+        for (OpBatchReceive<T> op = pendingBatchReceives.poll(); op != null; op = pendingBatchReceives.poll()) { // poll() returns null once drained
+            if (op.future == null) {
+                continue; // nothing to complete here, but later entries must still be failed
+            }
+            op.future.completeExceptionally(
+                    new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                            "was already closed when cleaning and closing the consumers", topic, subscription)));
+        }
+    }
+
     abstract protected Messages<T> internalBatchReceive() throws PulsarClientException;
 
     abstract protected CompletableFuture<Messages<T>> internalBatchReceiveAsync();
@@ -405,7 +429,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                                                              TransactionImpl txn);
 
     protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
-                                                                Map<String,Long> properties, 
+                                                                Map<String,Long> properties,
                                                                 long delayTime,
                                                                 TimeUnit unit);
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1757f1e..10320da 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,7 +25,6 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -40,8 +39,6 @@ import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -84,7 +81,6 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.RetryMessageUtil;
-import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
@@ -172,9 +168,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private volatile Producer<T> retryLetterProducer;
     private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
-    
+
     protected volatile boolean paused;
-    
+
     protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
     private int pendingChunckedMessageCount = 0;
     protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
@@ -560,7 +556,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     @SuppressWarnings("unchecked")
     @Override
     protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
-                                                       Map<String,Long> properties, 
+                                                       Map<String,Long> properties,
                                                        long delayTime,
                                                        TimeUnit unit) {
         MessageId messageId = message.getMessageId();
@@ -620,7 +616,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                     reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
                     reconsumetimes = reconsumetimes + 1;
-                   
+
                 } else {
                     propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
                     propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
@@ -628,7 +624,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
-                
+
                if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()) {
                    processPossibleToDLQ((MessageIdImpl)messageId);
                     if (deadLetterProducer == null) {
@@ -996,18 +992,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         lock.readLock().lock();
         try {
             if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
-                while (!pendingReceives.isEmpty()) {
-                    CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
-                    if (receiveFuture != null) {
-                        receiveFuture.completeExceptionally(
-                            new PulsarClientException.AlreadyClosedException(
-                                String.format("The consumer which subscribes the topic %s with subscription name %s " +
-                                        "was already closed when cleaning and closing the consumers",
-                                    topicName.toString(), subscription)));
-                    } else {
-                        break;
-                    }
-                }
+                failPendingReceives(this.pendingReceives);
+                failPendingBatchReceives(this.pendingBatchReceives);
             }
         } finally {
             lock.readLock().unlock();
@@ -1083,7 +1069,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
         // and return undecrypted payload
         if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
-            
+
             // right now, chunked messages are only supported by non-shared subscription
             if (isChunkedMessage) {
                 uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
@@ -1152,7 +1138,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     TimeUnit.MILLISECONDS);
             expireChunkMessageTaskScheduled = true;
         }
-        
+
         if (msgMetadata.getChunkId() == 0) {
             ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(),
                     msgMetadata.getTotalChunkMsgSize());
@@ -1222,7 +1208,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         compressedPayload.release();
         return uncompressedPayload;
     }
-    
+
     protected void triggerListener(int numMessages) {
         // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
         // thread while the message processing happens
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 7f44836..50e4db9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -566,15 +566,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         lock.readLock().lock();
         try {
             if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
-                while (!pendingReceives.isEmpty()) {
-                    CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
-                    if (receiveFuture != null) {
-                        receiveFuture.completeExceptionally(
-                                new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
-                    } else {
-                        break;
-                    }
-                }
+                failPendingReceives(pendingReceives);
+                failPendingBatchReceives(pendingBatchReceives);
             }
         } finally {
             lock.readLock().unlock();


[pulsar] 14/25: Support configuring DeleteInactiveTopic setting in namespace policy (#7598)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d4a1ca59dafa1a8f4050bbaa442543250f789bf2
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jul 28 14:25:58 2020 +0800

    Support configuring DeleteInactiveTopic setting in namespace policy (#7598)
    
    ### Motivation
    
    Support configuring DeleteInactiveTopic setting in namespace policy
    
    ### Modifications
    
    Only two parameters, `brokerDeleteInactiveTopicsMode` and `brokerDeleteInactiveTopicsMaxInactiveDurationSeconds`, support namespace-level policy. Each parameter is changed to a Map structure in which the key is the namespace and the value is the parameter value.
    For example: namespace1=delete_when_no_subscriptions, namespace2=delete_when_no_subscriptions.
    
    In addition, there is a special key named `default`. If it is set, namespaces that do not specify their own value will use the value given for `default`.
    For example: default=delete_when_no_subscriptions
    
    (cherry picked from commit 00e30895b22129d5189db1592851bdb62e8c498b)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  48 ++++--
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  36 +++++
 .../pulsar/broker/service/AbstractTopic.java       |  28 +++-
 .../pulsar/broker/service/BrokerService.java       |   8 +-
 .../org/apache/pulsar/broker/service/Topic.java    |   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  23 ++-
 .../broker/service/persistent/PersistentTopic.java |  21 ++-
 .../broker/service/persistent/SystemTopic.java     |   2 +-
 .../pulsar/broker/service/BrokerTestBase.java      |   7 +-
 .../broker/service/InactiveTopicDeleteTest.java    | 165 +++++++++++++++++++++
 .../service/PersistentTopicConcurrentTest.java     |   4 +-
 .../org/apache/pulsar/client/admin/Namespaces.java |  48 ++++++
 .../client/admin/internal/NamespacesImpl.java      |  81 ++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  14 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  73 ++++++++-
 ...{PolicyName.java => InactiveTopicPolicies.java} |  34 ++---
 .../pulsar/common/policies/data/Policies.java      |   6 +-
 .../pulsar/common/policies/data/PolicyName.java    |   1 +
 18 files changed, 543 insertions(+), 58 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 812ebd0..2607673 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -28,6 +28,8 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_PO
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
+
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
 import java.util.Collections;
@@ -92,6 +94,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.KeeperException;
@@ -1812,39 +1815,64 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
+    protected InactiveTopicPolicies internalGetInactiveTopic() {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);
+
+        Policies policies = getNamespacePolicies(namespaceName);
+        if (policies.inactive_topic_policies == null) {
+            return new InactiveTopicPolicies(config().getBrokerDeleteInactiveTopicsMode()
+                    , config().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()
+                    , config().isBrokerDeleteInactiveTopicsEnabled());
+        } else {
+            return policies.inactive_topic_policies;
+        }
+    }
+
+    protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies){
         validateSuperUserAccess();
         validatePoliciesReadOnlyAccess();
+        internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
+    }
 
+    protected void internalSetPolicies(String fieldName, Object value){
         try {
             Stat nodeStat = new Stat();
             final String path = path(POLICIES, namespaceName.toString());
             byte[] content = globalZk().getData(path, null, nodeStat);
             Policies policies = jsonMapper().readValue(content, Policies.class);
 
-            policies.delayed_delivery_policies = delayedDeliveryPolicies;
+            Field field = Policies.class.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            field.set(policies, value);
+
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-            log.info("[{}] Successfully updated delayed delivery messages configuration: namespace={}, map={}", clientAppId(),
-                    namespaceName, jsonMapper().writeValueAsString(policies.retention_policies));
+            log.info("[{}] Successfully updated {} configuration: namespace={}, value={}", clientAppId(), fieldName,
+                    namespaceName, jsonMapper().writeValueAsString(value));
 
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: does not exist", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update {} configuration for namespace {}: does not exist", clientAppId(),
+                    fieldName, namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: concurrent modification",
-                    clientAppId(), namespaceName);
+            log.warn("[{}] Failed to update {} configuration for namespace {}: concurrent modification",
+                    clientAppId(), fieldName, namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
         } catch (Exception e) {
-            log.error("[{}] Failed to update delayed delivery messages configuration for namespace {}", clientAppId(), namespaceName,
-                    e);
+            log.error("[{}] Failed to update {} configuration for namespace {}", clientAppId(), fieldName
+                    , namespaceName, e);
             throw new RestException(e);
         }
     }
 
+    protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+        internalSetPolicies("delayed_delivery_policies", delayedDeliveryPolicies);
+    }
+
     protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 4c082d8..4c1b879 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -68,6 +68,7 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.slf4j.Logger;
@@ -857,6 +858,41 @@ public class Namespaces extends NamespacesBase {
     }
 
     @GET
+    @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+    @ApiOperation(value = "Get inactive topic policies config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"), })
+    public InactiveTopicPolicies getInactiveTopicPolicies(@PathParam("tenant") String tenant,
+                                                              @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        return internalGetInactiveTopic();
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+    @ApiOperation(value = "Remove inactive topic policies from a namespace.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeInactiveTopicPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetInactiveTopic( null);
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+    @ApiOperation(value = "Set inactive topic policies config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), })
+    public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Inactive topic policies for the specified namespace") InactiveTopicPolicies inactiveTopicPolicies) {
+        validateNamespaceName(tenant, namespace);
+        internalSetInactiveTopic(inactiveTopicPolicies);
+    }
+
+    @GET
     @Path("/{tenant}/{namespace}/maxProducersPerTopic")
     @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4a02f1f..09a0521 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -36,6 +36,8 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -63,8 +65,8 @@ public abstract class AbstractTopic implements Topic {
 
     protected volatile boolean isFenced;
 
-    // When set to false, this inactive topic can not be deleted
-    protected boolean deleteWhileInactive;
+    // Inactive topic policies
+    protected InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
 
     // Timestamp of when this topic was last seen active
     protected volatile long lastActive;
@@ -98,8 +100,9 @@ public abstract class AbstractTopic implements Topic {
         this.producers = new ConcurrentHashMap<>();
         this.isFenced = false;
         this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
-        this.deleteWhileInactive =
-                brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
+        this.inactiveTopicPolicies.setDeleteWhileInactive(brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled());
+        this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
+        this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
         this.lastActive = System.nanoTime();
         Policies policies = null;
         try {
@@ -132,12 +135,14 @@ public abstract class AbstractTopic implements Topic {
         return false;
     }
 
+    @Override
     public void disableCnxAutoRead() {
         if (producers != null) {
             producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
         }
     }
 
+    @Override
     public void enableCnxAutoRead() {
         if (producers != null) {
             producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
@@ -466,12 +471,23 @@ public abstract class AbstractTopic implements Topic {
     }
 
     public boolean isDeleteWhileInactive() {
-        return deleteWhileInactive;
+        return this.inactiveTopicPolicies.isDeleteWhileInactive();
     }
 
     public void setDeleteWhileInactive(boolean deleteWhileInactive) {
-        this.deleteWhileInactive = deleteWhileInactive;
+        this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
     }
 
     private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
+
+    public InactiveTopicPolicies getInactiveTopicPolicies() {
+        return inactiveTopicPolicies;
+    }
+
+    public void resetInactiveTopicPolicies(InactiveTopicDeleteMode inactiveTopicDeleteMode
+            , int maxInactiveDurationSeconds, boolean deleteWhileInactive) {
+        inactiveTopicPolicies.setInactiveTopicDeleteMode(inactiveTopicDeleteMode);
+        inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
+        inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4286e57..b4dbdcd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -441,8 +441,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     protected void startInactivityMonitor() {
         if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
             int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
-            int maxInactiveDurationInSec = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds();
-            inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(maxInactiveDurationInSec)), interval, interval,
+            inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC()), interval, interval,
                     TimeUnit.SECONDS);
         }
 
@@ -1243,9 +1242,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         return lookupRequestSemaphore.get();
     }
 
-    public void checkGC(int maxInactiveDurationInSec) {
-        forEachTopic(topic -> topic.checkGC(maxInactiveDurationInSec,
-            pulsar.getConfiguration().getBrokerDeleteInactiveTopicsMode()));
+    public void checkGC() {
+        forEachTopic(Topic::checkGC);
     }
 
     public void checkMessageExpiry() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index d20e700..4d14326 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -124,7 +124,7 @@ public interface Topic {
 
     CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);
 
-    void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode);
+    void checkGC();
 
     void checkInactiveSubscriptions();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index eb8d9d3..5a4d7cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -68,7 +69,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
 import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
 import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
@@ -145,6 +145,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
                     .orElseThrow(() -> new KeeperException.NoNodeException());
             isEncryptionRequired = policies.encryption_required;
             isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
+            if (policies.inactive_topic_policies != null) {
+                inactiveTopicPolicies = policies.inactive_topic_policies;
+            }
             setSchemaCompatibilityStrategy(policies);
 
             schemaValidationEnforced = policies.schema_validation_enforced;
@@ -420,7 +423,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
     /**
      * Close this topic - close all producers and subscriptions associated with this topic
-     * 
+     *
      * @param closeWithoutWaitingClientDisconnect
      *            don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
@@ -626,6 +629,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         return replicators.get(remoteCluster);
     }
 
+    @Override
     public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
             StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace,
             boolean hydratePublishers) {
@@ -755,6 +759,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         topicStatsStream.endObject();
     }
 
+    @Override
     public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
 
         NonPersistentTopicStats stats = new NonPersistentTopicStats();
@@ -808,6 +813,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         return stats;
     }
 
+    @Override
     public PersistentTopicInternalStats getInternalStats() {
 
         PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
@@ -829,11 +835,12 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
     }
 
     @Override
-    public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
-        if (!deleteWhileInactive) {
+    public void checkGC() {
+        if (!isDeleteWhileInactive()) {
             // This topic is not included in GC
             return;
         }
+        int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
         if (isActive()) {
             lastActive = System.nanoTime();
         } else {
@@ -895,6 +902,14 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
             producer.checkEncryption();
         });
         subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
+
+        if (data.inactive_topic_policies != null) {
+            this.inactiveTopicPolicies = data.inactive_topic_policies;
+        } else {
+            ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
+            resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+                    , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
+        }
         return checkReplicationAndRetryOnFailure();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2b2e14b..16c2ac9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -254,6 +255,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
 
             schemaValidationEnforced = policies.schema_validation_enforced;
+            if (policies.inactive_topic_policies != null) {
+                inactiveTopicPolicies = policies.inactive_topic_policies;
+            }
 
             maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(policies);
             maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(policies);
@@ -1279,6 +1283,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return ledger;
     }
 
+    @Override
     public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
             ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
 
@@ -1494,6 +1499,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return lastUpdatedAvgPublishRateInByte;
     }
 
+    @Override
     public TopicStats getStats(boolean getPreciseBacklog) {
 
         TopicStats stats = new TopicStats();
@@ -1552,6 +1558,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return stats;
     }
 
+    @Override
     public PersistentTopicInternalStats getInternalStats() {
         PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
 
@@ -1634,11 +1641,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     @Override
-    public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
-        if (!deleteWhileInactive) {
+    public void checkGC() {
+        if (!isDeleteWhileInactive()) {
             // This topic is not included in GC
             return;
         }
+        InactiveTopicDeleteMode deleteMode = inactiveTopicPolicies.getInactiveTopicDeleteMode();
+        int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
         if (isActive(deleteMode)) {
             lastActive = System.nanoTime();
         } else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
@@ -1793,6 +1802,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
             delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
         }
+        if (data.inactive_topic_policies != null) {
+            this.inactiveTopicPolicies = data.inactive_topic_policies;
+        } else {
+            ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
+            resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+                    , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
+        }
 
         initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
 
@@ -1983,6 +1999,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
     }
 
+    @Override
     public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
         return this.dispatchRateLimiter;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 6720209..4b338a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -46,7 +46,7 @@ public class SystemTopic extends PersistentTopic {
     }
 
     @Override
-    public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
+    public void checkGC() {
         // do nothing for system topic
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
index 314ccfa..ada2242 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
@@ -55,7 +55,12 @@ public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest {
 
     void runGC() {
         try {
-            pulsar.getExecutor().submit(() -> pulsar.getBrokerService().checkGC(0)).get();
+            pulsar.getBrokerService().forEachTopic(topic -> {
+                if (topic instanceof AbstractTopic) {
+                    ((AbstractTopic) topic).getInactiveTopicPolicies().setMaxInactiveDurationSeconds(0);
+                }
+            });
+            pulsar.getExecutor().submit(() -> pulsar.getBrokerService().checkGC()).get();
             Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
         } catch (Exception e) {
             LOG.error("GC executor error", e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index c3d353b..143a4ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -19,10 +19,18 @@
 package org.apache.pulsar.broker.service;
 
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Consumer;
 
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -68,6 +76,163 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
+    @Test(timeOut = 20000)
+    public void testTopicPolicyUpdateAndClean() throws Exception {
+        final String namespace = "prop/ns-abc";
+        final String namespace2 = "prop/ns-abc2";
+        final String namespace3 = "prop/ns-abc3";
+        List<String> namespaceList = Arrays.asList(namespace2, namespace3);
+
+        super.resetConfig();
+        conf.setBrokerDeleteInactiveTopicsEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000);
+        conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        InactiveTopicPolicies defaultPolicy = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions
+                , 1000, true);
+
+        super.baseSetup();
+
+        for (String ns : namespaceList) {
+            admin.namespaces().createNamespace(ns);
+            admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test"));
+        }
+
+        final String topic = "persistent://prop/ns-abc/testDeletePolicyUpdate";
+        final String topic2 = "persistent://prop/ns-abc2/testDeletePolicyUpdate";
+        final String topic3 = "persistent://prop/ns-abc3/testDeletePolicyUpdate";
+        List<String> topics = Arrays.asList(topic, topic2, topic3);
+
+        for (String tp : topics) {
+            admin.topics().createNonPartitionedTopic(tp);
+        }
+
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
+        admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
+        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        admin.namespaces().setInactiveTopicPolicies(namespace2, inactiveTopicPolicies);
+        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        admin.namespaces().setInactiveTopicPolicies(namespace3, inactiveTopicPolicies);
+
+        InactiveTopicPolicies policies;
+        //wait for zk
+        while (true) {
+            policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            if (policies.isDeleteWhileInactive()) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        Assert.assertTrue(policies.isDeleteWhileInactive());
+        Assert.assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+        Assert.assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace));
+
+        admin.namespaces().removeInactiveTopicPolicies(namespace);
+        while (true) {
+            Thread.sleep(500);
+            policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            if (policies.getMaxInactiveDurationSeconds() == 1000) {
+                break;
+            }
+        }
+        Assert.assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
+                , defaultPolicy);
+
+        policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
+        Assert.assertTrue(policies.isDeleteWhileInactive());
+        Assert.assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+        Assert.assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace2));
+
+        admin.namespaces().removeInactiveTopicPolicies(namespace2);
+        while (true) {
+            Thread.sleep(500);
+            policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies;
+            if (policies.getMaxInactiveDurationSeconds() == 1000) {
+                break;
+            }
+        }
+        Assert.assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies
+                , defaultPolicy);
+
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 20000)
+    public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws Exception {
+        final String namespace = "prop/ns-abc";
+        final String namespace2 = "prop/ns-abc2";
+        final String namespace3 = "prop/ns-abc3";
+        List<String> namespaceList = Arrays.asList(namespace2, namespace3);
+
+        conf.setBrokerDeleteInactiveTopicsEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
+        super.baseSetup();
+
+        for (String ns : namespaceList) {
+            admin.namespaces().createNamespace(ns);
+            admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test"));
+        }
+
+        final String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig";
+        final String topic2 = "persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig";
+        final String topic3 = "persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig";
+        List<String> topics = Arrays.asList(topic, topic2, topic3);
+        //create producer/consumer and close
+        Map<String, String> topicToSub = new HashMap<>();
+        for (String tp : topics) {
+            Producer<byte[]> producer = pulsarClient.newProducer().topic(tp).create();
+            String subName = "sub" + System.currentTimeMillis();
+            topicToSub.put(tp, subName);
+            Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(tp).subscriptionName(subName).subscribe();
+            for (int i = 0; i < 10; i++) {
+                producer.send("Pulsar".getBytes());
+            }
+            consumer.close();
+            producer.close();
+            Thread.sleep(1);
+        }
+        // namespace uses delete_when_no_subscriptions, namespace2 uses delete_when_subscriptions_caught_up;
+        // namespace3 uses the default: delete_when_no_subscriptions
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,1,true);
+        admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
+        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        admin.namespaces().setInactiveTopicPolicies(namespace2, inactiveTopicPolicies);
+
+        //wait for zk
+        while (true) {
+            InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
+                    .getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            if (policies.isDeleteWhileInactive()) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+
+        // topic should still exist
+        Thread.sleep(2000);
+        Assert.assertTrue(admin.topics().getList(namespace).contains(topic));
+        Assert.assertTrue(admin.topics().getList(namespace2).contains(topic2));
+        Assert.assertTrue(admin.topics().getList(namespace3).contains(topic3));
+
+        // no backlog, trigger delete_when_subscriptions_caught_up
+        admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
+        Thread.sleep(2000);
+        Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2));
+        // delete subscription, trigger delete_when_no_subscriptions
+        for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
+            admin.topics().deleteSubscription(entry.getKey(), entry.getValue());
+        }
+        Thread.sleep(2000);
+        Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
+        Assert.assertFalse(admin.topics().getList(namespace3).contains(topic3));
+
+        super.internalCleanup();
+    }
+
     @Test
     public void testDeleteWhenNoBacklogs() throws Exception {
         conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index cfc3937..12773b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -196,7 +196,9 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
                     // Thread.sleep(5,0);
                     log.info("{} forcing topic GC ", Thread.currentThread());
                     for (int i = 0; i < 2000; i++) {
-                        topic.checkGC(0, InactiveTopicDeleteMode.delete_when_no_subscriptions);
+                        topic.getInactiveTopicPolicies().setMaxInactiveDurationSeconds(0);
+                        topic.getInactiveTopicPolicies().setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+                        topic.checkGC();
                     }
                     log.info("GC done..");
                 } catch (Exception e) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 7cb7349..f4315e5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -2207,6 +2208,53 @@ public interface Namespaces {
             String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies);
 
     /**
+     * Get the inactive deletion strategy for all topics within a namespace synchronously.
+     * @param namespace
+     * @return
+     * @throws PulsarAdminException
+     */
+    InactiveTopicPolicies getInactiveTopicPolicies(String namespace) throws PulsarAdminException;
+
+    /**
+     * Remove inactive topic policies from a namespace asynchronously.
+     * @param namespace
+     * @return
+     */
+    CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String namespace);
+
+    /**
+     * Remove inactive topic policies from a namespace.
+     * @param namespace
+     * @throws PulsarAdminException
+     */
+    void removeInactiveTopicPolicies(String namespace) throws PulsarAdminException;
+
+    /**
+     * Get the inactive deletion strategy for all topics within a namespace asynchronously.
+     * @param namespace
+     * @return
+     */
+    CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String namespace);
+
+    /**
+     * Same as setInactiveTopicPoliciesAsync, but synchronous.
+     * @param namespace
+     * @param inactiveTopicPolicies
+     */
+    void setInactiveTopicPolicies(
+            String namespace, InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException;
+
+    /**
+     * You can set the inactive deletion strategy at the namespace level.
+     * Its priority is higher than the inactive deletion strategy at the broker level.
+     * All topics under this namespace will follow this strategy.
+     * @param namespace
+     * @param inactiveTopicPolicies
+     * @return
+     */
+    CompletableFuture<Void> setInactiveTopicPoliciesAsync(
+            String namespace, InactiveTopicPolicies inactiveTopicPolicies);
+    /**
      * Set the given subscription auth mode on all topics on a namespace.
      *
      * @param namespace
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 1a9f010..83c6ce4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -956,6 +957,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void removeInactiveTopicPolicies(String namespace) throws PulsarAdminException {
+        try {
+            removeInactiveTopicPoliciesAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns, "backlogQuota")
@@ -1775,6 +1798,64 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public InactiveTopicPolicies getInactiveTopicPolicies(String namespace) throws PulsarAdminException {
+        try {
+            return getInactiveTopicPoliciesAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+        final CompletableFuture<InactiveTopicPolicies> future = new CompletableFuture<>();
+        asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
+                    @Override
+                    public void completed(InactiveTopicPolicies inactiveTopicPolicies) {
+                        future.complete(inactiveTopicPolicies);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setInactiveTopicPolicies(
+            String namespace, InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException {
+        try {
+            setInactiveTopicPoliciesAsync(namespace, inactiveTopicPolicies)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setInactiveTopicPoliciesAsync(
+            String namespace, InactiveTopicPolicies inactiveTopicPolicies) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+        return asyncPostRequest(path, Entity.entity(inactiveTopicPolicies, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
         try {
             return getMaxProducersPerTopicAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d5ca27e..18f0d39 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -76,6 +76,8 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
@@ -406,6 +408,16 @@ public class PulsarAdminToolTest {
         namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
         verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
 
+        namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e -t 1s -m delete_when_no_subscriptions"));
+        verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1"
+                , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,true));
+
+        namespaces.run(split("get-inactive-topic-policies myprop/clust/ns1"));
+        verify(mockNamespaces).getInactiveTopicPolicies("myprop/clust/ns1");
+
+        namespaces.run(split("remove-inactive-topic-policies myprop/clust/ns1"));
+        verify(mockNamespaces).removeInactiveTopicPolicies("myprop/clust/ns1");
+
         namespaces.run(split("clear-backlog myprop/clust/ns1 -force"));
         verify(mockNamespaces).clearNamespaceBacklog("myprop/clust/ns1");
 
@@ -484,7 +496,7 @@ public class PulsarAdminToolTest {
 
         namespaces.run(split("get-dispatch-rate myprop/clust/ns1"));
         verify(mockNamespaces).getDispatchRate("myprop/clust/ns1");
-        
+
         namespaces.run(split("set-publish-rate myprop/clust/ns1 -m 10 -b 20"));
         verify(mockNamespaces).setPublishRate("myprop/clust/ns1", new PublishRate(10, 20));
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 78d034f..661a56a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -44,6 +44,8 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -658,7 +660,7 @@ public class CmdNamespaces extends CmdBase {
         @Parameter(names = { "--relative-to-publish-rate",
                 "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
         private boolean relativeToPublishRate = false;
-        
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
@@ -1014,6 +1016,67 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the inactive topic policy for a namespace")
+    private class GetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getInactiveTopicPolicies(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove inactive topic policies from a namespace")
+    private class RemoveInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().removeInactiveTopicPolicies(namespace);
+        }
+    }
+
+    @Parameters(commandDescription = "Set the inactive topic policies on a namespace")
+    private class SetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive")
+        private boolean enableDeleteWhileInactive = false;
+
+        @Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive")
+        private boolean disableDeleteWhileInactive = false;
+
+        @Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" +
+                ",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+        private String deleteInactiveTopicsMaxInactiveDuration;
+
+        @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic" +
+                ",Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true)
+        private String inactiveTopicDeleteMode;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            long maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+
+            if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
+                throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive");
+            }
+            InactiveTopicDeleteMode deleteMode = null;
+            try {
+                deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
+            } catch (IllegalArgumentException e) {
+                throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
+            }
+            admin.namespaces().setInactiveTopicPolicies(namespace, new InactiveTopicPolicies(deleteMode, (int) maxInactiveDurationInSeconds, enableDeleteWhileInactive));
+        }
+    }
+
     @Parameters(commandDescription = "Set the delayed delivery policy on a namespace")
     private class SetDelayedDelivery extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -1662,7 +1725,7 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-retention", new GetRetention());
         jcommander.addCommand("set-retention", new SetRetention());
-        
+
         jcommander.addCommand("set-bookie-affinity-group", new SetBookieAffinityGroup());
         jcommander.addCommand("get-bookie-affinity-group", new GetBookieAffinityGroup());
         jcommander.addCommand("delete-bookie-affinity-group", new DeleteBookieAffinityGroup());
@@ -1679,7 +1742,7 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
         jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
-        
+
         jcommander.addCommand("set-publish-rate", new SetPublishRate());
         jcommander.addCommand("get-publish-rate", new GetPublishRate());
 
@@ -1696,6 +1759,10 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
         jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
 
+        jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
+        jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
+        jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
+
         jcommander.addCommand("get-max-producers-per-topic", new GetMaxProducersPerTopic());
         jcommander.addCommand("set-max-producers-per-topic", new SetMaxProducersPerTopic());
         jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
similarity index 68%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
index 439ed7b..ac4607b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
@@ -16,30 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pulsar.common.policies.data;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 /**
- * PolicyName authorization operations.
+ * Definition of the inactive topic policy.
  */
-public enum PolicyName {
-    ALL,
-    ANTI_AFFINITY,
-    BACKLOG,
-    COMPACTION,
-    DELAYED_DELIVERY,
-    DEDUPLICATION,
-    MAX_CONSUMERS,
-    MAX_PRODUCERS,
-    MAX_UNACKED,
-    OFFLOAD,
-    PERSISTENCE,
-    RATE,
-    RETENTION,
-    REPLICATION,
-    REPLICATION_RATE,
-    SCHEMA_COMPATIBILITY_STRATEGY,
-    SUBSCRIPTION_AUTH_MODE,
-    ENCRYPTION,
-    TTL,
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class InactiveTopicPolicies {
+    private InactiveTopicDeleteMode inactiveTopicDeleteMode;
+    private int maxInactiveDurationSeconds;
+    private boolean deleteWhileInactive;
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 2946157..0fe0811 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -74,6 +74,8 @@ public class Policies {
     @SuppressWarnings("checkstyle:MemberName")
     public DelayedDeliveryPolicies delayed_delivery_policies = null;
     @SuppressWarnings("checkstyle:MemberName")
+    public InactiveTopicPolicies inactive_topic_policies = null;
+    @SuppressWarnings("checkstyle:MemberName")
     public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
 
     @SuppressWarnings("checkstyle:MemberName")
@@ -120,7 +122,7 @@ public class Policies {
                 autoSubscriptionCreationOverride, persistence,
                 bundles, latency_stats_sample_rate,
                 message_ttl_in_seconds, subscription_expiration_time_minutes, retention_policies,
-                encryption_required, delayed_delivery_policies,
+                encryption_required, delayed_delivery_policies, inactive_topic_policies,
                 subscription_auth_mode,
                 antiAffinityGroup, max_producers_per_topic,
                 max_consumers_per_topic, max_consumers_per_subscription,
@@ -158,6 +160,7 @@ public class Policies {
                     && Objects.equals(retention_policies, other.retention_policies)
                     && Objects.equals(encryption_required, other.encryption_required)
                     && Objects.equals(delayed_delivery_policies, other.delayed_delivery_policies)
+                    && Objects.equals(inactive_topic_policies, other.inactive_topic_policies)
                     && Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
                     && Objects.equals(antiAffinityGroup, other.antiAffinityGroup)
                     && max_producers_per_topic == other.max_producers_per_topic
@@ -218,6 +221,7 @@ public class Policies {
                 .add("deleted", deleted)
                 .add("encryption_required", encryption_required)
                 .add("delayed_delivery_policies", delayed_delivery_policies)
+                .add("inactive_topic_policies", inactive_topic_policies)
                 .add("subscription_auth_mode", subscription_auth_mode)
                 .add("max_producers_per_topic", max_producers_per_topic)
                 .add("max_consumers_per_topic", max_consumers_per_topic)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 439ed7b..8439a1f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -28,6 +28,7 @@ public enum PolicyName {
     BACKLOG,
     COMPACTION,
     DELAYED_DELIVERY,
+    INACTIVE_TOPIC,
     DEDUPLICATION,
     MAX_CONSUMERS,
     MAX_PRODUCERS,


[pulsar] 20/25: fix:apache#7669 stats recorder time unit error (#7670)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 304924cd837a92fb2ac1e02b92a4107d554bb49b
Author: Lagranmoon <la...@outlook.com>
AuthorDate: Mon Jul 27 20:26:52 2020 +0800

    fix:apache#7669 stats recorder time unit error (#7670)
    
    
    (cherry picked from commit 19d17de2ef1bdeb60ead077b6670da7b9f97d7ad)
---
 .../org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java  | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 16d0dc9..923b663 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -59,7 +59,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
 
     private volatile double sendMsgsRate;
     private volatile double sendBytesRate;
-    private volatile double[] latencyPctValues;
+    private volatile double[] latencyPctValues = new double[PERCENTILES.length];
 
     private static final double[] PERCENTILES = { 0.5, 0.75, 0.95, 0.99, 0.999, 1.0 };
 
@@ -148,9 +148,9 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
                             producer.getProducerName(), producer.getPendingQueueSize(),
                             THROUGHPUT_FORMAT.format(sendMsgsRate),
                             THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
-                            DEC.format(latencyPctValues[0] / 1000.0), DEC.format(latencyPctValues[2] / 1000.0),
-                            DEC.format(latencyPctValues[3] / 1000.0), DEC.format(latencyPctValues[4] / 1000.0),
-                            DEC.format(latencyPctValues[5] / 1000.0),
+                            DEC.format(latencyPctValues[0]), DEC.format(latencyPctValues[2]),
+                            DEC.format(latencyPctValues[3]), DEC.format(latencyPctValues[4]),
+                            DEC.format(latencyPctValues[5]),
                             THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
                 }
 


[pulsar] 11/25: Fix race condition on close consumer while reconnect to broker. (#7589)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ad6edd5ad98f9c706013e4882ca1c71ed3e66d70
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 28 13:57:46 2020 +0800

    Fix race condition on close consumer while reconnect to broker. (#7589)
    
    ### Modifications
    
    Add a state check when the consumer's connection is opened. If the consumer state is closing or closed, we don’t need to send the subscribe command.
    
    (cherry picked from commit 0b37b0c76dd2faaa9b8cc8d5b316ff35a307cfc1)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e0f1bcb..1757f1e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -728,6 +728,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     @Override
     public void connectionOpened(final ClientCnx cnx) {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            setState(State.Closed);
+            closeConsumerTasks();
+            client.cleanupConsumer(this);
+            failPendingReceive();
+            clearReceiverQueue();
+            return;
+        }
         setClientCnx(cnx);
         cnx.registerConsumer(consumerId, this);
 


[pulsar] 21/25: Fix batch index filter issue in Consumer. (#7654)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0be16ea566084706ae34b388e7c1d5b0fcca9e4d
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Jul 27 21:12:10 2020 +0800

    Fix batch index filter issue in Consumer. (#7654)
    
    ### Motivation
    
    Fix batch index filter issue in Consumer. The previous logic at https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1344 is wrong; the condition should be the opposite.
    
    (cherry picked from commit e9a0fd1e9415b1d19e877e315261f923a86fe073)
---
 .../client/impl/BatchMessageIndexAckTest.java      | 23 +++++++++++++++++-----
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 10 +++++++++-
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 582d461..8f76561 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -84,22 +84,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
         }
         FutureUtil.waitForAll(futures).get();
 
+        List<MessageId> acked = new ArrayList<>(50);
         for (int i = 0; i < messages; i++) {
+            Message<Integer> msg = consumer.receive();
             if (i % 2 == 0) {
-                consumer.acknowledge(consumer.receive());
+                consumer.acknowledge(msg);
+                acked.add(msg.getMessageId());
             } else {
                 consumer.negativeAcknowledge(consumer.receive());
             }
         }
 
-        List<Message<Integer>> received = new ArrayList<>(50);
+        List<MessageId> received = new ArrayList<>(50);
         for (int i = 0; i < 50; i++) {
-            received.add(consumer.receive());
+            received.add(consumer.receive().getMessageId());
         }
 
         Assert.assertEquals(received.size(), 50);
+        acked.retainAll(received);
+        Assert.assertEquals(acked.size(), 0);
 
-        Message<Integer> moreMessage = consumer.receive(1, TimeUnit.SECONDS);
+        for (MessageId messageId : received) {
+            consumer.acknowledge(messageId);
+        }
+
+        Thread.sleep(1000);
+
+        consumer.redeliverUnacknowledgedMessages();
+
+        Message<Integer> moreMessage = consumer.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(moreMessage);
 
         futures.clear();
@@ -109,7 +122,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
         FutureUtil.waitForAll(futures).get();
 
         for (int i = 0; i < 50; i++) {
-            received.add(consumer.receive());
+            received.add(consumer.receive().getMessageId());
         }
 
         // Ensure the flow permit is work well since the client skip the acked batch index,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 28995ad..f9546e9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
@@ -1304,6 +1305,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             possibleToDeadLetter = new ArrayList<>();
         }
         int skippedMessages = 0;
+        BitSetRecyclable ackBitSet = null;
+        if (ackSet != null && ackSet.size() > 0) {
+            ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
+        }
         try {
             for (int i = 0; i < batchSize; ++i) {
                 if (log.isDebugEnabled()) {
@@ -1337,7 +1342,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     continue;
                 }
 
-                if (ackSet != null && BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)).get(i)) {
+                if (ackBitSet != null && !ackBitSet.get(i)) {
                     singleMessagePayload.release();
                     singleMessageMetadataBuilder.recycle();
                     ++skippedMessages;
@@ -1367,6 +1372,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 singleMessagePayload.release();
                 singleMessageMetadataBuilder.recycle();
             }
+            if (ackBitSet != null) {
+                ackBitSet.recycle();
+            }
         } catch (IOException e) {
             log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
             discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);


[pulsar] 12/25: fix validation never return false (#7593)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 45d2c8f6802e9fbc7f3790e20a889bf2fec0cedc
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Mon Jul 27 12:26:51 2020 +0800

    fix validation never return false (#7593)
    
    Fixes #7543
    
    (cherry picked from commit 27820358a92704c2cd24a08e295369dfbcb145cc)
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 22 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  8 +++-----
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index debc250..7b5309f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -456,6 +456,28 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
     }
 
     @Test
+    public void testTopicNameValid() throws Exception{
+        final String topicName = "persistent://prop/use/ns-abc/testTopicNameValid";
+        TenantInfo tenantInfo = createDefaultTenantInfo();
+        admin.tenants().createTenant("prop", tenantInfo);
+        admin.topics().createPartitionedTopic(topicName, 3);
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName("subscriptionName")
+                .subscribe();
+        ((MultiTopicsConsumerImpl) consumer).subscribeAsync("ns-abc/testTopicNameValid", 5).handle((res, exception) -> {
+            assertTrue(exception instanceof PulsarClientException.AlreadyClosedException);
+            assertEquals(((PulsarClientException.AlreadyClosedException) exception).getMessage(), "Topic name not valid");
+            return null;
+        }).get();
+        ((MultiTopicsConsumerImpl) consumer).subscribeAsync(topicName, 3).handle((res, exception) -> {
+            assertTrue(exception instanceof PulsarClientException.AlreadyClosedException);
+            assertEquals(((PulsarClientException.AlreadyClosedException) exception).getMessage(), "Topic name not valid");
+            return null;
+        }).get();
+    }
+
+    @Test
     public void testSubscribeUnsubscribeSingleTopic() throws Exception {
         String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
         final String subscriptionName = "my-ex-subscription-" + key;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4e2a6be..7f44836 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -728,10 +728,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     private boolean topicNameValid(String topicName) {
-        checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
-        checkArgument(!topics.containsKey(topicName), "Topics already contains topic:" + topicName);
-
-        return true;
+        return TopicName.isValid(topicName) && !topics.containsKey(topicName);
     }
 
     // subscribe one more given topic
@@ -792,7 +789,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     // subscribe one more given topic, but already know the numberPartitions
-    private CompletableFuture<Void> subscribeAsync(String topicName, int numberPartitions) {
+    @VisibleForTesting
+    CompletableFuture<Void> subscribeAsync(String topicName, int numberPartitions) {
         if (!topicNameValid(topicName)) {
             return FutureUtil.failedFuture(
                 new PulsarClientException.AlreadyClosedException("Topic name not valid"));


[pulsar] 22/25: [docs] Fix wrong required properties for HDFS2 sink (#7643)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 50493d07d42c25dc2c2f018626fc21f645bce4c4
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Jul 28 17:38:25 2020 +0800

    [docs] Fix wrong required properties for HDFS2 sink (#7643)
    
    * Fix wrong required properties for HDFS2 sink
    
    * Fix description of filenamePrefix
    
    (cherry picked from commit bf90e55214d04c3f8b34433c119c43a49bb1c1e8)
---
 site2/docs/io-hdfs2-sink.md | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/site2/docs/io-hdfs2-sink.md b/site2/docs/io-hdfs2-sink.md
index 9769695..56c4c7b 100644
--- a/site2/docs/io-hdfs2-sink.md
+++ b/site2/docs/io-hdfs2-sink.md
@@ -21,8 +21,8 @@ The configuration of the HDFS2 sink connector has the following properties.
 | `compression` | Compression |false |None |The compression code used to compress or de-compress the files on HDFS. <br/><br/>Below are the available options:<br/><li>BZIP2<br/><li>DEFLATE<br/><li>GZIP<br/><li>LZ4<br/><li>SNAPPY|
 | `kerberosUserPrincipal` |String| false| None|The principal account of Kerberos user used for authentication. |
 | `keytab` | String|false|None| The full pathname of the Kerberos keytab file used for authentication. |
-| `filenamePrefix` |String| false |None |The prefix of the files created inside the HDFS directory.<br/><br/>**Example**<br/> The value of topicA result in files named topicA-. |
-| `fileExtension` | String| false | None| The extension added to the files written to HDFS.<br/><br/>**Example**<br/>'.txt'<br/> '.seq' |
+| `filenamePrefix` |String| true, if `compression` is set to `None`. | None |The prefix of the files created inside the HDFS directory.<br/><br/>**Example**<br/> The value of topicA result in files named topicA-. |
+| `fileExtension` | String| true | None | The extension added to the files written to HDFS.<br/><br/>**Example**<br/>'.txt'<br/> '.seq' |
 | `separator` | char|false |None |The character used to separate records in a text file. <br/><br/>If no value is provided, the contents from all records are concatenated together in one continuous byte array. |
 | `syncInterval` | long| false |0| The interval between calls to flush data to HDFS disk in milliseconds. |
 | `maxPendingRecords` |int| false|Integer.MAX_VALUE |  The maximum number of records that hold in memory before acking. <br/><br/>Setting this property to 1 makes every record send to disk before the record is acked.<br/><br/>Setting this property to a higher value allows buffering records before flushing them to disk. 
@@ -38,6 +38,7 @@ Before using the HDFS2 sink connector, you need to create a configuration file t
         "hdfsConfigResources": "core-site.xml",
         "directory": "/foo/bar",
         "filenamePrefix": "prefix",
+        "fileExtension": ".log",
         "compression": "SNAPPY"
     }
     ```
@@ -49,5 +50,6 @@ Before using the HDFS2 sink connector, you need to create a configuration file t
         hdfsConfigResources: "core-site.xml"
         directory: "/foo/bar"
         filenamePrefix: "prefix"
+        fileExtension: ".log"
         compression: "SNAPPY"
     ```


[pulsar] 24/25: Support to set listener name for client cli (#7621)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1842f7c63645121ee910bb1047403e5a24681c24
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue Jul 28 15:42:53 2020 +0800

    Support to set listener name for client cli (#7621)
    
    ### Motivation
    
    Currently, the CLI tools `pulsar-client` and `pulsar-perf` cannot specify the required listener name when using `advertisedListeners`.
    
    ### Modifications
    
    Add option `--listener-name` for `pulsar-client` and `pulsar-perf`.
    
    (cherry picked from commit 4f5b39a1f432a4eb9ae1fe6a6e1edde29075c6c6)
---
 .../main/java/org/apache/pulsar/client/cli/PulsarClientTool.java   | 6 ++++++
 .../java/org/apache/pulsar/testclient/PerformanceConsumer.java     | 7 +++++++
 .../java/org/apache/pulsar/testclient/PerformanceProducer.java     | 7 +++++++
 .../main/java/org/apache/pulsar/testclient/PerformanceReader.java  | 7 +++++++
 site2/docs/reference-cli-tools.md                                  | 4 ++++
 5 files changed, 31 insertions(+)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 0ba7227..e2cd8db 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -53,6 +53,9 @@ public class PulsarClientTool {
     @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.")
     String authPluginClassName = null;
 
+    @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
+    String listenerName = null;
+
     @Parameter(
         names = { "--auth-params" },
         description = "Authentication parameters, whose format is determined by the implementation " +
@@ -115,6 +118,9 @@ public class PulsarClientTool {
             authentication = AuthenticationFactory.create(authPluginClassName, authParams);
             clientBuilder.authentication(authentication);
         }
+        if (isNotBlank(this.listenerName)) {
+            clientBuilder.listenerName(this.listenerName);
+        }
         clientBuilder.allowTlsInsecureConnection(this.tlsAllowInsecureConnection);
         clientBuilder.tlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
         clientBuilder.serviceUrl(serviceURL);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 196c557..670578b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -119,6 +119,9 @@ public class PerformanceConsumer {
         @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
         public String authPluginClassName;
 
+        @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
+        String listenerName = null;
+
         @Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max pending chunk messages")
         private int maxPendingChuckedMessage = 0;
 
@@ -267,6 +270,10 @@ public class PerformanceConsumer {
             clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
         }
 
+        if (isNotBlank(arguments.listenerName)) {
+            clientBuilder.listenerName(arguments.listenerName);
+        }
+
         PulsarClient pulsarClient = clientBuilder.build();
 
         class EncKeyReader implements CryptoKeyReader {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index d24b9b9..1484058 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -122,6 +122,9 @@ public class PerformanceProducer {
         @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
         public String authPluginClassName;
 
+        @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
+        String listenerName = null;
+
         @Parameter(names = { "-ch",
                 "--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
         private boolean chunkingAllowed = false;
@@ -428,6 +431,10 @@ public class PerformanceProducer {
                 clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
             }
 
+            if (isNotBlank(arguments.listenerName)) {
+                clientBuilder.listenerName(arguments.listenerName);
+            }
+
             client = clientBuilder.build();
             ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
                     .sendTimeout(0, TimeUnit.SECONDS) //
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 218ea30..694b667 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -92,6 +92,9 @@ public class PerformanceReader {
         @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
         public String authPluginClassName;
 
+        @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
+        String listenerName = null;
+
         @Parameter(
             names = { "--auth-params" },
             description = "Authentication parameters, whose format is determined by the implementation " +
@@ -220,6 +223,10 @@ public class PerformanceReader {
             clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
         }
 
+        if (isNotBlank(arguments.listenerName)) {
+            clientBuilder.listenerName(arguments.listenerName);
+        }
+
         PulsarClient pulsarClient = clientBuilder.build();
 
         List<CompletableFuture<Reader<byte[]>>> futures = Lists.newArrayList();
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index eb3a989..29b5842 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -291,6 +291,7 @@ Options
 |---|---|---|
 |`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{\"key1\":\"val1\",\"key2\":\"val2\"}"|{"saslJaasClientSectionName":"PulsarClient", "serverType":"broker"}|
 |`--auth-plugin`|Authentication plugin class name|org.apache.pulsar.client.impl.auth.AuthenticationSasl|
+|`--listener-name`|Listener name for the broker||
 |`--url`|Broker URL to which to connect|pulsar://localhost:6650/ </br> ws://localhost:8080 |
 
 
@@ -415,6 +416,7 @@ Options
 |---|---|---|
 |`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
 |`--auth_plugin`|Authentication plugin class name||
+|`--listener-name`|Listener name for the broker||
 |`--acks-delay-millis`|Acknowlegments grouping delay in millis|100|
 |`-k`, `--encryption-key-name`|The private key name to decrypt payload||
 |`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload||
@@ -448,6 +450,7 @@ Options
 |---|---|---|
 |`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
 |`--auth_plugin`|Authentication plugin class name||
+|`--listener-name`|Listener name for the broker||
 |`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
 |`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB, ZSTD or SNAPPY.||
 |`--conf-file`|Configuration file||
@@ -485,6 +488,7 @@ Options
 |---|---|---|
 |`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
 |`--auth_plugin`|Authentication plugin class name||
+|`--listener-name`|Listener name for the broker||
 |`--conf-file`|Configuration file||
 |`-h`, `--help`|Help message|false|
 |`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|


[pulsar] 09/25: Use Consume/Produce/Lookup interfaces for specific operations in allowTopicOperation (#7587)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 56a0d690976a96725f1aee8e02cb788ad4e480b3
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Sun Jul 26 21:35:31 2020 -0700

    Use Consume/Produce/Lookup interfaces for specific operations in allowTopicOperation (#7587)
    
    ### Motivation
    Several parts of the code use allowTopicOperation while others use canConsume/canProduce/canLookup for those specific operations. This MR makes the former use the latter calls for those specific operations.
    
    (cherry picked from commit 516bad1079830b3f5f5046b4237e12861f9ec3a9)
---
 .../authorization/AuthorizationProvider.java       | 30 ++++++++++++----------
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  2 --
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 0424c00..d1e7596 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -208,10 +208,7 @@ public interface AuthorizationProvider extends Closeable {
     default CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
                                                             TenantOperation operation,
                                                             AuthenticationDataSource authData) {
-        return FutureUtil.failedFuture(new IllegalStateException(
-                String.format("allowTenantOperation(%s) on tenant %s is not supported by the Authorization" +
-                                " provider you are using.",
-                        operation.toString(), tenantName)));
+        return isTenantAdmin(tenantName, role, null, authData);
     }
 
     default Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation,
@@ -267,11 +264,7 @@ public interface AuthorizationProvider extends Closeable {
     default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
                                                                           PolicyOperation operation, String originalRole,
                                                                           String role, AuthenticationDataSource authData) {
-        return FutureUtil.failedFuture(
-                new IllegalStateException(
-                        String.format("NamespacePolicyOperation(%s) on namespace(%s) by role(%s) is not supported" +
-                                " by the Authorization provider you are using.", operation.toString(),
-                                namespaceName.toString(), role == null ? "null" : role)));
+        return isTenantAdmin(namespaceName.getTenant(), role, null, authData);
     }
 
     default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation,
@@ -298,11 +291,20 @@ public interface AuthorizationProvider extends Closeable {
     default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role,
                                                              TopicOperation operation,
                                                              AuthenticationDataSource authData) {
-        return FutureUtil.failedFuture(
-            new IllegalStateException(
-                    String.format("TopicOperation(%s) on topic(%s) by role(%s) is not supported" +
-                            " by the Authorization provider you are using.",
-                            operation.toString(), topic.toString(), role == null ? "null" : null)));
+        switch (operation) {
+            case PRODUCE:
+                return canProduceAsync(topic, role, authData);
+            case CONSUME:
+                return canConsumeAsync(topic, role, authData, null);
+            case LOOKUP:
+                return canLookupAsync(topic, role, authData);
+            default:
+                return FutureUtil.failedFuture(
+                        new IllegalStateException(
+                                String.format("TopicOperation(%s) on topic(%s) by role(%s) is not supported" +
+                                                " by the Authorization provider you are using.",
+                                        operation.toString(), topic.toString(), role == null ? "null" : null)));
+        }
     }
 
     default Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 3ccfde9..4c082d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -133,7 +133,6 @@ public class Namespaces extends NamespacesBase {
     public void createNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @ApiParam(value = "Policies for the namespace") Policies policies) {
         validateNamespaceName(tenant, namespace);
-        validateTenantOperation(tenant, TenantOperation.CREATE_NAMESPACE);
         policies = getDefaultPolicesIfNull(policies);
         internalCreateNamespace(policies);
     }
@@ -250,7 +249,6 @@ public class Namespaces extends NamespacesBase {
     public Set<String> getNamespaceReplicationClusters(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.REPLICATION, PolicyOperation.READ);
         return internalGetNamespaceReplicationClusters();
     }