You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/03/13 06:56:43 UTC

[pulsar] branch branch-2.5 updated (bd4d879 -> aaa9062)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a change to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from bd4d879  Fix broker to specify a list of bookie groups. (#6349)
     new 74b3564  [Issue 6168] Fix Unacked Message Tracker by Using Time Partition on C++ (#6391)
     new af106d8  Fix memory leak when running topic compaction. (#6485)
     new b3ce70b  Fix create partitioned topic with a substring of an existing topic name. (#6478)
     new 3c89abb  [proxy] Fix proxy routing to functions worker (#6486)
     new aaa9062  pulsar-proxy: fix correct name for proxy thread executor name (#6460)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/admin/AdminResource.java  | 114 +++++++++++++++++++-
 .../broker/admin/impl/PersistentTopicsBase.java    |  97 +++--------------
 .../broker/admin/v1/NonPersistentTopics.java       |  34 +-----
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  16 ++-
 .../broker/admin/v2/NonPersistentTopics.java       |  37 ++-----
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  29 +++--
 .../pulsar/client/impl/RawBatchConverter.java      |   5 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  19 ++--
 .../pulsar/compaction/TwoPhaseCompactor.java       | 119 +++++++++++----------
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   8 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   6 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  57 +++++++---
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   4 +-
 .../apache/pulsar/client/impl/RawReaderTest.java   |   4 +-
 .../include/pulsar/ConsumerConfiguration.h         |   4 +
 .../include/pulsar/c/producer_configuration.h      |   2 +-
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |   6 ++
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |   2 +
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  21 +++-
 pulsar-client-cpp/lib/ConsumerImpl.h               |   1 +
 pulsar-client-cpp/lib/ConsumerImplBase.h           |   1 +
 pulsar-client-cpp/lib/LogUtils.cc                  |   2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  25 ++++-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   1 +
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  24 ++++-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |   1 +
 .../lib/UnAckedMessageTrackerEnabled.cc            | 106 +++++++++++-------
 .../lib/UnAckedMessageTrackerEnabled.h             |   6 +-
 .../pulsar/proxy/server/AdminProxyHandler.java     |  26 ++++-
 .../proxy/server/util/ZookeeperCacheLoader.java    |   2 +-
 .../proxy/server/FunctionWorkerRoutingTest.java    |  66 ++++++++++++
 31 files changed, 551 insertions(+), 294 deletions(-)
 create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java


[pulsar] 01/05: [Issue 6168] Fix Unacked Message Tracker by Using Time Partition on C++ (#6391)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 74b3564a7060041c0091ed31a794a8d190330614
Author: k2la <mz...@gmail.com>
AuthorDate: Tue Mar 3 04:55:33 2020 +0900

    [Issue 6168] Fix Unacked Message Tracker by Using Time Partition on C++ (#6391)
    
    ### Motivation
    Fixes #6168.
    >On C++ lib, like the following log, unacked messages are redelivered after about 2 * unAckedMessagesTimeout.
    
    ### Modifications
    As in #3118, fixed `UnackedMessageTracker` by using TimePartition.
    - Add `TickDurationInMs` to the consumer configuration.
    - Add a `redeliverUnacknowledgedMessages` overload that takes a set of `MessageId`s to `ConsumerImpl`, `MultiTopicsConsumerImpl` and `PartitionedConsumerImpl`.
    (cherry picked from commit 333888ad61c062f9e3d2946918ffc21fafd441af)
---
 .../include/pulsar/ConsumerConfiguration.h         |   4 +
 .../include/pulsar/c/producer_configuration.h      |   2 +-
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |   6 ++
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |   2 +
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  21 +++-
 pulsar-client-cpp/lib/ConsumerImpl.h               |   1 +
 pulsar-client-cpp/lib/ConsumerImplBase.h           |   1 +
 pulsar-client-cpp/lib/LogUtils.cc                  |   2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  25 ++++-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   1 +
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  24 ++++-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |   1 +
 .../lib/UnAckedMessageTrackerEnabled.cc            | 106 +++++++++++++--------
 .../lib/UnAckedMessageTrackerEnabled.h             |   6 +-
 14 files changed, 152 insertions(+), 50 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 08b7c54..5468b37 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -155,6 +155,10 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     long getUnAckedMessagesTimeoutMs() const;
 
+    void setTickDurationInMs(const uint64_t milliSeconds);
+
+    long getTickDurationInMs() const;
+
     /**
      * Set the delay to wait before re-delivering messages that have failed to be process.
      * <p>
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index c846451..1fe44e0 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -178,4 +178,4 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_property(pulsar_producer_co
 
 #ifdef __cplusplus
 }
-#endif
\ No newline at end of file
+#endif
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 38fa1fe..546b8b9 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -98,6 +98,12 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
     impl_->unAckedMessagesTimeoutMs = milliSeconds;
 }
 
+long ConsumerConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; }
+
+void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
+    impl_->tickDurationInMs = milliSeconds;
+}
+
 void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
     impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis;
 }
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 55dafd3..8dd1263 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -27,6 +27,7 @@ namespace pulsar {
 struct ConsumerConfigurationImpl {
     SchemaInfo schemaInfo;
     long unAckedMessagesTimeoutMs;
+    long tickDurationInMs;
 
     long negativeAckRedeliveryDelayMs;
     ConsumerType consumerType;
@@ -45,6 +46,7 @@ struct ConsumerConfigurationImpl {
     ConsumerConfigurationImpl()
         : schemaInfo(),
           unAckedMessagesTimeoutMs(0),
+          tickDurationInMs(1000),
           negativeAckRedeliveryDelayMs(60000),
           consumerType(ConsumerExclusive),
           messageListener(),
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 9a7e506..a58e09b 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -66,8 +66,13 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
     consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
     consumerStr_ = consumerStrStream.str();
     if (conf.getUnAckedMessagesTimeoutMs() != 0) {
-        unAckedMessageTrackerPtr_.reset(
-            new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+        if (conf.getTickDurationInMs() > 0) {
+            unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
+        } else {
+            unAckedMessageTrackerPtr_.reset(
+                new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+        }
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
@@ -954,6 +959,18 @@ Result ConsumerImpl::resumeMessageListener() {
 void ConsumerImpl::redeliverUnacknowledgedMessages() {
     static std::set<MessageId> emptySet;
     redeliverMessages(emptySet);
+    unAckedMessageTrackerPtr_->clear();
+}
+
+void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
+    if (messageIds.empty()) {
+        return;
+    }
+    if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) {
+        redeliverUnacknowledgedMessages();
+        return;
+    }
+    redeliverMessages(messageIds);
 }
 
 void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 8a25b49..6d81fd0 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -98,6 +98,7 @@ class ConsumerImpl : public ConsumerImplBase,
     virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);
 
     virtual void redeliverMessages(const std::set<MessageId>& messageIds);
+    virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
     virtual void negativeAcknowledge(const MessageId& msgId);
 
     virtual void closeAsync(ResultCallback callback);
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index ab6ed9f..fc15066 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -48,6 +48,7 @@ class ConsumerImplBase {
     virtual Result pauseMessageListener() = 0;
     virtual Result resumeMessageListener() = 0;
     virtual void redeliverUnacknowledgedMessages() = 0;
+    virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0;
     virtual const std::string& getName() const = 0;
     virtual int getNumOfPrefetchedMessages() const = 0;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
diff --git a/pulsar-client-cpp/lib/LogUtils.cc b/pulsar-client-cpp/lib/LogUtils.cc
index e2615a5..e4f6a17 100644
--- a/pulsar-client-cpp/lib/LogUtils.cc
+++ b/pulsar-client-cpp/lib/LogUtils.cc
@@ -55,4 +55,4 @@ std::string LogUtils::getLoggerName(const std::string& path) {
     return path.substr(startIdx + 1, endIdx - startIdx - 1);
 }
 
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 25addb9..e837adf 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -45,8 +45,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
     consumerStr_ = consumerStrStream.str();
 
     if (conf.getUnAckedMessagesTimeoutMs() != 0) {
-        unAckedMessageTrackerPtr_.reset(
-            new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+        if (conf.getTickDurationInMs() > 0) {
+            unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
+        } else {
+            unAckedMessageTrackerPtr_.reset(
+                new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+        }
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
@@ -654,6 +659,22 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
          consumer++) {
         (consumer->second)->redeliverUnacknowledgedMessages();
     }
+    unAckedMessageTrackerPtr_->clear();
+}
+
+void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
+    if (messageIds.empty()) {
+        return;
+    }
+    if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
+        redeliverUnacknowledgedMessages();
+        return;
+    }
+    LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
+    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+         consumer++) {
+        (consumer->second)->redeliverUnacknowledgedMessages(messageIds);
+    }
 }
 
 int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index d190664..fa271fe 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -69,6 +69,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     virtual Result pauseMessageListener();
     virtual Result resumeMessageListener();
     virtual void redeliverUnacknowledgedMessages();
+    virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
     virtual int getNumOfPrefetchedMessages() const;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
     void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 03fd2d2..0241a54 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -43,8 +43,13 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
     consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << ","
                       << numPartitions << "]";
     if (conf.getUnAckedMessagesTimeoutMs() != 0) {
-        unAckedMessageTrackerPtr_.reset(
-            new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+        if (conf.getTickDurationInMs() > 0) {
+            unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
+        } else {
+            unAckedMessageTrackerPtr_.reset(
+                new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+        }
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
@@ -426,6 +431,21 @@ void PartitionedConsumerImpl::redeliverUnacknowledgedMessages() {
     for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
         (*i)->redeliverUnacknowledgedMessages();
     }
+    unAckedMessageTrackerPtr_->clear();
+}
+
+void PartitionedConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
+    if (messageIds.empty()) {
+        return;
+    }
+    if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
+        redeliverUnacknowledgedMessages();
+        return;
+    }
+    LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
+    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
+        (*i)->redeliverUnacknowledgedMessages(messageIds);
+    }
 }
 
 const std::string& PartitionedConsumerImpl::getName() const { return partitionStr_; }
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 71b2a30..fb4b047 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -64,6 +64,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     virtual Result pauseMessageListener();
     virtual Result resumeMessageListener();
     virtual void redeliverUnacknowledgedMessages();
+    virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
     virtual const std::string& getName() const;
     virtual int getNumOfPrefetchedMessages() const;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 2c768b2..7894e64 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -28,7 +28,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
     timeoutHandlerHelper();
     ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
     timer_ = executorService->createDeadlineTimer();
-    timer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs_));
+    timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_));
     timer_->async_wait([&](const boost::system::error_code& ec) {
         if (ec) {
             LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
@@ -42,86 +42,112 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
     std::lock_guard<std::mutex> acquire(lock_);
     LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
               << consumerReference_.getName().c_str());
-    if (!oldSet_.empty()) {
+
+    std::set<MessageId> headPartition = timePartitions.front();
+    timePartitions.pop_front();
+
+    std::set<MessageId> msgIdsToRedeliver;
+    if (!headPartition.empty()) {
         LOG_INFO(consumerReference_.getName().c_str()
-                 << ": " << oldSet_.size() << " Messages were not acked within " << timeoutMs_ << " time");
-        oldSet_.clear();
-        currentSet_.clear();
-        consumerReference_.redeliverUnacknowledgedMessages();
+                 << ": " << headPartition.size() << " Messages were not acked within "
+                 << timePartitions.size() * tickDurationInMs_ << " time");
+        for (auto it = headPartition.begin(); it != headPartition.end(); it++) {
+            msgIdsToRedeliver.insert(*it);
+            messageIdPartitionMap.erase(*it);
+        }
+    }
+    headPartition.clear();
+    timePartitions.push_back(headPartition);
+
+    if (msgIdsToRedeliver.size() > 0) {
+        consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
     }
-    oldSet_.swap(currentSet_);
 }
 
 UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr client,
                                                            ConsumerImplBase& consumer)
     : consumerReference_(consumer) {
+    UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer);
+}
+
+UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long tickDurationInMs,
+                                                           const ClientImplPtr client,
+                                                           ConsumerImplBase& consumer)
+    : consumerReference_(consumer) {
     timeoutMs_ = timeoutMs;
+    tickDurationInMs_ = (timeoutMs >= tickDurationInMs) ? tickDurationInMs : timeoutMs;
     client_ = client;
+
+    int blankPartitions = (int)std::ceil((double)timeoutMs_ / tickDurationInMs_);
+    for (int i = 0; i < blankPartitions + 1; i++) {
+        std::set<MessageId> msgIds;
+        timePartitions.push_back(msgIds);
+    }
+
     timeoutHandler();
 }
 
 bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
     std::lock_guard<std::mutex> acquire(lock_);
-    oldSet_.erase(m);
-    return currentSet_.insert(m).second;
+    if (messageIdPartitionMap.count(m) == 0) {
+        bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second;
+        return insert && timePartitions.back().insert(m).second;
+    }
+    return false;
 }
 
 bool UnAckedMessageTrackerEnabled::isEmpty() {
     std::lock_guard<std::mutex> acquire(lock_);
-    return oldSet_.empty() && currentSet_.empty();
+    return messageIdPartitionMap.empty();
 }
 
 bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
     std::lock_guard<std::mutex> acquire(lock_);
-    return oldSet_.erase(m) || currentSet_.erase(m);
+    bool removed = false;
+    std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(m);
+    if (exist != messageIdPartitionMap.end()) {
+        removed = exist->second.erase(m);
+    }
+    return removed;
 }
 
 long UnAckedMessageTrackerEnabled::size() {
     std::lock_guard<std::mutex> acquire(lock_);
-    return oldSet_.size() + currentSet_.size();
+    return messageIdPartitionMap.size();
 }
 
 void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
     std::lock_guard<std::mutex> acquire(lock_);
-    for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
-        if (*it < msgId && it->partition() == msgId.partition()) {
-            oldSet_.erase(it++);
-        } else {
-            it++;
-        }
-    }
-    for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) {
-        if (*it < msgId && it->partition() == msgId.partition()) {
-            currentSet_.erase(it++);
-        } else {
-            it++;
+    for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
+        MessageId msgIdInMap = it->first;
+        if (msgIdInMap < msgId) {
+            std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgId);
+            if (exist != messageIdPartitionMap.end()) {
+                exist->second.erase(msgId);
+            }
         }
     }
 }
 
 // this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.
 void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
-    for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
-        const std::string& topicPartitionName = it->getTopicName();
-        if (topicPartitionName.find(topic) != std::string::npos) {
-            oldSet_.erase(it++);
-        } else {
-            it++;
-        }
-    }
-    for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) {
-        const std::string& topicPartitionName = it->getTopicName();
-        if (topicPartitionName.find(topic) != std::string::npos) {
-            currentSet_.erase(it++);
-        } else {
-            it++;
+    std::lock_guard<std::mutex> acquire(lock_);
+    for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
+        MessageId msgIdInMap = it->first;
+        if (msgIdInMap.getTopicName().compare(topic) == 0) {
+            std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgIdInMap);
+            if (exist != messageIdPartitionMap.end()) {
+                exist->second.erase(msgIdInMap);
+            }
         }
     }
 }
 
 void UnAckedMessageTrackerEnabled::clear() {
-    currentSet_.clear();
-    oldSet_.clear();
+    messageIdPartitionMap.clear();
+    for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
+        it->clear();
+    }
 }
 
 UnAckedMessageTrackerEnabled::~UnAckedMessageTrackerEnabled() {
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 921e747..c2b4012 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
    public:
     ~UnAckedMessageTrackerEnabled();
     UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&);
+    UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&);
     bool add(const MessageId& m);
     bool remove(const MessageId& m);
     void removeMessagesTill(const MessageId& msgId);
@@ -40,13 +41,14 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
     void timeoutHandlerHelper();
     bool isEmpty();
     long size();
-    std::set<MessageId> currentSet_;
-    std::set<MessageId> oldSet_;
+    std::map<MessageId, std::set<MessageId>> messageIdPartitionMap;
+    std::deque<std::set<MessageId>> timePartitions;
     std::mutex lock_;
     DeadlineTimerPtr timer_;
     ConsumerImplBase& consumerReference_;
     ClientImplPtr client_;
     long timeoutMs_;
+    long tickDurationInMs_;
 };
 }  // namespace pulsar
 


[pulsar] 03/05: Fix create partitioned topic with a substring of an existing topic name. (#6478)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b3ce70b64b3182bd29926d041b8dc280f3156dd9
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Mar 6 16:50:10 2020 +0800

    Fix create partitioned topic with a substring of an existing topic name. (#6478)
    
    Fixes #6468
    
    Fix creating a partitioned topic whose name is a substring of an existing topic name, and make partitioned topic creation async.
    (cherry picked from commit 19ccfd5c60020a32bceeca128a9846ca006f0dc7)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 114 ++++++++++++++++++++-
 .../broker/admin/impl/PersistentTopicsBase.java    |  97 +++---------------
 .../broker/admin/v1/NonPersistentTopics.java       |  34 +-----
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  16 ++-
 .../broker/admin/v2/NonPersistentTopics.java       |  37 ++-----
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  29 ++++--
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   8 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   6 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  57 ++++++++---
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   4 +-
 10 files changed, 227 insertions(+), 175 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 79fe998..965b8db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -36,6 +37,7 @@ import java.util.stream.Collectors;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
@@ -46,6 +48,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -255,16 +258,19 @@ public abstract class AdminResource extends PulsarWebResource {
         return namespaces;
     }
 
-    protected void tryCreatePartitionsAsync(int numPartitions) {
+    protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
         if (!topicName.isPersistent()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
+        List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
         for (int i = 0; i < numPartitions; i++) {
-            tryCreatePartitionAsync(i);
+            futures.add(tryCreatePartitionAsync(i, null));
         }
+        return FutureUtil.waitForAll(futures);
     }
 
-    private void tryCreatePartitionAsync(final int partition) {
+    private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
+        CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
         zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
             (rc, s, o, s1) -> {
                 if (KeeperException.Code.OK.intValue() == rc) {
@@ -272,18 +278,22 @@ public abstract class AdminResource extends PulsarWebResource {
                         log.debug("[{}] Topic partition {} created.", clientAppId(),
                             topicName.getPartition(partition));
                     }
+                    result.complete(null);
                 } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                     log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
                         topicName.getPartition(partition));
+                    result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
                 } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
                     log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
                             clientAppId(), topicName.getPartition(partition));
-                    tryCreatePartitionAsync(partition);
+                    tryCreatePartitionAsync(partition, result);
                 } else {
                     log.error("[{}] Fail to create topic partition {}", clientAppId(),
                         topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
+                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                 }
         });
+        return result;
     }
 
     protected NamespaceName namespaceName;
@@ -699,4 +709,98 @@ public abstract class AdminResource extends PulsarWebResource {
         partitionedTopics.sort(null);
         return partitionedTopics;
     }
+
+    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+        if (numPartitions <= 0) {
+            asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
+            return;
+        }
+        checkTopicExistsAsync(topicName).thenAccept(exists -> {
+            if (exists) {
+                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
+                asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
+            } else {
+                try {
+                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
+                    byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
+                    zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
+                        if (KeeperException.Code.OK.intValue() == rc) {
+                            globalZk().sync(path, (rc2, s2, ctx) -> {
+                                if (KeeperException.Code.OK.intValue() == rc2) {
+                                    log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+                                    tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
+                                        log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName);
+                                        asyncResponse.resume(Response.noContent().build());
+                                    }).exceptionally(e -> {
+                                        log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+                                        // The partitioned topic metadata was created, but creating some of its partitions failed.
+                                        asyncResponse.resume(new RestException(e));
+                                        return null;
+                                    });
+                                } else {
+                                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
+                                    asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+                                }
+                            }, null);
+                        } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
+                            asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists"));
+                        } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
+                            log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
+                                    topicName);
+                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                        } else {
+                            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
+                            asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
+                        }
+                    });
+                } catch (Exception e) {
+                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                }
+            }
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
+    }
+
+    /**
+     * Check whether the existing topics already contain the given topic.
+     * Since Pulsar has both topic partitions and non-partitioned topics, we must ensure that neither
+     * partitions nor non-partitioned topics are duplicated. So, when comparing against a partition name,
+     * we compare against the partitioned topic name that the partition belongs to.
+     *
+     * @param topicName given topic name
+     */
+    protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName) {
+        return pulsar().getNamespaceService().getListOfTopics(topicName.getNamespaceObject(),
+                PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                .thenCompose(topics -> {
+                    boolean exists = false;
+                    for (String topic : topics) {
+                        if (topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName())) {
+                            exists = true;
+                            break;
+                        }
+                    }
+                    return CompletableFuture.completedFuture(exists);
+                });
+    }
+
+    protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
+        if (throwable instanceof WebApplicationException) {
+            asyncResponse.resume(throwable);
+        } else {
+            asyncResponse.resume(new RestException(throwable));
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b07f018..7832777 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import org.apache.pulsar.common.api.proto.PulsarApi;
+
 import static org.apache.pulsar.common.util.Codec.decode;
 
 import com.github.zafarkhaja.semver.Version;
@@ -390,46 +390,6 @@ public class PersistentTopicsBase extends AdminResource {
         revokePermissions(topicName.toString(), role);
     }
 
-    protected void internalCreatePartitionedTopic(int numPartitions) {
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
-        }
-        validatePartitionTopicName(topicName.getLocalName());
-        try {
-            boolean topicExist = pulsar().getNamespaceService()
-                    .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                    .join()
-                    .contains(topicName.toString());
-            if (topicExist) {
-                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already exists");
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
-        try {
-            String path = ZkAdminPaths.partitionedTopicPath(topicName);
-            byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            tryCreatePartitionsAsync(numPartitions);
-            // Sync data to all quorums and the observers
-            zkSync(path);
-            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
-        } catch (KeeperException.BadVersionException e) {
-                log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
-                        topicName);
-                throw new RestException(Status.CONFLICT, "Concurrent modification");
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
-    }
-
     protected void internalCreateNonPartitionedTopic(boolean authoritative) {
         validateAdminAccessForTenant(topicName.getTenant());
         validateNonPartitionTopicName(topicName.getLocalName());
@@ -540,11 +500,22 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected void internalCreateMissedPartitions() {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false);
-        if (metadata != null) {
-            tryCreatePartitionsAsync(metadata.partitions);
-        }
+    protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
+        getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
+            if (metadata != null) {
+                tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(e -> {
+                    log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
+            }
+        }).exceptionally(e -> {
+            log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return null;
+        });
     }
 
     private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
@@ -2072,40 +2043,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     /**
-     * Validate partitioned topic name.
-     * Validation will fail and throw RestException if
-     * 1) There's already a partitioned topic with same topic name and have some of its partition created.
-     * 2) There's already non partition topic with same name and contains partition suffix "-partition-"
-     * followed by numeric value. In this case internal created partition of partitioned topic could override
-     * the existing non partition topic.
-     *
-     * @param topicName
-     */
-    private void validatePartitionTopicName(String topicName) {
-        List<String> existingTopicList = internalGetList();
-        String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
-        for (String existingTopicName : existingTopicList) {
-            if (existingTopicName.contains(prefix)) {
-                try {
-                    Long.parseLong(existingTopicName.substring(
-                            existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
-                                    + TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
-                    log.warn("[{}] Already have topic {} which contains partition " +
-                            "suffix '-partition-' and end with numeric value. Creation of partitioned topic {}"
-                            + "could cause conflict.", clientAppId(), existingTopicName, topicName);
-                    throw new RestException(Status.PRECONDITION_FAILED,
-                            "Already have topic " + existingTopicName + " which contains partition suffix '-partition-' " +
-                                    "and end with numeric value, Creation of partitioned topic " + topicName +
-                                    " could cause conflict.");
-                } catch (NumberFormatException e) {
-                    // Do nothing, if value after partition suffix is not pure numeric value,
-                    // as it can't conflict with internal created partitioned topic's name.
-                }
-            }
-        }
-    }
-
-    /**
      * Validate non partition topic name,
      * Validation will fail and throw RestException if
      * 1) Topic name contains partition suffix "-partition-" and the remaining part follow the partition
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 12b9622..a1691d7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -118,41 +118,15 @@ public class NonPersistentTopics extends PersistentTopics {
     @ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist") })
-    public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
-        }
         try {
-            boolean topicExist = pulsar().getNamespaceService()
-                    .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                    .join()
-                    .contains(topicName.toString());
-            if (topicExist) {
-                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already exists");
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
-        try {
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
-                    topicName.getEncodedLocalName());
-            byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // Sync data to all quorums and the observers
-            zkSync(path);
-            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 9944ca3..57c91a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -57,7 +57,8 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import javax.ws.rs.container.AsyncResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
@@ -66,7 +67,7 @@ import javax.ws.rs.container.AsyncResponse;
 @Api(value = "/persistent", description = "Persistent topic admin apis", tags = "persistent topic", hidden = true)
 @SuppressWarnings("deprecation")
 public class PersistentTopics extends PersistentTopicsBase {
-
+    private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
     @GET
     @Path("/{property}/{cluster}/{namespace}")
     @ApiOperation(hidden = true, value = "Get the list of topics under a namespace.", response = String.class, responseContainer = "List")
@@ -141,11 +142,16 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist") })
-    public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index add815d..c13822d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -158,6 +158,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
     })
     public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -166,39 +167,15 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
             int numPartitions) {
-        validateGlobalNamespaceOwnership(tenant,namespace);
-        validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
-        }
-        try {
-            boolean topicExist = pulsar().getNamespaceService()
-                    .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                    .join()
-                    .contains(topicName.toString());
-            if (topicExist) {
-                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already exists");
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
+
         try {
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
-                    topicName.getEncodedLocalName());
-            byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // Sync data to all quorums and the observers
-            zkSync(path);
-            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
+            validateGlobalNamespaceOwnership(tenant,namespace);
+            validateTopicName(tenant, namespace, encodedTopic);
+            validateAdminAccessForTenant(topicName.getTenant());
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 57dd7e1..89d00a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -57,6 +57,9 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.ApiParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.pulsar.common.util.Codec.decode;
 
 /**
@@ -191,6 +194,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
     })
     public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -199,9 +203,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
             int numPartitions) {
-        validateGlobalNamespaceOwnership(tenant,namespace);
-        validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions);
+        try {
+            validateGlobalNamespaceOwnership(tenant,namespace);
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            validateAdminAccessForTenant(topicName.getTenant());
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     @PUT
@@ -273,7 +283,7 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @POST
     @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
-    @ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "This is a best-effort operation for create missed partitions of existing non-global partitioned-topic and does't throw any exceptions when create failed")
+    @ApiOperation(value = "Create missed partitions of an existing partitioned topic.")
     @ApiResponses(value = {
             @ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
@@ -283,6 +293,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 500, message = "Internal server error")
     })
     public void createMissedPartitions(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -290,8 +301,12 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic) {
 
-        validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        internalCreateMissedPartitions();
+        try {
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            internalCreateMissedPartitions(asyncResponse);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     @GET
@@ -1041,4 +1056,6 @@ public class PersistentTopics extends PersistentTopicsBase {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalGetLastMessageId(authoritative);
     }
+
+    private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index b83c5c2..af1a95f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -915,9 +915,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         try {
             admin.topics().createPartitionedTopic(partitionedTopicName, 32);
             fail("Should have failed as the partitioned topic already exists");
-        } catch (PreconditionFailedException e) {
-            // Expecting PreconditionFailedException instead of ConflictException as it'll
-            // fail validation before actually try to create metadata in ZK.
+        } catch (ConflictException ignore) {
         }
 
         producer = client.newProducer(Schema.BYTES)
@@ -1917,6 +1915,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         } catch (PulsarAdminException e) {
             assertTrue(e instanceof ConflictException);
         }
+
+        // Check that a partitioned topic can be created when its name is a substring of an existing topic name
+        admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/create_substring_topic", 1);
+        admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/substring_topic", 1);
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 06ec02a..9ef835d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -650,7 +650,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         verify(response, times(1)).resume(Lists.newArrayList());
         // create topic
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
-        persistentTopics.createPartitionedTopic(property, cluster, namespace, topic, 5);
+        response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
                 .newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 1825d31..cac35a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -56,6 +56,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -157,13 +158,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
                 "Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions");
 
         // 3) Create the partitioned topic
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3);
+        response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         // 4) Create a subscription
         response = mock(AsyncResponse.class);
         persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true,
                 (MessageIdImpl) MessageId.earliest, false);
-        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -239,7 +244,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
     }
 
-    @Test(expectedExceptions = RestException.class)
+    @Test
     public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException {
         // Test the case in which user already has topic like topic-name-partition-123 created before we enforce the validation.
         final String nonPartitionTopicName1 = "standard-topic";
@@ -250,7 +255,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5);
+        doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
     }
 
     @Test(expectedExceptions = RestException.class)
@@ -263,13 +273,18 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
+        doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
         doAnswer(invocation -> {
             persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace");
             persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname");
             return null;
         }).when(persistentTopics).validatePartitionedTopicName(any(), any(), any());
         doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, 10);
     }
 
@@ -295,7 +310,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // 3) create partitioned topic and unload
         response = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionTopicName, 6);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+        response = mock(AsyncResponse.class);
         persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -320,10 +339,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, "test-topic1", 3);
-
-        nonPersistentTopic.createPartitionedTopic(testTenant, testNamespace, "test-topic2", 3);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace);
 
@@ -351,7 +377,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testGrantPartitionedTopic() {
         final String partitionedTopicName = "partitioned-topic";
         final int numPartitions = 5;
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, numPartitions);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
@@ -387,8 +417,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testRevokePartitionedTopic() {
         final String partitionedTopicName = "partitioned-topic";
         final int numPartitions = 5;
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, numPartitions);
-
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index cf5db2b..27af49e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -876,9 +876,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         try {
             admin.topics().createPartitionedTopic(partitionedTopicName, 32);
             fail("Should have failed as the partitioned topic exists with its partition created");
-        } catch (PreconditionFailedException e) {
-            // Expecting PreconditionFailedException instead of ConflictException as it'll
-            // fail validation before actually try to create metadata in ZK.
+        } catch (ConflictException ignore) {
         }
 
         producer = client.newProducer(Schema.BYTES)


[pulsar] 05/05: pulsar-proxy: fix correct name for proxy thread executor name (#6460)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit aaa90628f7beebc589e1852e8622dab853d8ce1e
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat Mar 7 18:18:33 2020 -0800

    pulsar-proxy: fix correct name for proxy thread executor name (#6460)
    
    ### Motivation
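    Use the correct name for the proxy's ordered executor threads: rename the
    executor from `pulsar-discovery-ordered-cache` to `pulsar-proxy-ordered-cache`.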
    (cherry picked from commit 5c2c058ff9d8c783fe7a6b00e55da524da7ef4bb)
---
 .../java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index c3afdec..82dd42c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -54,7 +54,7 @@ public class ZookeeperCacheLoader implements Closeable {
     private volatile List<LoadManagerReport> availableBrokers;
 
     private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8)
-            .name("pulsar-discovery-ordered-cache").build();
+            .name("pulsar-proxy-ordered-cache").build();
 
     public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
 


[pulsar] 04/05: [proxy] Fix proxy routing to functions worker (#6486)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c89abbdb1af5f3b144a7620187d67732359a427
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Fri Mar 6 16:06:11 2020 -0700

    [proxy] Fix proxy routing to functions worker (#6486)
    
    ### Motivation
    
    Currently, the proxy only forwards the v1/v2 functions routes to the
    function worker.
    
    ### Modifications
    
    This change makes the proxy forward every function worker route to the
    function worker when the request path matches a known prefix. At the moment
    this is still a static list of prefixes, but in the future it may be
    possible to fetch this list of prefixes dynamically from the REST routes.
    
    ### Verifying this change
    - added some tests to ensure the routing works as expected
    (cherry picked from commit 329e2310069b61e25ce3f87f2828fab78f97187a)
---
 .../pulsar/proxy/server/AdminProxyHandler.java     | 26 ++++++++-
 .../proxy/server/FunctionWorkerRoutingTest.java    | 66 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index ca44c8f..56a933b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -26,9 +26,12 @@ import java.io.InputStream;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.cert.X509Certificate;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Executor;
 
 import javax.net.ssl.SSLContext;
@@ -60,6 +63,21 @@ import org.slf4j.LoggerFactory;
 
 class AdminProxyHandler extends ProxyServlet {
     private static final Logger LOG = LoggerFactory.getLogger(AdminProxyHandler.class);
+    private static final Set<String> functionRoutes = new HashSet<>(Arrays.asList(
+        "/admin/v3/function",
+        "/admin/v2/function",
+        "/admin/function",
+        "/admin/v3/source",
+        "/admin/v2/source",
+        "/admin/source",
+        "/admin/v3/sink",
+        "/admin/v2/sink",
+        "/admin/sink",
+        "/admin/v2/worker",
+        "/admin/v2/worker-stats",
+        "/admin/worker",
+        "/admin/worker-stats"
+    ));
 
     private final ProxyConfiguration config;
     private final BrokerDiscoveryProvider discoveryProvider;
@@ -260,9 +278,11 @@ class AdminProxyHandler extends ProxyServlet {
 
         boolean isFunctionsRestRequest = false;
         String requestUri = request.getRequestURI();
-        if (requestUri.startsWith("/admin/v2/functions")
-            || requestUri.startsWith("/admin/functions")) {
-            isFunctionsRestRequest = true;
+        for (String routePrefix: functionRoutes) {
+            if (requestUri.startsWith(routePrefix)) {
+                isFunctionsRestRequest = true;
+                break;
+            }
         }
 
         if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
new file mode 100644
index 0000000..b5d89cc
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.servlet.http.HttpServletRequest;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FunctionWorkerRoutingTest {
+
+    @Test
+    public void testFunctionWorkerRedirect() throws Exception {
+        String functionWorkerUrl = "http://function";
+        String brokerUrl = "http://broker";
+
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setBrokerWebServiceURL(brokerUrl);
+        proxyConfig.setFunctionWorkerWebServiceURL(functionWorkerUrl);
+
+        BrokerDiscoveryProvider discoveryProvider = mock(BrokerDiscoveryProvider.class);
+        AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, discoveryProvider);
+
+        String funcUrl = handler.rewriteTarget(buildRequest("/admin/v3/functions/test/test"));
+        Assert.assertEquals(funcUrl, String.format("%s/admin/v3/functions/%s/%s",
+                functionWorkerUrl, "test", "test"));
+
+        String sourceUrl = handler.rewriteTarget(buildRequest("/admin/v3/sources/test/test"));
+        Assert.assertEquals(sourceUrl, String.format("%s/admin/v3/sources/%s/%s",
+                functionWorkerUrl, "test", "test"));
+
+        String sinkUrl = handler.rewriteTarget(buildRequest("/admin/v3/sinks/test/test"));
+        Assert.assertEquals(sinkUrl, String.format("%s/admin/v3/sinks/%s/%s",
+                functionWorkerUrl, "test", "test"));
+
+        String tenantUrl = handler.rewriteTarget(buildRequest("/admin/v2/tenants/test"));
+        Assert.assertEquals(tenantUrl, String.format("%s/admin/v2/tenants/%s",
+                brokerUrl, "test"));
+    }
+
+    static HttpServletRequest buildRequest(String url) {
+        HttpServletRequest mockReq = mock(HttpServletRequest.class);
+        when(mockReq.getRequestURI()).thenReturn(url);
+        return mockReq;
+    }
+
+}


[pulsar] 02/05: Fix memory leak when running topic compaction. (#6485)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit af106d879d1db6ab9c8b7d1821c672086bfa3421
Author: Rolf Arne Corneliussen <ra...@users.noreply.github.com>
AuthorDate: Fri Mar 6 07:32:28 2020 +0100

    Fix memory leak when running topic compaction. (#6485)
    
    Fixes #6482
    
    ### Motivation
    Prevent topic compaction from leaking direct memory
    
    ### Modifications
    
    Several leaks were discovered using Netty leak detection and code review.
    * `CompactedTopicImpl.readOneMessageId` would get an `Enumeration` of `LedgerEntry`, but did not release the underlying buffers. Fix: iterate through the `Enumeration` and release the underlying buffers. Instead of logging the case where the `Enumeration` did not contain any elements, complete the future exceptionally with the message (will be logged by Caffeine).
    * Two main sources of leaks in `TwoPhaseCompactor`. The `RawBatchConverter.rebatchMessage` method failed to close/release a `ByteBuf` (uncompressedPayload). Also, the `ByteBuf` returned by `RawBatchConverter.rebatchMessage` was not closed. The first one was easy to fix (release the buffer). To fix the second one and make the code easier to read, I decided not to let `RawBatchConverter.rebatchMessage` close the message read from the topic; instead, the message read from the topic can be closed  [...]
    
    ### Verifying this change
    Modified `RawReaderTest.testBatchingRebatch` to show new contract.
    
    One can also run the test that was described for reproducing the issue to verify that no leak is detected.
    (cherry picked from commit f2ec1b4e2836859b0a6beb9b5a12656e4bcaf8f9)
---
 .../pulsar/client/impl/RawBatchConverter.java      |   5 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  19 ++--
 .../pulsar/compaction/TwoPhaseCompactor.java       | 119 +++++++++++----------
 .../apache/pulsar/client/impl/RawReaderTest.java   |   4 +-
 4 files changed, 82 insertions(+), 65 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index e252426..8c21a73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -91,8 +91,7 @@ public class RawBatchConverter {
      * Take a batched message and a filter, and returns a message with the only the sub-messages
      * which match the filter. Returns an empty optional if no messages match.
      *
-     * This takes ownership of the passes in message, and if the returned optional is not empty,
-     * the ownership of that message is returned also.
+     * NOTE: this method does not alter the reference count of the RawMessage argument.
      */
     public static Optional<RawMessage> rebatchMessage(RawMessage msg,
                                                       BiPredicate<String, MessageId> filter)
@@ -161,9 +160,9 @@ public class RawBatchConverter {
                 return Optional.empty();
             }
         } finally {
+            uncompressedPayload.release();
             batchBuffer.release();
             metadata.recycle();
-            msg.close();
         }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index b1378b6..22efe8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -164,12 +164,19 @@ public class CompactedTopicImpl implements CompactedTopic {
                                 if (rc != BKException.Code.OK) {
                                     promise.completeExceptionally(BKException.create(rc));
                                 } else {
-                                    try (RawMessage m = RawMessageImpl.deserializeFrom(
-                                                 seq.nextElement().getEntryBuffer())) {
-                                        promise.complete(m.getMessageIdData());
-                                    } catch (NoSuchElementException e) {
-                                        log.error("No such entry {} in ledger {}", entryId, lh.getId());
-                                        promise.completeExceptionally(e);
+                                    // Need to release buffers for all entries in the sequence
+                                    if (seq.hasMoreElements()) {
+                                        LedgerEntry entry = seq.nextElement();
+                                        try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
+                                            entry.getEntryBuffer().release();
+                                            while (seq.hasMoreElements()) {
+                                                seq.nextElement().getEntryBuffer().release();
+                                            }
+                                            promise.complete(m.getMessageIdData());
+                                        }
+                                    } else {
+                                        promise.completeExceptionally(new NoSuchElementException(
+                                                String.format("No such entry %d in ledger %d", entryId, lh.getId())));
                                     }
                                 }
                             }, null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 06afe93..58d7369 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -208,77 +208,88 @@ public class TwoPhaseCompactor extends Compactor {
 
     private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
                               LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
+        if (promise.isDone()) {
+            return;
+        }
         reader.readNextAsync().whenCompleteAsync(
                 (m, exception) -> {
                     if (exception != null) {
                         promise.completeExceptionally(exception);
                         return;
                     } else if (promise.isDone()) {
+                        m.close();
                         return;
                     }
-                    MessageId id = m.getMessageId();
-                    Optional<RawMessage> messageToAdd = Optional.empty();
-                    if (RawBatchConverter.isReadableBatch(m)) {
-                        try {
-                            messageToAdd = RawBatchConverter.rebatchMessage(
-                                    m, (key, subid) -> latestForKey.get(key).equals(subid));
-                        } catch (IOException ioe) {
-                            log.info("Error decoding batch for message {}. Whole batch will be included in output",
-                                     id, ioe);
-                            messageToAdd = Optional.of(m);
-                        }
-                    } else {
-                        Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
-                        MessageId msg;
-                        if (keyAndSize == null) { // pass through messages without a key
-                            messageToAdd = Optional.of(m);
-                        } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
-                                && msg.equals(id)) { // consider message only if present into latestForKey map
-                            if (keyAndSize.getRight() <= 0) {
-                                promise.completeExceptionally(new IllegalArgumentException(
-                                        "Compaction phase found empty record from sorted key-map"));
+                    try {
+                        MessageId id = m.getMessageId();
+                        Optional<RawMessage> messageToAdd = Optional.empty();
+                        if (RawBatchConverter.isReadableBatch(m)) {
+                            try {
+                                messageToAdd = RawBatchConverter.rebatchMessage(
+                                        m, (key, subid) -> latestForKey.get(key).equals(subid));
+                            } catch (IOException ioe) {
+                                log.info("Error decoding batch for message {}. Whole batch will be included in output",
+                                        id, ioe);
+                                messageToAdd = Optional.of(m);
                             }
-                            messageToAdd = Optional.of(m);
                         } else {
-                            m.close();
+                            Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
+                            MessageId msg;
+                            if (keyAndSize == null) { // pass through messages without a key
+                                messageToAdd = Optional.of(m);
+                            } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
+                                    && msg.equals(id)) { // consider message only if present into latestForKey map
+                                if (keyAndSize.getRight() <= 0) {
+                                    promise.completeExceptionally(new IllegalArgumentException(
+                                            "Compaction phase found empty record from sorted key-map"));
+                                }
+                                messageToAdd = Optional.of(m);
+                            }
                         }
-                    }
 
-                    if (messageToAdd.isPresent()) {
-                        try {
-                            outstanding.acquire();
-                            CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get())
-                                    .whenComplete((res, exception2) -> {
-                                        outstanding.release();
-                                        if (exception2 != null) {
-                                            promise.completeExceptionally(exception2);
+                        if (messageToAdd.isPresent()) {
+                            RawMessage message = messageToAdd.get();
+                            try {
+                                outstanding.acquire();
+                                CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message)
+                                        .whenComplete((res, exception2) -> {
+                                            outstanding.release();
+                                            if (exception2 != null) {
+                                                promise.completeExceptionally(exception2);
+                                            }
+                                        });
+                                if (to.equals(id)) {
+                                    addFuture.whenComplete((res, exception2) -> {
+                                        if (exception2 == null) {
+                                            promise.complete(null);
                                         }
                                     });
-                            if (to.equals(id)) {
-                                addFuture.whenComplete((res, exception2) -> {
-                                    if (exception2 == null) {
-                                        promise.complete(null);
-                                    }
-                                });
+                                }
+                            } catch (InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                promise.completeExceptionally(ie);
+                            } finally {
+                                if (message != m) {
+                                    message.close();
+                                }
                             }
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            promise.completeExceptionally(ie);
-                        }
-                    } else if (to.equals(id)) {
-                        // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
-                        // present under latestForKey. Complete the compaction.
-                        try {
-                            // make sure all inflight writes have finished
-                            outstanding.acquire(MAX_OUTSTANDING);
-                            promise.complete(null);
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            promise.completeExceptionally(e);
+                        } else if (to.equals(id)) {
+                            // Reached the last id, which phase one found to be a deleted message while iterating
+                            // over the ledger, so it is not present in latestForKey. Complete the compaction.
+                            try {
+                                // make sure all inflight writes have finished
+                                outstanding.acquire(MAX_OUTSTANDING);
+                                promise.complete(null);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                promise.completeExceptionally(e);
+                            }
+                            return;
                         }
-                        return;
+                        phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
+                    } finally {
+                        m.close();
                     }
-                    phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
                 }, scheduler);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index b0c7cd1..5ae4618 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -319,13 +319,13 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         }
 
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
-        try {
-            RawMessage m1 = reader.readNextAsync().get();
+        try (RawMessage m1 = reader.readNextAsync().get()) {
             RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
             List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2);
             Assert.assertEquals(idsAndKeys.size(), 1);
             Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
             m2.close();
+            Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
         } finally {
             reader.closeAsync().get();
         }