You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/29 06:25:40 UTC

[pulsar] branch branch-2.9 updated (e04c19eec38 -> 2fbdb174718)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from e04c19eec38 Add log when update namespace policies with error. (#15056)
     new 025500ae8cc Support shrink in ConcurrentLongHashMap (#14497)
     new a052f938130 Optimize memory usage: support to  shrink for pendingAcks map (#14515)
     new f2564912969 support shrink for map or set (#14663)
     new e38d75a0950 Reduce unnecessary expansions for ConcurrentLong map and set (#14562)
     new cc17736dff5 [fix][txn] Fix potentially unfinishable future. (#15208)
     new fab8c7c6d95 Skip unnecessary DNS resolution when creating AuthenticationDataHttp instance (#15221)
     new f91a920bad0 Improve skipping of DNS resolution when creating AuthenticationDataHttp instance (#15228)
     new 433a48a22a7 Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
     new 76e1f740e86 [Functions] Check executor null when closing the FileSource (#15247)
     new f95b98f1edb [Fix][Broker] Fix race condition in `OpAddEntry` (#15233)
     new 91a9e0b79c8 Pulsar SQL support for Decimal data type (#15153)
     new ce7af777926 [fix] [broker] Fix problem at RateLimiter#tryAcquire (#15306)
     new 8094fe55802 [C++] Wait until event loop terminates when closing the Client (#15316)
     new a462731bbfb [fix][broker] fix resource group does not report usage (#15292)
     new b0eff93b8fd [improve][broker] Use shrink map for message redelivery. (#15342)
     new 38057e42840 [fix][test]: fix flaky test of ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)
     new 2fbdb174718 [improve][broker] Support shrink for ConcurrentSortedLongPairSet  (#15354)

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  10 +-
 .../mledger/impl/ManagedLedgerOfflineBacklog.java  |   3 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   8 +
 .../authentication/AuthenticationDataHttp.java     |   2 +-
 .../broker/TransactionMetadataStoreService.java    |  11 +-
 .../broker/loadbalance/impl/LoadManagerShared.java |  20 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  18 +-
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  18 +-
 .../pulsar/broker/namespace/NamespaceService.java  |  13 +-
 .../resourcegroup/ResourceQuotaCalculatorImpl.java |   4 +-
 .../org/apache/pulsar/broker/rest/TopicsBase.java  |   3 +-
 .../pulsar/broker/service/BrokerService.java       |  42 +-
 .../org/apache/pulsar/broker/service/Consumer.java |  11 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  10 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  12 +-
 .../service/persistent/MessageDeduplication.java   |  12 +-
 .../persistent/MessageRedeliveryController.java    |  11 +-
 .../broker/service/persistent/PersistentTopic.java |  33 +-
 .../broker/stats/ClusterReplicationMetrics.java    |   3 +-
 .../AntiAffinityNamespaceGroupTest.java            |  15 +-
 .../loadbalance/impl/LoadManagerSharedTest.java    |  13 +-
 .../ResourceQuotaCalculatorImplTest.java           |  10 +
 .../pulsar/broker/service/PersistentTopicTest.java |  46 +-
 .../MessageRedeliveryControllerTest.java           |   2 +-
 .../broker/stats/ManagedCursorMetricsTest.java     |  26 +-
 pulsar-client-cpp/lib/ClientImpl.cc                |  37 +-
 pulsar-client-cpp/lib/ExecutorService.cc           |  33 +-
 pulsar-client-cpp/lib/ExecutorService.h            |  11 +-
 pulsar-client-cpp/lib/TimeUtils.h                  |  48 ++
 pulsar-client-cpp/tests/CustomLoggerTest.cc        |  26 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  24 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |   3 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   3 +-
 .../client/impl/PartitionedProducerImpl.java       |   3 +-
 .../apache/pulsar/client/impl/ProducerBase.java    |   3 +-
 .../client/impl/TransactionMetaStoreHandler.java   |   5 +-
 .../TransactionCoordinatorClientImpl.java          |   6 +-
 .../impl/AcknowledgementsGroupingTrackerTest.java  |   3 +-
 .../org/apache/pulsar/common/util/RateLimiter.java |   3 +-
 .../util/collections/ConcurrentLongHashMap.java    | 149 ++++-
 .../collections/ConcurrentLongLongPairHashMap.java | 673 +++++++++++++++++++++
 .../util/collections/ConcurrentLongPairSet.java    | 174 +++++-
 .../util/collections/ConcurrentOpenHashMap.java    | 159 ++++-
 .../util/collections/ConcurrentOpenHashSet.java    | 158 ++++-
 .../collections/ConcurrentSortedLongPairSet.java   |  32 +-
 .../common/util/collections/LongPairSet.java       |   7 +
 .../apache/pulsar/common/util/RateLimiterTest.java |  20 +-
 .../collections/ConcurrentLongHashMapTest.java     | 141 ++++-
 .../ConcurrentLongLongPairHashMapTest.java         | 427 +++++++++++++
 .../collections/ConcurrentLongPairSetTest.java     | 130 +++-
 .../collections/ConcurrentOpenHashMapTest.java     | 144 ++++-
 .../collections/ConcurrentOpenHashSetTest.java     |  93 ++-
 .../ConcurrentSortedLongPairSetTest.java           |  43 ++
 .../java/org/apache/pulsar/io/file/FileSource.java |  12 +-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |   3 +-
 .../decoder/avro/PulsarAvroColumnDecoder.java      |  19 +-
 .../decoder/avro/PulsarAvroRowDecoderFactory.java  |  10 +-
 .../decoder/json/PulsarJsonRowDecoderFactory.java  |   6 +
 .../pulsar/sql/presto/TestPulsarConnector.java     |   8 +-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  |  15 +
 .../sql/presto/decoder/AbstractDecoderTester.java  |   5 +
 .../sql/presto/decoder/DecoderTestMessage.java     |   6 +-
 .../pulsar/sql/presto/decoder/DecoderTestUtil.java |  20 +
 .../sql/presto/decoder/avro/TestAvroDecoder.java   |  11 +
 .../apache/pulsar/websocket/WebSocketService.java  |  23 +-
 .../apache/pulsar/websocket/stats/ProxyStats.java  |   4 +-
 66 files changed, 2800 insertions(+), 256 deletions(-)
 create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
 create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMapTest.java


[pulsar] 13/17: [C++] Wait until event loop terminates when closing the Client (#15316)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8094fe558028806f44fb84d1cdaaab4c9841089e
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Apr 27 15:10:01 2022 +0800

    [C++] Wait until event loop terminates when closing the Client (#15316)
    
    * [C++] Wait until event loops terminates when closing the Client
    
    Fixes #13267
    
    ### Motivation
    
    Unlike Java client, the `Client` of C++ client has a `shutdown` method
    that is responsible to execute the following steps:
    1. Call `shutdown` on all internal producers and consumers
    2. Close all connections in the pool
    3. Close all executors of the executor providers.
    
    When an executor is closed, it call `io_service::stop()`, which makes
    the event loop (`io_service::run()`) in another thread return as soon as
    possible. However, there is no wait operation. If a client failed to
    create a producer or consumer, the `close` method will call `shutdown`
    and close all executors immediately and exits the application. In this
    case, the detached event loop thread might not exit ASAP, then valgrind
    will detect the memory leak.
    
    This memory leak can be avoided by sleeping for a while after
    `Client::close` returns or there are still other things to do after
    that. However, we should still adopt the semantics that after
    `Client::shutdown` returns, all event loop threads should be terminated.
    
    ### Modifications
    - Add a timeout parameter to the `close` method of `ExecutorService` and
      `ExecutorServiceProvider` as the max blocking timeout if it's
      non-negative.
    - Add a `TimeoutProcessor` helper class to update the left timeout after
      calling all methods that accept the timeout parameter.
    - Call `close` on all `ExecutorServiceProvider`s in
      `ClientImpl::shutdown` with 500ms timeout, which could be long enough.
      In addition, in `handleClose` method, call `shutdown` in another
      thread to avoid the deadlock.
    
    ### Verifying this change
    
    After applying this patch, the reproduce code in #13627 will pass the
    valgrind check.
    
    ```
    ==3013== LEAK SUMMARY:
    ==3013==    definitely lost: 0 bytes in 0 blocks
    ==3013==    indirectly lost: 0 bytes in 0 blocks
    ==3013==      possibly lost: 0 bytes in 0 blocks
    ```
    
    (cherry picked from commit cd78f39a92521f3847b022580a6e66e651b5cb4b)
---
 pulsar-client-cpp/lib/ClientImpl.cc         | 37 +++++++++++++++++-----
 pulsar-client-cpp/lib/ExecutorService.cc    | 33 +++++++++++++++-----
 pulsar-client-cpp/lib/ExecutorService.h     | 11 +++++--
 pulsar-client-cpp/lib/TimeUtils.h           | 48 +++++++++++++++++++++++++++++
 pulsar-client-cpp/tests/CustomLoggerTest.cc | 26 ++++++++++------
 5 files changed, 130 insertions(+), 25 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 8cdaacc8b25..6d9c4ed044e 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -26,6 +26,7 @@
 #include "PartitionedConsumerImpl.h"
 #include "MultiTopicsConsumerImpl.h"
 #include "PatternMultiTopicsConsumerImpl.h"
+#include "TimeUtils.h"
 #include <pulsar/ConsoleLoggerFactory.h>
 #include <boost/algorithm/string/predicate.hpp>
 #include <sstream>
@@ -34,6 +35,7 @@
 #include <algorithm>
 #include <random>
 #include <mutex>
+#include <thread>
 #ifdef USE_LOG4CXX
 #include "Log4CxxLogger.h"
 #endif
@@ -534,13 +536,20 @@ void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, Resu
         lock.unlock();
 
         LOG_DEBUG("Shutting down producers and consumers for client");
-        shutdown();
-        if (callback) {
-            if (closingError != ResultOk) {
-                LOG_DEBUG("Problem in closing client, could not close one or more consumers or producers");
+        // handleClose() is called in ExecutorService's event loop, while shutdown() tried to wait the event
+        // loop exits. So here we use another thread to call shutdown().
+        auto self = shared_from_this();
+        std::thread shutdownTask{[this, self, callback] {
+            shutdown();
+            if (callback) {
+                if (closingError != ResultOk) {
+                    LOG_DEBUG(
+                        "Problem in closing client, could not close one or more consumers or producers");
+                }
+                callback(closingError);
             }
-            callback(closingError);
-        }
+        }};
+        shutdownTask.detach();
     }
 }
 
@@ -576,11 +585,25 @@ void ClientImpl::shutdown() {
         return;
     }
     LOG_DEBUG("ConnectionPool is closed");
-    ioExecutorProvider_->close();
+
+    // 500ms as the timeout is long enough because ExecutorService::close calls io_service::stop() internally
+    // and waits until io_service::run() in another thread returns, which should be as soon as possible after
+    // stop() is called.
+    TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{500};
+
+    timeoutProcessor.tik();
+    ioExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
+    timeoutProcessor.tok();
     LOG_DEBUG("ioExecutorProvider_ is closed");
+
+    timeoutProcessor.tik();
     listenerExecutorProvider_->close();
+    timeoutProcessor.tok();
     LOG_DEBUG("listenerExecutorProvider_ is closed");
+
+    timeoutProcessor.tik();
     partitionListenerExecutorProvider_->close();
+    timeoutProcessor.tok();
     LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
 }
 
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index 9cfbd82881d..b9b5ed46478 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -21,6 +21,7 @@
 #include <boost/asio.hpp>
 #include <functional>
 #include <memory>
+#include "TimeUtils.h"
 
 #include "LogUtils.h"
 DECLARE_LOG_OBJECT()
@@ -29,7 +30,7 @@ namespace pulsar {
 
 ExecutorService::ExecutorService() {}
 
-ExecutorService::~ExecutorService() { close(); }
+ExecutorService::~ExecutorService() { close(0); }
 
 void ExecutorService::start() {
     auto self = shared_from_this();
@@ -37,11 +38,16 @@ void ExecutorService::start() {
         if (self->isClosed()) {
             return;
         }
+        LOG_INFO("Run io_service in a single thread");
         boost::system::error_code ec;
         self->getIOService().run(ec);
         if (ec) {
             LOG_ERROR("Failed to run io_service: " << ec.message());
+        } else {
+            LOG_INFO("Event loop of ExecutorService exits successfully");
         }
+        self->ioServiceDone_ = true;
+        self->cond_.notify_all();
     }};
     t.detach();
 }
@@ -79,13 +85,23 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
     return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_));
 }
 
-void ExecutorService::close() {
+void ExecutorService::close(long timeoutMs) {
     bool expectedState = false;
     if (!closed_.compare_exchange_strong(expectedState, true)) {
         return;
     }
+    if (timeoutMs == 0) {  // non-blocking
+        io_service_.stop();
+        return;
+    }
 
+    std::unique_lock<std::mutex> lock{mutex_};
     io_service_.stop();
+    if (timeoutMs > 0) {
+        cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_.load(); });
+    } else {  // < 0
+        cond_.wait(lock, [this] { return ioServiceDone_.load(); });
+    }
 }
 
 void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
@@ -106,14 +122,17 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
     return executors_[idx];
 }
 
-void ExecutorServiceProvider::close() {
+void ExecutorServiceProvider::close(long timeoutMs) {
     Lock lock(mutex_);
 
-    for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) {
-        if (*it != NULL) {
-            (*it)->close();
+    TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{timeoutMs};
+    for (auto &&executor : executors_) {
+        timeoutProcessor.tik();
+        if (executor) {
+            executor->close(timeoutProcessor.getLeftTimeout());
         }
-        it->reset();
+        timeoutProcessor.tok();
+        executor.reset();
     }
 }
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index 6b0909194b7..e4cbb3ce62e 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -20,6 +20,8 @@
 #define _PULSAR_EXECUTOR_SERVICE_HEADER_
 
 #include <atomic>
+#include <condition_variable>
+#include <chrono>
 #include <memory>
 #include <boost/asio.hpp>
 #include <boost/asio/ssl.hpp>
@@ -50,7 +52,8 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
     DeadlineTimerPtr createDeadlineTimer();
     void postWork(std::function<void(void)> task);
 
-    void close();
+    // See TimeoutProcessor for the semantics of the parameter.
+    void close(long timeoutMs = 3000);
 
     IOService &getIOService() { return io_service_; }
     bool isClosed() const noexcept { return closed_; }
@@ -68,6 +71,9 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
     IOService::work work_{io_service_};
 
     std::atomic_bool closed_{false};
+    std::mutex mutex_;
+    std::condition_variable cond_;
+    std::atomic_bool ioServiceDone_{false};
 
     ExecutorService();
 
@@ -82,7 +88,8 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
 
     ExecutorServicePtr get();
 
-    void close();
+    // See TimeoutProcessor for the semantics of the parameter.
+    void close(long timeoutMs = 3000);
 
    private:
     typedef std::vector<ExecutorServicePtr> ExecutorList;
diff --git a/pulsar-client-cpp/lib/TimeUtils.h b/pulsar-client-cpp/lib/TimeUtils.h
index 1da7d65923a..45157ae855b 100644
--- a/pulsar-client-cpp/lib/TimeUtils.h
+++ b/pulsar-client-cpp/lib/TimeUtils.h
@@ -19,6 +19,8 @@
 #pragma once
 
 #include <boost/date_time/local_time/local_time.hpp>
+#include <atomic>
+#include <chrono>
 
 #include <pulsar/defines.h>
 
@@ -33,4 +35,50 @@ class PULSAR_PUBLIC TimeUtils {
     static ptime now();
     static int64_t currentTimeMillis();
 };
+
+// This class processes a timeout with the following semantics:
+//  > 0: wait at most the timeout until a blocking operation completes
+//  == 0: do not wait the blocking operation
+//  < 0: wait infinitely until a blocking operation completes.
+//
+// Here is a simple example usage:
+//
+// ```c++
+// // Wait at most 300 milliseconds
+// TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{300};
+// while (!allOperationsAreDone()) {
+//     timeoutProcessor.tik();
+//     // This method may block for some time
+//     performBlockingOperation(timeoutProcessor.getLeftTimeout());
+//     timeoutProcessor.tok();
+// }
+// ```
+//
+// The template argument is the same as std::chrono::duration.
+template <typename Duration>
+class TimeoutProcessor {
+   public:
+    using Clock = std::chrono::high_resolution_clock;
+
+    TimeoutProcessor(long timeout) : leftTimeout_(timeout) {}
+
+    long getLeftTimeout() const noexcept { return leftTimeout_; }
+
+    void tik() { before_ = Clock::now(); }
+
+    void tok() {
+        if (leftTimeout_ > 0) {
+            leftTimeout_ -= std::chrono::duration_cast<Duration>(Clock::now() - before_).count();
+            if (leftTimeout_ <= 0) {
+                // The timeout exceeds, getLeftTimeout() will return 0 to indicate we should not wait more
+                leftTimeout_ = 0;
+            }
+        }
+    }
+
+   private:
+    std::atomic_long leftTimeout_;
+    std::chrono::time_point<Clock> before_;
+};
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc
index 0b4e76adcc4..bd80c312e3b 100644
--- a/pulsar-client-cpp/tests/CustomLoggerTest.cc
+++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc
@@ -20,6 +20,7 @@
 #include <pulsar/ConsoleLoggerFactory.h>
 #include <LogUtils.h>
 #include <gtest/gtest.h>
+#include <atomic>
 #include <thread>
 
 using namespace pulsar;
@@ -28,35 +29,42 @@ static std::vector<std::string> logLines;
 
 class MyTestLogger : public Logger {
    public:
-    MyTestLogger() = default;
+    MyTestLogger(const std::string &fileName) : fileName_(fileName) {}
 
     bool isEnabled(Level level) override { return true; }
 
     void log(Level level, int line, const std::string &message) override {
         std::stringstream ss;
-        ss << " " << level << ":" << line << " " << message << std::endl;
+        ss << std::this_thread::get_id() << " " << level << " " << fileName_ << ":" << line << " " << message
+           << std::endl;
         logLines.emplace_back(ss.str());
     }
+
+   private:
+    const std::string fileName_;
 };
 
 class MyTestLoggerFactory : public LoggerFactory {
    public:
-    Logger *getLogger(const std::string &fileName) override { return logger; }
-
-   private:
-    MyTestLogger *logger = new MyTestLogger;
+    Logger *getLogger(const std::string &fileName) override { return new MyTestLogger(fileName); }
 };
 
 TEST(CustomLoggerTest, testCustomLogger) {
     // simulate new client created on a different thread (because logging factory is called once per thread)
-    auto testThread = std::thread([] {
+    std::atomic_int numLogLines{0};
+    auto testThread = std::thread([&numLogLines] {
         ClientConfiguration clientConfig;
         auto customLogFactory = new MyTestLoggerFactory();
         clientConfig.setLogger(customLogFactory);
         // reset to previous log factory
         Client client("pulsar://localhost:6650", clientConfig);
         client.close();
-        ASSERT_EQ(logLines.size(), 7);
+        ASSERT_TRUE(logLines.size() > 0);
+        for (auto &&line : logLines) {
+            std::cout << line;
+            std::cout.flush();
+        }
+        numLogLines = logLines.size();
         LogUtils::resetLoggerFactory();
     });
     testThread.join();
@@ -65,7 +73,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
     Client client("pulsar://localhost:6650", clientConfig);
     client.close();
     // custom logger didn't get any new lines
-    ASSERT_EQ(logLines.size(), 7);
+    ASSERT_EQ(logLines.size(), numLogLines);
 }
 
 TEST(CustomLoggerTest, testConsoleLoggerFactory) {


[pulsar] 06/17: Skip unnecessary DNS resolution when creating AuthenticationDataHttp instance (#15221)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fab8c7c6d95220daa4da0e7a45b52a2c98949d83
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Apr 20 06:08:36 2022 +0300

    Skip unnecessary DNS resolution when creating AuthenticationDataHttp instance (#15221)
    
    (cherry picked from commit 14991c93533927c35dd3cba74fe52ba3d57f244b)
---
 .../apache/pulsar/broker/authentication/AuthenticationDataHttp.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
index 958e5eab9c4..9ffb29c0376 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.authentication;
 
+import io.netty.util.NetUtil;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
@@ -35,7 +36,9 @@ public class AuthenticationDataHttp implements AuthenticationDataSource {
             throw new IllegalArgumentException();
         }
         this.request = request;
-        this.remoteAddress = new InetSocketAddress(request.getRemoteAddr(), request.getRemotePort());
+        this.remoteAddress =
+                new InetSocketAddress(NetUtil.createInetAddressFromIpAddressString(request.getRemoteAddr()),
+                        request.getRemotePort());
     }
 
     /*


[pulsar] 11/17: Pulsar SQL support for Decimal data type (#15153)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 91a9e0b79c88b0c889be200b8b182eaf3661bf44
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Apr 21 17:26:35 2022 +0800

    Pulsar SQL support for Decimal data type (#15153)
    
    (cherry picked from commit 6b004ed6a2554ab826a00aa2a177963de3c5f44b)
---
 .../presto/decoder/avro/PulsarAvroColumnDecoder.java | 19 ++++++++++++++++++-
 .../decoder/avro/PulsarAvroRowDecoderFactory.java    | 10 +++++++++-
 .../decoder/json/PulsarJsonRowDecoderFactory.java    |  6 ++++++
 .../pulsar/sql/presto/TestPulsarConnector.java       |  8 +++++++-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java    | 15 +++++++++++++++
 .../sql/presto/decoder/AbstractDecoderTester.java    |  5 +++++
 .../sql/presto/decoder/DecoderTestMessage.java       |  6 +++++-
 .../pulsar/sql/presto/decoder/DecoderTestUtil.java   | 20 ++++++++++++++++++++
 .../sql/presto/decoder/avro/TestAvroDecoder.java     | 11 +++++++++++
 9 files changed, 96 insertions(+), 4 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
index 3b7e2935579..fc122fc1fa4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
@@ -41,6 +41,8 @@ import io.prestosql.spi.type.ArrayType;
 import io.prestosql.spi.type.BigintType;
 import io.prestosql.spi.type.BooleanType;
 import io.prestosql.spi.type.DateType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
 import io.prestosql.spi.type.DoubleType;
 import io.prestosql.spi.type.IntegerType;
 import io.prestosql.spi.type.MapType;
@@ -54,6 +56,7 @@ import io.prestosql.spi.type.TinyintType;
 import io.prestosql.spi.type.Type;
 import io.prestosql.spi.type.VarbinaryType;
 import io.prestosql.spi.type.VarcharType;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 
 import java.util.List;
@@ -142,7 +145,7 @@ public class PulsarAvroColumnDecoder {
     }
 
     private boolean isSupportedPrimitive(Type type) {
-        return type instanceof VarcharType || SUPPORTED_PRIMITIVE_TYPES.contains(type);
+        return type instanceof VarcharType || type instanceof DecimalType || SUPPORTED_PRIMITIVE_TYPES.contains(type);
     }
 
     public FieldValueProvider decodeField(GenericRecord avroRecord) {
@@ -208,6 +211,13 @@ public class PulsarAvroColumnDecoder {
                 return floatToIntBits((Float) value);
             }
 
+            if (columnType instanceof DecimalType) {
+                ByteBuffer buffer = (ByteBuffer) value;
+                byte[] bytes = new byte[buffer.remaining()];
+                buffer.get(bytes);
+                return new BigInteger(bytes).longValue();
+            }
+
             throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
                     format("cannot decode object of '%s' as '%s' for column '%s'",
                             value.getClass(), columnType, columnName));
@@ -237,6 +247,13 @@ public class PulsarAvroColumnDecoder {
             }
         }
 
+        // The returned Slice size must be equals to 18 Byte
+        if (type instanceof DecimalType) {
+            ByteBuffer buffer = (ByteBuffer) value;
+            BigInteger bigInteger = new BigInteger(buffer.array());
+            return Decimals.encodeUnscaledValue(bigInteger);
+        }
+
         throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
                 format("cannot decode object of '%s' as '%s' for column '%s'",
                         value.getClass(), type, columnName));
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
index 26c333ab56b..7f9169e5273 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
@@ -34,6 +34,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.type.ArrayType;
 import io.prestosql.spi.type.BigintType;
 import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DecimalType;
 import io.prestosql.spi.type.DoubleType;
 import io.prestosql.spi.type.IntegerType;
 import io.prestosql.spi.type.RealType;
@@ -130,7 +131,14 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
                                 + "please check the schema or report the bug.", fieldname));
             case FIXED:
             case BYTES:
-                //TODO: support decimal logicalType
+                //  When the precision <= 0, throw Exception.
+                //  When the precision > 0 and <= 18, use ShortDecimalType. and mapping Long
+                //  When the precision > 18 and <= 36, use LongDecimalType. and mapping Slice
+                //  When the precision > 36, throw Exception.
+                if (logicalType instanceof LogicalTypes.Decimal) {
+                    LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+                    return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
+                }
                 return VarbinaryType.VARBINARY;
             case INT:
                 if (logicalType == LogicalTypes.timeMillis()) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index 10a500ba361..843c2c7f836 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -128,6 +128,12 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
                                 + "please check the schema or report the bug.", fieldname));
             case FIXED:
             case BYTES:
+                // In the current implementation, since JsonSchema is generated by Avro,
+                // there may exist LogicalTypes.Decimal.
+                // Mapping decimalType with varcharType in JsonSchema.
+                if (logicalType instanceof LogicalTypes.Decimal) {
+                    return createUnboundedVarcharType();
+                }
                 return VarbinaryType.VARBINARY;
             case INT:
                 if (logicalType == LogicalTypes.timeMillis()) {
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 39d8ba8213c..fdfde36cb28 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -25,6 +25,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.connector.ConnectorContext;
 import io.prestosql.spi.predicate.TupleDomain;
 import io.prestosql.testing.TestingConnectorContext;
+import java.math.BigDecimal;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -166,6 +167,8 @@ public abstract class TestPulsarConnector {
         public int time;
         @org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", \"logicalType\": \"date\" }")
         public int date;
+        @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 4, \"scale\": 2 }")
+        public BigDecimal decimal;
         public TestPulsarConnector.Bar bar;
         public TestEnum field7;
     }
@@ -253,6 +256,7 @@ public abstract class TestPulsarConnector {
             fooFieldNames.add("date");
             fooFieldNames.add("bar");
             fooFieldNames.add("field7");
+            fooFieldNames.add("decimal");
 
 
             ConnectorContext prestoConnectorContext = new TestingConnectorContext();
@@ -313,6 +317,7 @@ public abstract class TestPulsarConnector {
                 LocalDate epoch = LocalDate.ofEpochDay(0);
                 return Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
             });
+            fooFunctions.put("decimal", integer -> BigDecimal.valueOf(1234, 2));
             fooFunctions.put("bar.field1", integer -> integer % 3 == 0 ? null : integer + 1);
             fooFunctions.put("bar.field2", integer -> integer % 2 == 0 ? null : String.valueOf(integer + 2));
             fooFunctions.put("bar.field3", integer -> integer + 3.0f);
@@ -331,7 +336,6 @@ public abstract class TestPulsarConnector {
      * @param schemaInfo
      * @param handleKeyValueType
      * @param includeInternalColumn
-     * @param dispatchingRowDecoderFactory
      * @return
      */
     protected static List<PulsarColumnHandle> getColumnColumnHandles(TopicName topicName, SchemaInfo schemaInfo,
@@ -393,6 +397,7 @@ public abstract class TestPulsarConnector {
             LocalDate localDate = LocalDate.now();
             LocalDate epoch = LocalDate.ofEpochDay(0);
             foo.date = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
+            foo.decimal= BigDecimal.valueOf(count, 2);
 
             MessageMetadata messageMetadata = new MessageMetadata()
                     .setProducerName("test-producer").setSequenceId(i)
@@ -609,6 +614,7 @@ public abstract class TestPulsarConnector {
                                     foo.timestamp = (long) fooFunctions.get("timestamp").apply(count);
                                     foo.time = (int) fooFunctions.get("time").apply(count);
                                     foo.date = (int) fooFunctions.get("date").apply(count);
+                                    foo.decimal = (BigDecimal) fooFunctions.get("decimal").apply(count);
                                     foo.bar = bar;
                                     foo.field7 = (Foo.TestEnum) fooFunctions.get("field7").apply(count);
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index d60ff20522c..2ea2616b6f1 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -22,7 +22,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
 import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.spi.type.DecimalType;
 import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.VarcharType;
+import java.math.BigDecimal;
 import lombok.Data;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -143,6 +147,17 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
                         }else if (fooColumnHandles.get(i).getName().equals("field7")) {
                             assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
+                        }else if (fooColumnHandles.get(i).getName().equals("decimal")) {
+                            Type type = fooColumnHandles.get(i).getType();
+                            // In JsonDecoder, decimal trans to varcharType
+                            if (type instanceof VarcharType) {
+                                assertEquals(new String(pulsarRecordCursor.getSlice(i).getBytes()),
+                                        fooFunctions.get("decimal").apply(count).toString());
+                            } else {
+                                DecimalType decimalType = (DecimalType) fooColumnHandles.get(i).getType();
+                                assertEquals(BigDecimal.valueOf(pulsarRecordCursor.getLong(i), decimalType.getScale()), fooFunctions.get("decimal").apply(count));
+                            }
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else {
                             if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) {
                                 columnsSeen.add(fooColumnHandles.get(i).getName());
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
index cd4fcaf0d1e..98b7d8b6f69 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
@@ -26,6 +26,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.connector.ConnectorContext;
 import io.prestosql.spi.type.Type;
 import io.prestosql.testing.TestingConnectorContext;
+import java.math.BigDecimal;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -102,6 +103,10 @@ public abstract class AbstractDecoderTester {
         decoderTestUtil.checkValue(decodedRow, handle, value);
     }
 
+    protected void checkValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, BigDecimal value) {
+        decoderTestUtil.checkValue(decodedRow, handle, value);
+    }
+
     protected Block getBlock(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle) {
         FieldValueProvider provider = decodedRow.get(handle);
         assertNotNull(provider);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
index 115f3691c00..da6d92e5158 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.sql.presto.decoder;
 
+import java.math.BigDecimal;
 import lombok.Data;
 
 import java.util.List;
@@ -45,6 +46,10 @@ public class DecoderTestMessage {
     public int dateField;
     public TestRow rowField;
     public TestEnum enumField;
+    @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 4, \"scale\": 2 }")
+    public BigDecimal decimalField;
+    @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 30, \"scale\": 2 }")
+    public BigDecimal longDecimalField;
 
     public List<String> arrayField;
     public Map<String, Long> mapField;
@@ -62,7 +67,6 @@ public class DecoderTestMessage {
         public long longField;
     }
 
-
     public static class CompositeRow {
         public String stringField;
         public List<NestedRow> arrayField;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
index 4c3c4a63447..496a6f061bf 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
@@ -23,11 +23,16 @@ import io.prestosql.decoder.DecoderColumnHandle;
 import io.prestosql.decoder.FieldValueProvider;
 import io.prestosql.spi.block.Block;
 import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
 import io.prestosql.spi.type.MapType;
 import io.prestosql.spi.type.RowType;
 import io.prestosql.spi.type.Type;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.Map;
 
+import static io.prestosql.spi.type.UnscaledDecimal128Arithmetic.UNSCALED_DECIMAL_128_SLICE_LENGTH;
 import static io.prestosql.testing.TestingConnectorSession.SESSION;
 import static org.testng.Assert.*;
 
@@ -113,6 +118,21 @@ public abstract class DecoderTestUtil {
         assertEquals(provider.getBoolean(), value);
     }
 
+    public void checkValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, BigDecimal value) {
+        FieldValueProvider provider = decodedRow.get(handle);
+        DecimalType decimalType = (DecimalType) handle.getType();
+        BigDecimal actualDecimal;
+        if (decimalType.getFixedSize() == UNSCALED_DECIMAL_128_SLICE_LENGTH) {
+            Slice slice = provider.getSlice();
+            BigInteger bigInteger = Decimals.decodeUnscaledValue(slice);
+            actualDecimal = new BigDecimal(bigInteger, decimalType.getScale());
+        } else {
+            actualDecimal = BigDecimal.valueOf(provider.getLong(), decimalType.getScale());
+        }
+        assertNotNull(provider);
+        assertEquals(actualDecimal, value);
+    }
+
     public void checkIsNull(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle) {
         FieldValueProvider provider = decodedRow.get(handle);
         assertNotNull(provider);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 1cfbbb4fce5..7b270c7995b 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -25,11 +25,13 @@ import io.prestosql.decoder.FieldValueProvider;
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.type.ArrayType;
 import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.DecimalType;
 import io.prestosql.spi.type.RowType;
 import io.prestosql.spi.type.StandardTypes;
 import io.prestosql.spi.type.Type;
 import io.prestosql.spi.type.TypeSignatureParameter;
 import io.prestosql.spi.type.VarcharType;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -87,6 +89,8 @@ public class TestAvroDecoder extends AbstractDecoderTester {
         message.longField = 222L;
         message.timestampField = System.currentTimeMillis();
         message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
+        message.decimalField = BigDecimal.valueOf(2233, 2);
+        message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
 
         LocalTime now = LocalTime.now(ZoneId.systemDefault());
         message.timeField = now.toSecondOfDay() * 1000;
@@ -127,6 +131,13 @@ public class TestAvroDecoder extends AbstractDecoderTester {
                 "enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
         checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString());
 
+        PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, decimalFieldColumnHandle, message.decimalField);
+
+        PulsarColumnHandle longDecimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "longDecimalField", DecimalType.createDecimalType(30, 2), false, false, "longDecimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, longDecimalFieldColumnHandle, message.longDecimalField);
     }
 
     @Test


[pulsar] 08/17: Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 433a48a22a744030bd50386284b27565d3e8abb8
Author: Shen Liu <li...@126.com>
AuthorDate: Thu Apr 21 11:54:34 2022 +0800

    Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
    
    Co-authored-by: druidliu <dr...@tencent.com>
    
    Fixes #15196.
    
    ### Motivation
    
    If broker having conf `topicFencingTimeoutSeconds`>0, a topic is trigged closed and closed normally, `closeFencedTopicForcefully` should not be called.
    
    ### Modifications
    
    Cancel fenced topic monitoring task if topic close normally, which cancel running `closeFencedTopicForcefully`.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change added tests and can be verified as follows:
      - Add `org.apache.pulsar.broker.service.PersistentTopicTest#testTopicCloseFencingTimeout`
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (yes / no)
      - The public API: (yes / no)
      - The schema: (yes / no / don't know)
      - The default values of configurations: (yes / no)
      - The wire protocol: (yes / no)
      - The rest endpoints: (yes / no)
      - The admin cli options: (yes / no)
      - Anything that affects deployment: (yes / no / don't know)
    
    ### Documentation
    
    Check the box below or label this PR directly.
    
    Need to update docs?
    
    - [ ] `doc-required`
    - [x] `no-need-doc`
    - [ ] `doc`
    - [ ] `doc-added`
    
    (cherry picked from commit e4a8de1eb9605e36060f0740a0097203a21e34ef)
---
 .../broker/service/persistent/PersistentTopic.java | 13 +++++++++----
 .../pulsar/broker/service/PersistentTopicTest.java | 22 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 756622c78db..7249d4054fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1300,6 +1300,7 @@ public class PersistentTopic extends AbstractTopic
 
                                 unregisterTopicPolicyListener();
                                 log.info("[{}] Topic closed", topic);
+                                cancelFencedTopicMonitoringTask();
                                 closeFuture.complete(null);
                             })
                     .exceptionally(ex -> {
@@ -2982,6 +2983,13 @@ public class PersistentTopic extends AbstractTopic
         return true;
     }
 
+    private synchronized void cancelFencedTopicMonitoringTask() {
+        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
+        if (monitoringTask != null && !monitoringTask.isDone()) {
+            monitoringTask.cancel(false);
+        }
+    }
+
     private synchronized void fence() {
         isFenced = true;
         ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
@@ -2996,10 +3004,7 @@ public class PersistentTopic extends AbstractTopic
 
     private synchronized void unfence() {
         isFenced = false;
-        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
-        if (monitoringTask != null && !monitoringTask.isDone()) {
-            monitoringTask.cancel(false);
-        }
+        cancelFencedTopicMonitoringTask();
     }
 
     private void closeFencedTopicForcefully() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 067cbccf273..5cb945e436f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -2181,6 +2181,28 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         assertTrue((boolean) isClosingOrDeletingField.get(topic));
     }
 
+    @Test
+    public void testTopicCloseFencingTimeout() throws Exception {
+        pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10);
+        Method fence = PersistentTopic.class.getDeclaredMethod("fence");
+        fence.setAccessible(true);
+        Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
+        fencedTopicMonitoringTaskField.setAccessible(true);
+
+        // create topic
+        PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+
+        // fence topic to init fencedTopicMonitoringTask
+        fence.invoke(topic);
+
+        // close topic
+        topic.close().get();
+        assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
+        ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
+        assertTrue(fencedTopicMonitoringTask.isDone());
+        assertTrue(fencedTopicMonitoringTask.isCancelled());
+    }
+
     @Test
     public void testGetDurableSubscription() throws Exception {
         ManagedLedger mockLedger = mock(ManagedLedger.class);


[pulsar] 05/17: [fix][txn] Fix potentially unfinishable future. (#15208)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cc17736dff5770bed1e9224343920ba2173d11c3
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Tue Apr 19 14:31:20 2022 +0800

    [fix][txn] Fix potentially unfinishable future. (#15208)
    
    (cherry picked from commit 6aaabdb8acfc9ecf07b1f2799b9d8e2a980343a5)
---
 .../org/apache/pulsar/broker/TransactionMetadataStoreService.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index cd188397989..902546958c5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -243,7 +243,11 @@ public class TransactionMetadataStoreService {
                             LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
                         }
                     }
-                }));
+                })).exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    completableFuture.completeExceptionally(realCause);
+                    return null;
+                });
             }
         });
         return completableFuture;


[pulsar] 14/17: [fix][broker] fix resource group does not report usage (#15292)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a462731bbfb24c519d4a5edf163e6dd789039129
Author: WangJialing <65...@users.noreply.github.com>
AuthorDate: Wed Apr 27 15:29:05 2022 +0800

    [fix][broker] fix resource group does not report usage (#15292)
    
    * fix resource group does not report usage
    
    * fix checkstyle
    
    * fix mistake
    
    Co-authored-by: wangjialing <wa...@cmss.chinamobile.com>
    (cherry picked from commit 4560737bf9c0a8f419c37f6e2cb3a230dcfd4352)
---
 .../broker/resourcegroup/ResourceQuotaCalculatorImpl.java      |  4 ++--
 .../broker/resourcegroup/ResourceQuotaCalculatorImplTest.java  | 10 ++++++++++
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
index ca83cae91c5..5dc50f2a255 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
@@ -108,7 +108,7 @@ public class ResourceQuotaCalculatorImpl implements ResourceQuotaCalculator {
         final float toleratedDriftPercentage = ResourceGroupService.UsageReportSuppressionTolerancePercentage;
         if (currentBytesUsed > 0) {
             long diff = abs(currentBytesUsed - lastReportedBytes);
-            float diffPercentage = (diff / currentBytesUsed) * 100;
+            float diffPercentage = (float) diff * 100 / lastReportedBytes;
             if (diffPercentage > toleratedDriftPercentage) {
                 return true;
             }
@@ -116,7 +116,7 @@ public class ResourceQuotaCalculatorImpl implements ResourceQuotaCalculator {
 
         if (currentMessagesUsed > 0) {
             long diff = abs(currentMessagesUsed - lastReportedMessages);
-            float diffPercentage = (diff / currentMessagesUsed) * 100;
+            float diffPercentage = (float) diff * 100 / lastReportedMessages;
             if (diffPercentage > toleratedDriftPercentage) {
                 return true;
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java
index 1a98838bb45..af8615936cc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java
@@ -112,5 +112,15 @@ public class ResourceQuotaCalculatorImplTest extends MockedPulsarServiceBaseTest
         Assert.assertTrue(newQuota == config);
     }
 
+    @Test
+    public void testNeedToReportLocalUsage() {
+        // If the percentage change (increase or decrease) in usage is more than 5% for
+        // either bytes or messages, send a report.
+        Assert.assertFalse(rqCalc.needToReportLocalUsage(1040, 1000, 104, 100, System.currentTimeMillis()));
+        Assert.assertFalse(rqCalc.needToReportLocalUsage(950, 1000, 95, 100, System.currentTimeMillis()));
+        Assert.assertTrue(rqCalc.needToReportLocalUsage(1060, 1000, 106, 100, System.currentTimeMillis()));
+        Assert.assertTrue(rqCalc.needToReportLocalUsage(940, 1000, 94, 100, System.currentTimeMillis()));
+    }
+
     private ResourceQuotaCalculatorImpl rqCalc;
 }
\ No newline at end of file


[pulsar] 01/17: Support shrink in ConcurrentLongHashMap (#14497)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 025500ae8cc93665b88e3b3bee8020b1dcf80711
Author: lin chen <15...@qq.com>
AuthorDate: Tue Mar 1 21:16:52 2022 +0800

    Support shrink in ConcurrentLongHashMap (#14497)
    
    (cherry picked from commit 297941964ed739e35ca68aa46d74410cf112b7bc)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   7 +-
 .../broker/TransactionMetadataStoreService.java    |   5 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  10 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  23 +++-
 .../client/impl/TransactionMetaStoreHandler.java   |   5 +-
 .../TransactionCoordinatorClientImpl.java          |   6 +-
 .../util/collections/ConcurrentLongHashMap.java    | 139 ++++++++++++++++++---
 .../collections/ConcurrentLongHashMapTest.java     | 122 +++++++++++++++---
 8 files changed, 275 insertions(+), 42 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8d5f0a6ed8c..62edcd5ca9d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -152,8 +152,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     protected Map<String, String> propertiesMap;
     protected final MetaStore store;
 
-    final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
-            16 /* initial capacity */, 1 /* number of sections */);
+    final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache =
+            ConcurrentLongHashMap.<CompletableFuture<ReadHandle>>newBuilder()
+                    .expectedItems(16) // initial capacity
+                    .concurrencyLevel(1) // number of sections
+                    .build();
     protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
     private volatile Stat ledgersStat;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 7297c334c4c..cd188397989 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -107,8 +107,9 @@ public class TransactionMetadataStoreService {
         this.tbClient = tbClient;
         this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer);
         this.transactionOpRetryTimer = timer;
-        this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
-        this.pendingConnectRequests = new ConcurrentLongHashMap<>();
+        this.tcLoadSemaphores = ConcurrentLongHashMap.<Semaphore>newBuilder().build();
+        this.pendingConnectRequests =
+                ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build();
         this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d8255f6015d..d8a51f60d65 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -241,8 +241,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         ServiceConfiguration conf = pulsar.getConfiguration();
 
         // This maps are not heavily contended since most accesses are within the cnx thread
-        this.producers = new ConcurrentLongHashMap<>(8, 1);
-        this.consumers = new ConcurrentLongHashMap<>(8, 1);
+        this.producers = ConcurrentLongHashMap.<CompletableFuture<Producer>>newBuilder()
+                .expectedItems(8)
+                .concurrencyLevel(1)
+                .build();
+        this.consumers = ConcurrentLongHashMap.<CompletableFuture<Consumer>>newBuilder()
+                .expectedItems(8)
+                .concurrencyLevel(1)
+                .build();
         this.replicatorPrefix = conf.getReplicatorPrefix();
         this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
         this.proxyRoles = conf.getProxyRoles();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index cae1594219e..00b8e4f7561 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -110,13 +110,28 @@ public class ClientCnx extends PulsarHandler {
     private State state;
 
     private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
-        new ConcurrentLongHashMap<>(16, 1);
+            ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
     // LookupRequests that waiting in client side.
     private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
 
-    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
-    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
-    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
+            ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
+            ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers =
+            ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
 
     private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index a30c2350908..5b91a1cd84b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -60,7 +60,10 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
     private final long transactionCoordinatorId;
     private final ConnectionHandler connectionHandler;
     private final ConcurrentLongHashMap<OpBase<?>> pendingRequests =
-        new ConcurrentLongHashMap<>(16, 1);
+            ConcurrentLongHashMap.<OpBase<?>>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
     private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;
 
     protected final Timer timer;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 8db80545ad2..e8baec784a7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -52,7 +52,11 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
 
     private final PulsarClientImpl pulsarClient;
     private TransactionMetaStoreHandler[] handlers;
-    private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = new ConcurrentLongHashMap<>(16, 1);
+    private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
+            ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
     private final AtomicLong epoch = new AtomicLong(0);
 
     private static final AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> STATE_UPDATER =
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index cd285221bc8..a4779357a44 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -44,33 +44,112 @@ public class ConcurrentLongHashMap<V> {
     private static final Object EmptyValue = null;
     private static final Object DeletedValue = new Object();
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
+    public static <V> Builder<V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder of ConcurrentLongHashMap.
+     */
+    public static class Builder<T> {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder<T> expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder<T> concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder<T> mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder<T> mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder<T> expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder<T> shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder<T> autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongHashMap<T> build() {
+            return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
     private final Section<V>[] sections;
 
+    @Deprecated
     public ConcurrentLongHashMap() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentLongHashMap(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
         this.sections = (Section<V>[]) new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section<>(perSectionCapacity);
+            sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -195,20 +274,35 @@ public class ConcurrentLongHashMap<V> {
         private volatile V[] values;
 
         private volatile int capacity;
+        private final int initCapacity;
         private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
                 AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
 
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.keys = new long[this.capacity];
             this.values = (V[]) new Object[this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
         }
 
         V get(long key, int keyHash) {
@@ -322,9 +416,10 @@ public class ConcurrentLongHashMap<V> {
                     ++bucket;
                 }
             } finally {
-                if (usedBuckets >= resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -373,7 +468,20 @@ public class ConcurrentLongHashMap<V> {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -385,6 +493,9 @@ public class ConcurrentLongHashMap<V> {
                 Arrays.fill(values, EmptyValue);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -439,9 +550,8 @@ public class ConcurrentLongHashMap<V> {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             long[] newKeys = new long[newCapacity];
             V[] newValues = (V[]) new Object[newCapacity];
 
@@ -458,7 +568,8 @@ public class ConcurrentLongHashMap<V> {
             values = newValues;
             capacity = newCapacity;
             usedBuckets = size;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index 14d8395ae8c..6cf126cf2ff 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -48,21 +48,29 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentLongHashMap<String>(0);
+            ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongHashMap<String>(16, 0);
+            ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongHashMap<String>(4, 8);
+            ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -71,7 +79,9 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put(1, "one"));
@@ -97,9 +107,64 @@ public class ConcurrentLongHashMapTest {
         assertEquals(map.size(), 3);
     }
 
+    @Test
+    public void testClear() {
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put(1, "v1"));
+        assertNull(map.put(2, "v2"));
+        assertNull(map.put(3, "v3"));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put(1, "v1"));
+        assertNull(map.put(2, "v2"));
+        assertNull(map.put(3, "v3"));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1, "v1"));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2, "v2"));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertNull(map.put(4, "v4"));
+        assertNull(map.put(5, "v5"));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() operation
+        assertNull(map.put(6, "v6"));
+        assertTrue(map.remove(6, "v6"));
+        assertTrue(map.capacity() == 8);
+    }
+
     @Test
     public void testRemove() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put(1, "one"));
@@ -115,7 +180,10 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testNegativeUsedBucketCount() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         map.put(0, "zero");
         assertEquals(1, map.getUsedBucketCount());
@@ -130,7 +198,10 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -145,7 +216,10 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -167,7 +241,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -201,7 +276,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -235,7 +311,10 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void stressConcurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(4)
+                .concurrencyLevel(1)
+                .build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
         final int writeThreads = 16;
@@ -286,7 +365,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testIteration() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
 
         assertEquals(map.keys(), Collections.emptyList());
         assertEquals(map.values(), Collections.emptyList());
@@ -330,7 +410,10 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int Buckets = 16;
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(Buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -363,7 +446,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testPutIfAbsent() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
         assertNull(map.putIfAbsent(1, "one"));
         assertEquals(map.get(1), "one");
 
@@ -373,7 +457,10 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testComputeIfAbsent() {
-        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
+        ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         AtomicInteger counter = new AtomicInteger();
         LongFunction<Integer> provider = key -> counter.getAndIncrement();
 
@@ -395,7 +482,10 @@ public class ConcurrentLongHashMapTest {
     static final int N = 100_000;
 
     public void benchConcurrentLongHashMap() throws Exception {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(N)
+                .concurrencyLevel(1)
+                .build();
 
         for (long i = 0; i < Iterations; i++) {
             for (int j = 0; j < N; j++) {


[pulsar] 17/17: [improve][broker] Support shrink for ConcurrentSortedLongPairSet (#15354)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2fbdb1747186b4f319704178e1cd5d2685e7b00a
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 28 20:30:30 2022 +0800

    [improve][broker] Support shrink for ConcurrentSortedLongPairSet  (#15354)
    
    (cherry picked from commit 24d4d76bb9e39010bae3f4cbd8ddba6422570b4e)
---
 .../persistent/MessageRedeliveryController.java    |  2 +-
 .../util/collections/ConcurrentLongPairSet.java    | 53 ++++++++++++----------
 .../collections/ConcurrentSortedLongPairSet.java   | 27 +++++++++--
 .../common/util/collections/LongPairSet.java       |  7 +++
 .../ConcurrentSortedLongPairSetTest.java           | 43 ++++++++++++++++++
 5 files changed, 105 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index c7f96fffcef..46fa1b2b050 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -36,7 +36,7 @@ public class MessageRedeliveryController {
     private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
 
     public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
-        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2, true);
         this.hashesToBeBlocked = allowOutOfOrderDelivery
                 ? null
                 : ConcurrentLongLongPairHashMap
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index 66ecaee4bfa..7b5e75813fa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -175,6 +175,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
         return size;
     }
 
+    @Override
     public long capacity() {
         long capacity = 0;
         for (int i = 0; i < sections.length; i++) {
@@ -447,20 +448,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (autoShrink && size < resizeThresholdBelow) {
-                    try {
-                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
-                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
-                        if (newCapacity < capacity && newResizeThresholdUp > size) {
-                            // shrink the hashmap
-                            rehash(newCapacity);
-                        }
-                    } finally {
-                        unlockWrite(stamp);
-                    }
-                } else {
-                    unlockWrite(stamp);
-                }
+                tryShrinkThenUnlock(stamp);
             }
         }
 
@@ -469,23 +457,42 @@ public class ConcurrentLongPairSet implements LongPairSet {
             int removedItems = 0;
 
             // Go through all the buckets for this section
-            for (int bucket = 0; bucket < table.length; bucket += 2) {
-                long storedItem1 = table[bucket];
-                long storedItem2 = table[bucket + 1];
-
-                if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
-                    if (filter.test(storedItem1, storedItem2)) {
-                        long h = hash(storedItem1, storedItem2);
-                        if (remove(storedItem1, storedItem2, (int) h)) {
+            long stamp = writeLock();
+            try {
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedItem1 = table[bucket];
+                    long storedItem2 = table[bucket + 1];
+                    if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
+                        if (filter.test(storedItem1, storedItem2)) {
+                            SIZE_UPDATER.decrementAndGet(this);
+                            cleanBucket(bucket);
                             removedItems++;
                         }
                     }
                 }
+            } finally {
+                tryShrinkThenUnlock(stamp);
             }
-
             return removedItems;
         }
 
+        private void tryShrinkThenUnlock(long stamp) {
+            if (autoShrink && size < resizeThresholdBelow) {
+                try {
+                    int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                    int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                    if (newCapacity < capacity && newResizeThresholdUp > size) {
+                        // shrink the hashmap
+                        rehash(newCapacity);
+                    }
+                } finally {
+                    unlockWrite(stamp);
+                }
+            } else {
+                unlockWrite(stamp);
+            }
+        }
+
         private void cleanBucket(int bucket) {
             int nextInArray = (bucket + 2) & (table.length - 1);
             if (table[nextInArray] == EmptyItem) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
index e4cb668fc92..06efd0490d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
@@ -48,14 +48,15 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPairC
 public class ConcurrentSortedLongPairSet implements LongPairSet {
 
     protected final NavigableMap<Long, ConcurrentLongPairSet> longPairSets = new ConcurrentSkipListMap<>();
-    private int expectedItems;
-    private int concurrencyLevel;
+    private final int expectedItems;
+    private final int concurrencyLevel;
     /**
      * If {@link #longPairSets} adds and removes the item-set frequently then it allocates and removes
      * {@link ConcurrentLongPairSet} for the same item multiple times which can lead to gc-puases. To avoid such
      * situation, avoid removing empty LogPairSet until it reaches max limit.
      */
-    private int maxAllowedSetOnRemove;
+    private final int maxAllowedSetOnRemove;
+    private final boolean autoShrink;
     private static final int DEFAULT_MAX_ALLOWED_SET_ON_REMOVE = 10;
 
     public ConcurrentSortedLongPairSet() {
@@ -70,10 +71,20 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
         this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE);
     }
 
+    public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, boolean autoShrink) {
+        this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE, autoShrink);
+    }
+
     public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove) {
+        this(expectedItems, concurrencyLevel, maxAllowedSetOnRemove, false);
+    }
+
+    public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove,
+                                       boolean autoShrink) {
         this.expectedItems = expectedItems;
         this.concurrencyLevel = concurrencyLevel;
         this.maxAllowedSetOnRemove = maxAllowedSetOnRemove;
+        this.autoShrink = autoShrink;
     }
 
     @Override
@@ -82,6 +93,7 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
                 (key) -> ConcurrentLongPairSet.newBuilder()
                         .expectedItems(expectedItems)
                         .concurrencyLevel(concurrencyLevel)
+                        .autoShrink(autoShrink)
                         .build());
         return messagesToReplay.add(item1, item2);
     }
@@ -194,6 +206,15 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
         return size.get();
     }
 
+    @Override
+    public long capacity() {
+        AtomicLong capacity = new AtomicLong(0);
+        longPairSets.forEach((item1, longPairSet) -> {
+            capacity.getAndAdd(longPairSet.capacity());
+        });
+        return capacity.get();
+    }
+
     @Override
     public boolean contains(long item1, long item2) {
         ConcurrentLongPairSet longPairSet = longPairSets.get(item1);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
index 32de7e4c232..f27b994f777 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
@@ -107,6 +107,13 @@ public interface LongPairSet {
      */
     long size();
 
+    /**
+     * Returns capacity of the set.
+     *
+     * @return
+     */
+    long capacity();
+
     /**
      * Checks if given (item1,item2) composite value exists into set.
      *
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
index fcb9884a795..62dfa21dc81 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
@@ -181,6 +182,20 @@ public class ConcurrentSortedLongPairSetTest {
         values = new ArrayList<>(set.items());
         values.sort(null);
         assertEquals(values, Lists.newArrayList(new LongPair(6, 6), new LongPair(7, 7)));
+
+        set = new ConcurrentSortedLongPairSet(128, 2, true);
+        set.add(2, 2);
+        set.add(1, 3);
+        set.add(3, 1);
+        set.add(2, 1);
+        set.add(3, 2);
+        set.add(1, 2);
+        set.add(1, 1);
+        removeItems = set.removeIf((ledgerId, entryId) -> {
+            return ComparisonChain.start().compare(ledgerId, 1).compare(entryId, 3)
+                    .result() <= 0;
+        });
+        assertEquals(removeItems, 3);
     }
 
     @Test
@@ -245,4 +260,32 @@ public class ConcurrentSortedLongPairSetTest {
         set.add(1, 1);
         assertFalse(set.isEmpty());
     }
+
+    @Test
+    public void testShrink() {
+        LongPairSet set = new ConcurrentSortedLongPairSet(2, 1, true);
+        set.add(0, 0);
+        assertTrue(set.capacity() == 4);
+        set.add(0, 1);
+        assertTrue(set.capacity() == 4);
+        set.add(1, 1);
+        assertTrue(set.capacity() == 8);
+        set.add(1, 2);
+        assertTrue(set.capacity() == 8);
+        set.add(1, 3);
+        set.add(1, 4);
+        set.add(1, 5);
+        assertTrue(set.capacity() == 12);
+        set.remove(1, 5);
+        // not shrink
+        assertTrue(set.capacity() == 12);
+        set.remove(1, 4);
+        // the internal map does not keep shrinking at every remove() operation
+        assertTrue(set.capacity() == 12);
+        set.remove(1, 3);
+        set.remove(1, 2);
+        set.remove(1, 1);
+        // shrink
+        assertTrue(set.capacity() == 8);
+    }
 }


[pulsar] 03/17: support shrink for map or set (#14663)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f256491296907b575c62137327c10a1b49ccb3a7
Author: LinChen <15...@qq.com>
AuthorDate: Mon Mar 14 23:23:47 2022 +0800

    support shrink for map or set (#14663)
    
    * support shrink for map or set
    
    * check style
    
    * check style
    
    (cherry picked from commit 1d10dff757ac7b9a203c14d2085a480495fb141b)
---
 .../mledger/impl/ManagedLedgerOfflineBacklog.java  |   3 +-
 .../broker/loadbalance/impl/LoadManagerShared.java |  20 ++-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  18 ++-
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  18 ++-
 .../pulsar/broker/namespace/NamespaceService.java  |  13 +-
 .../org/apache/pulsar/broker/rest/TopicsBase.java  |   3 +-
 .../pulsar/broker/service/BrokerService.java       |  42 ++++--
 .../service/nonpersistent/NonPersistentTopic.java  |  12 +-
 .../service/persistent/MessageDeduplication.java   |  12 +-
 .../broker/service/persistent/PersistentTopic.java |  20 ++-
 .../broker/stats/ClusterReplicationMetrics.java    |   3 +-
 .../AntiAffinityNamespaceGroupTest.java            |  15 ++-
 .../loadbalance/impl/LoadManagerSharedTest.java    |  13 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  24 +++-
 .../apache/pulsar/client/impl/ConsumerBase.java    |   3 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   3 +-
 .../client/impl/PartitionedProducerImpl.java       |   3 +-
 .../apache/pulsar/client/impl/ProducerBase.java    |   3 +-
 .../impl/AcknowledgementsGroupingTrackerTest.java  |   3 +-
 .../util/collections/ConcurrentLongPairSet.java    | 148 +++++++++++++++++++--
 .../util/collections/ConcurrentOpenHashMap.java    | 140 +++++++++++++++++--
 .../util/collections/ConcurrentOpenHashSet.java    | 140 +++++++++++++++++--
 .../collections/ConcurrentSortedLongPairSet.java   |   5 +-
 .../collections/ConcurrentLongPairSetTest.java     | 111 +++++++++++++---
 .../collections/ConcurrentOpenHashMapTest.java     | 125 ++++++++++++++---
 .../collections/ConcurrentOpenHashSetTest.java     |  73 +++++++++-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |   3 +-
 .../apache/pulsar/websocket/WebSocketService.java  |  23 +++-
 .../apache/pulsar/websocket/stats/ProxyStats.java  |   4 +-
 29 files changed, 860 insertions(+), 143 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
index e00dd47a739..d258c1ca339 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
@@ -220,7 +220,8 @@ public class ManagedLedgerOfflineBacklog {
         BookKeeper bk = factory.getBookKeeper();
         final CountDownLatch allCursorsCounter = new CountDownLatch(1);
         final long errorInReadingCursor = -1;
-        ConcurrentOpenHashMap<String, Long> ledgerRetryMap = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
+                ConcurrentOpenHashMap.<String, Long>newBuilder().build();
 
         final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
         final PositionImpl lastLedgerPosition = new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 74165c78dfa..20b14a9d220 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -190,7 +190,9 @@ public class LoadManagerShared {
         bundles.forEach(bundleName -> {
             final String namespaceName = getNamespaceNameFromBundleName(bundleName);
             final String bundleRange = getBundleRangeFromBundleName(bundleName);
-            target.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
+            target.computeIfAbsent(namespaceName,
+                    k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                    .add(bundleRange);
         });
     }
 
@@ -263,8 +265,12 @@ public class LoadManagerShared {
 
         for (final String broker : candidates) {
             int bundles = (int) brokerToNamespaceToBundleRange
-                    .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
-                    .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size();
+                    .computeIfAbsent(broker,
+                            k -> ConcurrentOpenHashMap.<String,
+                                    ConcurrentOpenHashSet<String>>newBuilder().build())
+                    .computeIfAbsent(namespaceName,
+                            k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                    .size();
             leastBundles = Math.min(leastBundles, bundles);
             if (leastBundles == 0) {
                 break;
@@ -276,8 +282,12 @@ public class LoadManagerShared {
 
         final int finalLeastBundles = leastBundles;
         candidates.removeIf(
-                broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
-                        .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size() > finalLeastBundles);
+                broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker,
+                        k -> ConcurrentOpenHashMap.<String,
+                                ConcurrentOpenHashSet<String>>newBuilder().build())
+                        .computeIfAbsent(namespaceName,
+                                k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                        .size() > finalLeastBundles);
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 7bcfcab0e49..68833199bf4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -201,7 +201,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
      */
     public ModularLoadManagerImpl() {
         brokerCandidateCache = new HashSet<>();
-        brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
+                        .build();
         defaultStats = new NamespaceBundleStats();
         filterPipeline = new ArrayList<>();
         loadData = new LoadData();
@@ -547,7 +550,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
             brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats);
             final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
                     brokerToNamespaceToBundleRange
-                            .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>());
+                            .computeIfAbsent(broker, k ->
+                                    ConcurrentOpenHashMap.<String,
+                                            ConcurrentOpenHashSet<String>>newBuilder()
+                                            .build());
             synchronized (namespaceToBundleRange) {
                 namespaceToBundleRange.clear();
                 LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
@@ -830,9 +836,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
                 final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
                 final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
                         brokerToNamespaceToBundleRange
-                                .computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>());
+                                .computeIfAbsent(broker.get(),
+                                        k -> ConcurrentOpenHashMap.<String,
+                                                ConcurrentOpenHashSet<String>>newBuilder()
+                                                .build());
                 synchronized (namespaceToBundleRange) {
-                    namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
+                    namespaceToBundleRange.computeIfAbsent(namespaceName,
+                            k -> ConcurrentOpenHashSet.<String>newBuilder().build())
                             .add(bundleRange);
                 }
                 return broker;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index e1829e68aed..212d299953e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -200,7 +200,10 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
         bundleLossesCache = new HashSet<>();
         brokerCandidateCache = new HashSet<>();
         availableBrokersCache = new HashSet<>();
-        brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
+                        .build();
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
             public boolean isEnablePersistentTopics(String brokerUrl) {
@@ -833,8 +836,12 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                 // same broker.
                 brokerToNamespaceToBundleRange
                         .computeIfAbsent(selectedRU.getResourceId().replace("http://", ""),
-                                k -> new ConcurrentOpenHashMap<>())
-                        .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
+                                k -> ConcurrentOpenHashMap.<String,
+                                        ConcurrentOpenHashSet<String>>newBuilder()
+                                        .build())
+                        .computeIfAbsent(namespaceName, k ->
+                                ConcurrentOpenHashSet.<String>newBuilder().build())
+                        .add(bundleRange);
                 ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
                 resourceUnitRankings.put(selectedRU, ranking);
             }
@@ -1252,7 +1259,10 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
             final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
             final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
                     brokerToNamespaceToBundleRange
-                            .computeIfAbsent(broker.replace("http://", ""), k -> new ConcurrentOpenHashMap<>());
+                            .computeIfAbsent(broker.replace("http://", ""),
+                                    k -> ConcurrentOpenHashMap.<String,
+                                            ConcurrentOpenHashSet<String>>newBuilder()
+                                            .build());
             namespaceToBundleRange.clear();
             LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange);
             LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e576e864467..25d935d8355 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -166,7 +166,8 @@ public class NamespaceService implements AutoCloseable {
         this.loadManager = pulsar.getLoadManager();
         this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
         this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this);
-        this.namespaceClients = new ConcurrentOpenHashMap<>();
+        this.namespaceClients =
+                ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build();
         this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
         this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
         this.localPoliciesCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalPolicies.class);
@@ -356,9 +357,15 @@ public class NamespaceService implements AutoCloseable {
     }
 
     private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
-            findingBundlesAuthoritative = new ConcurrentOpenHashMap<>();
+            findingBundlesAuthoritative =
+            ConcurrentOpenHashMap.<NamespaceBundle,
+                    CompletableFuture<Optional<LookupResult>>>newBuilder()
+                    .build();
     private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
-            findingBundlesNotAuthoritative = new ConcurrentOpenHashMap<>();
+            findingBundlesNotAuthoritative =
+            ConcurrentOpenHashMap.<NamespaceBundle,
+                    CompletableFuture<Optional<LookupResult>>>newBuilder()
+                    .build();
 
     /**
      * Main internal method to lookup and setup ownership of service unit to a broker.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index f89abf9bea3..770d77794d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -431,7 +431,8 @@ public class TopicsBase extends PersistentTopicsBase {
                             partitionedTopicName, result.getLookupData());
                 }
                 pulsar().getBrokerService().getOwningTopics().computeIfAbsent(partitionedTopicName
-                                .getPartitionedTopicName(), (key) -> new ConcurrentOpenHashSet<Integer>())
+                                .getPartitionedTopicName(),
+                        (key) -> ConcurrentOpenHashSet.<Integer>newBuilder().build())
                         .add(partitionedTopicName.getPartitionIndex());
                 completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
             } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a4b745d7b65..a9f9c7fe090 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -277,17 +277,28 @@ public class BrokerService implements Closeable {
         this.preciseTopicPublishRateLimitingEnable =
                 pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
         this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
-        this.topics = new ConcurrentOpenHashMap<>();
-        this.replicationClients = new ConcurrentOpenHashMap<>();
-        this.clusterAdmins = new ConcurrentOpenHashMap<>();
+        this.topics =
+                ConcurrentOpenHashMap.<String, CompletableFuture<Optional<Topic>>>newBuilder()
+                .build();
+        this.replicationClients =
+                ConcurrentOpenHashMap.<String, PulsarClient>newBuilder().build();
+        this.clusterAdmins =
+                ConcurrentOpenHashMap.<String, PulsarAdmin>newBuilder().build();
         this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();
-        this.configRegisteredListeners = new ConcurrentOpenHashMap<>();
+        this.configRegisteredListeners =
+                ConcurrentOpenHashMap.<String, Consumer<?>>newBuilder().build();
         this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
 
-        this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
-        this.owningTopics = new ConcurrentOpenHashMap<>();
+        this.multiLayerTopicsMap = ConcurrentOpenHashMap.<String,
+                ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>>newBuilder()
+                .build();
+        this.owningTopics = ConcurrentOpenHashMap.<String,
+                ConcurrentOpenHashSet<Integer>>newBuilder()
+                .build();
         this.pulsarStats = new PulsarStats(pulsar);
-        this.offlineTopicStatCache = new ConcurrentOpenHashMap<>();
+        this.offlineTopicStatCache =
+                ConcurrentOpenHashMap.<TopicName,
+                        PersistentOfflineTopicStats>newBuilder().build();
 
         this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
@@ -319,7 +330,8 @@ public class BrokerService implements Closeable {
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
         this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
-        this.blockedDispatchers = new ConcurrentOpenHashSet<>();
+        this.blockedDispatchers =
+                ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
         // update dynamic configuration and register-listener
         updateConfigurationAndRegisterListeners();
         this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
@@ -1532,8 +1544,12 @@ public class BrokerService implements Closeable {
                         synchronized (multiLayerTopicsMap) {
                             String serviceUnit = namespaceBundle.toString();
                             multiLayerTopicsMap //
-                                    .computeIfAbsent(topicName.getNamespace(), k -> new ConcurrentOpenHashMap<>()) //
-                                    .computeIfAbsent(serviceUnit, k -> new ConcurrentOpenHashMap<>()) //
+                                    .computeIfAbsent(topicName.getNamespace(),
+                                            k -> ConcurrentOpenHashMap.<String,
+                                                    ConcurrentOpenHashMap<String, Topic>>newBuilder()
+                                                    .build()) //
+                                    .computeIfAbsent(serviceUnit,
+                                            k -> ConcurrentOpenHashMap.<String, Topic>newBuilder().build()) //
                                     .put(topicName.toString(), topic);
                         }
                     }
@@ -2311,7 +2327,8 @@ public class BrokerService implements Closeable {
     }
 
     private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
-        ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
+                ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
         for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
             if (field != null && field.isAnnotationPresent(FieldContext.class)) {
                 field.setAccessible(true);
@@ -2324,7 +2341,8 @@ public class BrokerService implements Closeable {
     }
 
     private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() {
-        ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap =
+                ConcurrentOpenHashMap.<String, Object>newBuilder().build();
         for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
             if (field != null && field.isAnnotationPresent(FieldContext.class)) {
                 field.setAccessible(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b9a85ab97c5..c6ea96179d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -138,8 +138,16 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
     public NonPersistentTopic(String topic, BrokerService brokerService) {
         super(topic, brokerService);
 
-        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
-        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+        this.subscriptions =
+                ConcurrentOpenHashMap.<String, NonPersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
+        this.replicators =
+                ConcurrentOpenHashMap.<String, NonPersistentReplicator>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         this.isFenced = false;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 9913ddf91cf..bafe93a5ffe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -101,12 +101,20 @@ public class MessageDeduplication {
     // Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before
     // the messages are persisted
     @VisibleForTesting
-    final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap<>(16, 1);
+    final ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
+            ConcurrentOpenHashMap.<String, Long>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
 
     // Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated
     // after the messages are persisted
     @VisibleForTesting
-    final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap<>(16, 1);
+    final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted =
+            ConcurrentOpenHashMap.<String, Long>newBuilder()
+            .expectedItems(16)
+            .concurrencyLevel(1)
+            .build();
 
     // Number of persisted entries after which to store a snapshot of the sequence ids map
     private final int snapshotInterval;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 333438849e6..756622c78db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -255,8 +255,14 @@ public class PersistentTopic extends AbstractTopic
     public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
         super(topic, brokerService);
         this.ledger = ledger;
-        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
-        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+        this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+        this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled();
         this.delayedDeliveryTickTimeMillis =
                 brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
@@ -349,8 +355,14 @@ public class PersistentTopic extends AbstractTopic
         super(topic, brokerService);
         this.ledger = ledger;
         this.messageDeduplication = messageDeduplication;
-        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
-        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+        this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+        this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java
index 1086563085b..6718f074c67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java
@@ -35,7 +35,8 @@ public class ClusterReplicationMetrics {
     public ClusterReplicationMetrics(String localCluster, boolean metricsEnabled) {
         metricsList = new ArrayList<>();
         this.localCluster = localCluster;
-        metricsMap = new ConcurrentOpenHashMap<>();
+        metricsMap = ConcurrentOpenHashMap.<String, ReplicationMetrics>newBuilder()
+                .build();
         this.metricsEnabled = metricsEnabled;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 1429c7376f4..9e81a3e1db9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -234,7 +234,8 @@ public class AntiAffinityNamespaceGroupTest {
         brokerToDomainMap.put("brokerName-3", "domain-1");
 
         Set<String> candidate = Sets.newHashSet();
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
 
         assertEquals(brokers.size(), totalBrokers);
 
@@ -320,7 +321,8 @@ public class AntiAffinityNamespaceGroupTest {
 
         Set<String> brokers = Sets.newHashSet();
         Set<String> candidate = Sets.newHashSet();
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
         brokers.add("broker-0");
         brokers.add("broker-1");
         brokers.add("broker-2");
@@ -367,9 +369,11 @@ public class AntiAffinityNamespaceGroupTest {
     private void selectBrokerForNamespace(
             ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
             String broker, String namespace, String assignedBundleName) {
-        ConcurrentOpenHashSet<String> bundleSet = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<String> bundleSet =
+                ConcurrentOpenHashSet.<String>newBuilder().build();
         bundleSet.add(assignedBundleName);
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<String>>newBuilder().build();
         nsToBundleMap.put(namespace, bundleSet);
         brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
     }
@@ -469,7 +473,8 @@ public class AntiAffinityNamespaceGroupTest {
 
         Set<String> brokers = Sets.newHashSet();
         Set<String> candidate = Sets.newHashSet();
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
         brokers.add("broker-0");
         brokers.add("broker-1");
         brokers.add("broker-2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
index 716b9716425..d23772185f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
@@ -36,7 +36,10 @@ public class LoadManagerSharedTest {
         String assignedBundle = namespace + "/0x00000000_0x40000000";
 
         Set<String> candidates = Sets.newHashSet();
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
+                        .build();
         LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
         Assert.assertEquals(candidates.size(), 0);
 
@@ -80,8 +83,12 @@ public class LoadManagerSharedTest {
     private static void fillBrokerToNamespaceToBundleMap(
             ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map,
             String broker, String namespace, String bundle) {
-        map.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
-                .computeIfAbsent(namespace, k -> new ConcurrentOpenHashSet<>()).add(bundle);
+        map.computeIfAbsent(broker,
+                k -> ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashSet<String>>newBuilder().build())
+                .computeIfAbsent(namespace,
+                        k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                .add(bundle);
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 2314acdc3f5..067cbccf273 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -838,7 +838,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         addConsumerToSubscription.setAccessible(true);
 
         // for count consumers on topic
-        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+                ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         subscriptions.put("sub-1", sub);
         subscriptions.put("sub-2", sub2);
         Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -937,7 +941,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         addConsumerToSubscription.setAccessible(true);
 
         // for count consumers on topic
-        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+                ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         subscriptions.put("sub-1", sub);
         subscriptions.put("sub-2", sub2);
         Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -1063,7 +1071,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         addConsumerToSubscription.setAccessible(true);
 
         // for count consumers on topic
-        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+                ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         subscriptions.put("sub1", sub1);
         subscriptions.put("sub2", sub2);
         Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -2088,7 +2100,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
     public void testCheckInactiveSubscriptions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
 
-        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+                ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         // This subscription is connected by consumer.
         PersistentSubscription nonDeletableSubscription1 = spy(new PersistentSubscription(topic, "nonDeletableSubscription1", cursorMock, false));
         subscriptions.put(nonDeletableSubscription1.getName(), nonDeletableSubscription1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 062b277165b..71fb2d62756 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -99,7 +99,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.consumerEventListener = conf.getConsumerEventListener();
         // Always use growable queue since items can exceed the advertised size
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
-        this.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
+        this.unAckedChunkedMessageIdSequenceMap =
+                ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
         this.executorProvider = executorProvider;
         this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
         this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 85f7e05c460..603dae6ecbe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -183,7 +183,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     protected volatile boolean paused;
 
-    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap =
+            ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build();
     private int pendingChunkedMessageCount = 0;
     protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
     private boolean expireChunkMessageTaskScheduled = false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 037f4016340..f8d3e885504 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -73,7 +73,8 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
     public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
             CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) {
         super(client, topic, conf, producerCreatedFuture, schema, interceptors);
-        this.producers = new ConcurrentOpenHashMap<>();
+        this.producers =
+                ConcurrentOpenHashMap.<Integer, ProducerImpl<T>>newBuilder().build();
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
         stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index d164d09c4c4..180319034da 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -53,7 +53,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
         this.conf = conf;
         this.schema = schema;
         this.interceptors = interceptors;
-        this.schemaCache = new ConcurrentOpenHashMap<>();
+        this.schemaCache =
+                ConcurrentOpenHashMap.<SchemaHash, byte[]>newBuilder().build();
         if (!conf.isMultiSchema()) {
             multiSchemaMode = MultiSchemaMode.Disabled;
         }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index c0b952a281a..d577f48357c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -60,7 +60,8 @@ public class AcknowledgementsGroupingTrackerTest {
     public void setup() throws NoSuchFieldException, IllegalAccessException {
         eventLoopGroup = new NioEventLoopGroup(1);
         consumer = mock(ConsumerImpl.class);
-        consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
+        consumer.unAckedChunkedMessageIdSequenceMap =
+                ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
         cnx = spy(new ClientCnxTest(new ClientConfigurationData(), new NioEventLoopGroup()));
         PulsarClientImpl client = mock(PulsarClientImpl.class);
         doReturn(client).when(consumer).getClient();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index f1806c511e2..abbe11576a9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -45,8 +45,74 @@ public class ConcurrentLongPairSet implements LongPairSet {
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section[] sections;
 
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder of ConcurrentLongPairSet.
+     */
+    public static class Builder {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongPairSet build() {
+            return new ConcurrentLongPairSet(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+
     /**
      * Represents a function that accepts an object of the {@code LongPair} type.
      */
@@ -61,18 +127,33 @@ public class ConcurrentLongPairSet implements LongPairSet {
         void accept(long v1, long v2);
     }
 
+    @Deprecated
     public ConcurrentLongPairSet() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentLongPairSet(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
@@ -80,10 +161,12 @@ public class ConcurrentLongPairSet implements LongPairSet {
         this.sections = new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section(perSectionCapacity);
+            sections[i] = new Section(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
+    @Override
     public long size() {
         long size = 0;
         for (int i = 0; i < sections.length; i++) {
@@ -214,18 +297,33 @@ public class ConcurrentLongPairSet implements LongPairSet {
         private volatile long[] table;
 
         private volatile int capacity;
+        private final int initCapacity;
         private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER = AtomicIntegerFieldUpdater
                 .newUpdater(Section.class, "size");
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.table = new long[2 * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * SetFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
             Arrays.fill(table, EmptyItem);
         }
 
@@ -314,9 +412,11 @@ public class ConcurrentLongPairSet implements LongPairSet {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -347,7 +447,20 @@ public class ConcurrentLongPairSet implements LongPairSet {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -379,6 +492,16 @@ public class ConcurrentLongPairSet implements LongPairSet {
                 table[bucket] = EmptyItem;
                 table[bucket + 1] = EmptyItem;
                 --usedBuckets;
+
+                // Cleanup all the buckets that were in `DeletedKey` state,
+                // so that we can reduce unnecessary expansions
+                bucket = (bucket - 1) & (table.length - 1);
+                while (table[bucket] == DeletedItem) {
+                    table[bucket] = EmptyItem;
+                    --usedBuckets;
+
+                    bucket = (bucket - 1) & (table.length - 1);
+                }
             } else {
                 table[bucket] = DeletedItem;
                 table[bucket + 1] = DeletedItem;
@@ -392,6 +515,9 @@ public class ConcurrentLongPairSet implements LongPairSet {
                 Arrays.fill(table, EmptyItem);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -431,9 +557,8 @@ public class ConcurrentLongPairSet implements LongPairSet {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             long[] newTable = new long[2 * newCapacity];
             Arrays.fill(newTable, EmptyItem);
 
@@ -451,7 +576,8 @@ public class ConcurrentLongPairSet implements LongPairSet {
             // Capacity needs to be updated after the values, so that we won't see
             // a capacity value bigger than the actual array size
             capacity = newCapacity;
-            resizeThreshold = (int) (capacity * SetFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static void insertKeyValueNoLock(long[] table, int capacity, long item1, long item2) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 2c7eed1b58e..255844cf4ba 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -64,33 +64,112 @@ public class ConcurrentOpenHashMap<K, V> {
         }
     };
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section<K, V>[] sections;
 
+    public static <K, V> Builder<K, V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder of ConcurrentOpenHashMap.
+     */
+    public static class Builder<K, V> {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder<K, V> expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder<K, V> concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder<K, V> mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder<K, V> mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder<K, V> expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder<K, V> shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder<K, V> autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentOpenHashMap<K, V> build() {
+            return new ConcurrentOpenHashMap<>(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+    @Deprecated
     public ConcurrentOpenHashMap() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentOpenHashMap(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
         this.sections = (Section<K, V>[]) new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section<>(perSectionCapacity);
+            sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -208,18 +287,33 @@ public class ConcurrentOpenHashMap<K, V> {
         private volatile Object[] table;
 
         private volatile int capacity;
+        private final int initCapacity;
         private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
                 AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.table = new Object[2 * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
         }
 
         V get(K key, int keyHash) {
@@ -316,9 +410,11 @@ public class ConcurrentOpenHashMap<K, V> {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -363,7 +459,20 @@ public class ConcurrentOpenHashMap<K, V> {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -374,6 +483,9 @@ public class ConcurrentOpenHashMap<K, V> {
                 Arrays.fill(table, EmptyKey);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -415,9 +527,8 @@ public class ConcurrentOpenHashMap<K, V> {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             Object[] newTable = new Object[2 * newCapacity];
 
             // Re-hash table
@@ -432,7 +543,8 @@ public class ConcurrentOpenHashMap<K, V> {
             table = newTable;
             capacity = newCapacity;
             usedBuckets = size;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static <K, V> void insertKeyValueNoLock(Object[] table, int capacity, K key, V value) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 8b77d9052b3..6dc8552174e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -43,33 +43,112 @@ public class ConcurrentOpenHashSet<V> {
     private static final Object EmptyValue = null;
     private static final Object DeletedValue = new Object();
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section<V>[] sections;
 
+    public static <V> Builder<V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder of ConcurrentOpenHashSet.
+     */
+    public static class Builder<V> {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder<V> expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder<V> concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder<V> mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder<V> mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder<V> expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder<V> shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder<V> autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentOpenHashSet<V> build() {
+            return new ConcurrentOpenHashSet<>(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+    @Deprecated
     public ConcurrentOpenHashSet() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentOpenHashSet(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
         this.sections = (Section<V>[]) new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section<>(perSectionCapacity);
+            sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -177,18 +256,33 @@ public class ConcurrentOpenHashSet<V> {
         private volatile V[] values;
 
         private volatile int capacity;
+        private final int initCapacity;
         private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
                 AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.values = (V[]) new Object[this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
         }
 
         boolean contains(V value, int keyHash) {
@@ -284,9 +378,11 @@ public class ConcurrentOpenHashSet<V> {
                     ++bucket;
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -319,7 +415,20 @@ public class ConcurrentOpenHashSet<V> {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -330,6 +439,9 @@ public class ConcurrentOpenHashSet<V> {
                 Arrays.fill(values, EmptyValue);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -402,9 +514,8 @@ public class ConcurrentOpenHashSet<V> {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             V[] newValues = (V[]) new Object[newCapacity];
 
             // Re-hash table
@@ -418,7 +529,8 @@ public class ConcurrentOpenHashSet<V> {
             values = newValues;
             capacity = newCapacity;
             usedBuckets = size;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static <V> void insertValueNoLock(V[] values, V value) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
index 95e2302dcb7..e4cb668fc92 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
@@ -79,7 +79,10 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
     @Override
     public boolean add(long item1, long item2) {
         ConcurrentLongPairSet messagesToReplay = longPairSets.computeIfAbsent(item1,
-                (key) -> new ConcurrentLongPairSet(expectedItems, concurrencyLevel));
+                (key) -> ConcurrentLongPairSet.newBuilder()
+                        .expectedItems(expectedItems)
+                        .concurrencyLevel(concurrencyLevel)
+                        .build());
         return messagesToReplay.add(item1, item2);
     }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
index 82cac712975..a8d3e1d0603 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
@@ -45,21 +45,29 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentLongPairSet(0);
+            ConcurrentLongPairSet.newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongPairSet(16, 0);
+            ConcurrentLongPairSet.newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongPairSet(4, 8);
+            ConcurrentLongPairSet.newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -68,7 +76,9 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(16);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add(1, 1));
@@ -94,9 +104,64 @@ public class ConcurrentLongPairSetTest {
         assertEquals(set.size(), 3);
     }
 
+    @Test
+    public void testClear() {
+        ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add(1, 1));
+        assertTrue(map.add(2, 2));
+        assertTrue(map.add(3, 3));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add(1, 1));
+        assertTrue(map.add(2, 2));
+        assertTrue(map.add(3, 3));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1, 1));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2, 2));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.add(4, 4));
+        assertTrue(map.add(5, 5));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() operation
+        assertTrue(map.add(6, 6));
+        assertTrue(map.remove(6, 6));
+        assertTrue(map.capacity() == 8);
+    }
+
+
     @Test
     public void testRemove() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add(1, 1));
@@ -111,7 +176,10 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -126,7 +194,10 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testRehashingRemoval() {
         int n = 16;
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -152,7 +223,10 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -177,7 +251,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -210,7 +284,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongPairSet map = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -243,7 +317,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void testIteration() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         assertEquals(set.items(), Collections.emptyList());
 
@@ -269,7 +343,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void testRemoval() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         set.add(0, 0);
         set.add(1, 1);
@@ -295,7 +369,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void testIfRemoval() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         set.add(0, 0);
         set.add(1, 1);
@@ -319,7 +393,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void testItems() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         int n = 100;
         int limit = 10;
@@ -340,7 +414,10 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int Buckets = 16;
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(Buckets, 1);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(Buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -375,7 +452,7 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testEqualsObjects() {
 
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         long t1 = 1;
         long t2 = 2;
@@ -397,7 +474,7 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testToString() {
 
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         set.add(0, 0);
         set.add(1, 1);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index 254be51f292..7919485d9b6 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -49,21 +49,29 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentOpenHashMap<String, String>(0);
+            ConcurrentOpenHashMap.<String, String>newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentOpenHashMap<String, String>(16, 0);
+            ConcurrentOpenHashMap.<String, String>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentOpenHashMap<String, String>(4, 8);
+            ConcurrentOpenHashMap.<String, String>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -72,7 +80,10 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16);
+        ConcurrentOpenHashMap<String, String> map =
+                ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put("1", "one"));
@@ -98,9 +109,64 @@ public class ConcurrentOpenHashMapTest {
         assertEquals(map.size(), 3);
     }
 
+    @Test
+    public void testClear() {
+        ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put("k1", "v1"));
+        assertNull(map.put("k2", "v2"));
+        assertNull(map.put("k3", "v3"));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put("k1", "v1"));
+        assertNull(map.put("k2", "v2"));
+        assertNull(map.put("k3", "v3"));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove("k1", "v1"));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove("k2", "v2"));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertNull(map.put("k4", "v4"));
+        assertNull(map.put("k5", "v5"));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() operation
+        assertNull(map.put("k6", "v6"));
+        assertTrue(map.remove("k6", "v6"));
+        assertTrue(map.capacity() == 8);
+    }
+
     @Test
     public void testRemove() {
-        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, String> map =
+                ConcurrentOpenHashMap.<String, String>newBuilder().build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put("1", "one"));
@@ -117,7 +183,10 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentOpenHashMap<String, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1);
+        ConcurrentOpenHashMap<String, Integer> map = ConcurrentOpenHashMap.<String, Integer>newBuilder()
+                        .expectedItems(n / 2)
+                        .concurrencyLevel(1)
+                        .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -132,7 +201,11 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1);
+        ConcurrentOpenHashMap<Integer, Integer> map =
+                ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+                        .expectedItems(n / 2)
+                        .concurrencyLevel(1)
+                        .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -154,7 +227,10 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -188,7 +264,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -222,7 +299,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testIteration() {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
 
         assertEquals(map.keys(), Collections.emptyList());
         assertEquals(map.values(), Collections.emptyList());
@@ -266,7 +344,10 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int Buckets = 16;
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(Buckets, 1);
+        ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder()
+                .expectedItems(Buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -299,7 +380,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testPutIfAbsent() {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
         assertNull(map.putIfAbsent(1l, "one"));
         assertEquals(map.get(1l), "one");
 
@@ -309,7 +391,10 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testComputeIfAbsent() {
-        ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<Integer, Integer> map = ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         AtomicInteger counter = new AtomicInteger();
         Function<Integer, Integer> provider = key -> counter.getAndIncrement();
 
@@ -350,7 +435,8 @@ public class ConcurrentOpenHashMapTest {
             }
         }
 
-        ConcurrentOpenHashMap<T, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<T, String> map =
+                ConcurrentOpenHashMap.<T, String>newBuilder().build();
 
         T t1 = new T(1);
         T t1_b = new T(1);
@@ -372,7 +458,11 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testNullValue() {
-        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, String> map =
+                ConcurrentOpenHashMap.<String, String>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         String key = "a";
         assertThrows(NullPointerException.class, () -> map.put(key, null));
 
@@ -406,7 +496,10 @@ public class ConcurrentOpenHashMapTest {
     static final int N = 1_000_000;
 
     public void benchConcurrentOpenHashMap() throws Exception {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(N, 1);
+        ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder()
+                .expectedItems(N)
+                .concurrencyLevel(1)
+                .build();
 
         for (long i = 0; i < Iterations; i++) {
             for (int j = 0; j < N; j++) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
index 3c1d99668d7..af62948b64a 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
@@ -91,9 +91,66 @@ public class ConcurrentOpenHashSetTest {
         assertEquals(set.size(), 3);
     }
 
+    @Test
+    public void testClear() {
+        ConcurrentOpenHashSet<String> map =
+                ConcurrentOpenHashSet.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add("k1"));
+        assertTrue(map.add("k2"));
+        assertTrue(map.add("k3"));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentOpenHashSet<String> map =
+                ConcurrentOpenHashSet.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add("k1"));
+        assertTrue(map.add("k2"));
+        assertTrue(map.add("k3"));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove("k1"));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove("k2"));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.add("k4"));
+        assertTrue(map.add("k5"));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() operation
+        assertTrue(map.add("k6"));
+        assertTrue(map.remove("k6"));
+        assertTrue(map.capacity() == 8);
+    }
+
     @Test
     public void testRemove() {
-        ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<String> set =
+                ConcurrentOpenHashSet.<String>newBuilder().build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add("1"));
@@ -145,7 +202,8 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Long> set =
+                ConcurrentOpenHashSet.<Long>newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -178,7 +236,8 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentOpenHashSet<Long> map = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Long> map =
+                ConcurrentOpenHashSet.<Long>newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -211,7 +270,7 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void testIteration() {
-        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Long> set = ConcurrentOpenHashSet.<Long>newBuilder().build();
 
         assertEquals(set.values(), Collections.emptyList());
 
@@ -237,7 +296,8 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void testRemoval() {
-        ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Integer> set =
+                ConcurrentOpenHashSet.<Integer>newBuilder().build();
 
         set.add(0);
         set.add(1);
@@ -315,7 +375,8 @@ public class ConcurrentOpenHashSetTest {
             }
         }
 
-        ConcurrentOpenHashSet<T> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<T> set =
+                ConcurrentOpenHashSet.<T>newBuilder().build();
 
         T t1 = new T(1);
         T t1_b = new T(1);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 56ae17be97a..195f40fd326 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -121,7 +121,8 @@ public class PulsarRecordCursor implements RecordCursor {
 
     PulsarDispatchingRowDecoderFactory decoderFactory;
 
-    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap =
+            ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build();
 
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 0753dd282f6..b2873d778ab 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -81,9 +81,17 @@ public class WebSocketService implements Closeable {
     public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
         this.config = config;
         this.localCluster = localCluster;
-        this.topicProducerMap = new ConcurrentOpenHashMap<>();
-        this.topicConsumerMap = new ConcurrentOpenHashMap<>();
-        this.topicReaderMap = new ConcurrentOpenHashMap<>();
+        this.topicProducerMap =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashSet<ProducerHandler>>newBuilder()
+                        .build();
+        this.topicConsumerMap =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashSet<ConsumerHandler>>newBuilder()
+                        .build();
+        this.topicReaderMap =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<ReaderHandler>>newBuilder()
+                        .build();
         this.proxyStats = new ProxyStats(this);
     }
 
@@ -247,7 +255,8 @@ public class WebSocketService implements Closeable {
 
     public boolean addProducer(ProducerHandler producer) {
         return topicProducerMap
-                .computeIfAbsent(producer.getProducer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
+                .computeIfAbsent(producer.getProducer().getTopic(),
+                        topic -> ConcurrentOpenHashSet.<ProducerHandler>newBuilder().build())
                 .add(producer);
     }
 
@@ -265,7 +274,8 @@ public class WebSocketService implements Closeable {
 
     public boolean addConsumer(ConsumerHandler consumer) {
         return topicConsumerMap
-                .computeIfAbsent(consumer.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
+                .computeIfAbsent(consumer.getConsumer().getTopic(), topic ->
+                        ConcurrentOpenHashSet.<ConsumerHandler>newBuilder().build())
                 .add(consumer);
     }
 
@@ -282,7 +292,8 @@ public class WebSocketService implements Closeable {
     }
 
     public boolean addReader(ReaderHandler reader) {
-        return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
+        return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic ->
+                ConcurrentOpenHashSet.<ReaderHandler>newBuilder().build())
                 .add(reader);
     }
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java
index 9bf5f4a68f6..8fa91130ae4 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java
@@ -52,7 +52,9 @@ public class ProxyStats {
         super();
         this.service = service;
         this.jvmMetrics = new JvmMetrics(service);
-        this.topicStats = new ConcurrentOpenHashMap<>();
+        this.topicStats =
+                ConcurrentOpenHashMap.<String, ProxyNamespaceStats>newBuilder()
+                        .build();
         this.metricsCollection = Lists.newArrayList();
         this.tempMetricsCollection = Lists.newArrayList();
         // schedule stat generation task every 1 minute


[pulsar] 04/17: Reduce unnecessary expansions for ConcurrentLong map and set (#14562)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e38d75a0950de0dd5ab5dcc620a20cb332ee5fb8
Author: LinChen <15...@qq.com>
AuthorDate: Tue Mar 15 11:41:08 2022 +0800

    Reduce unnecessary expansions for ConcurrentLong map and set (#14562)
    
    (cherry picked from commit 8e7006f899bd2b9ed9482ab2ce1ee35233957d03)
---
 .../util/collections/ConcurrentLongHashMap.java    | 10 ++++++
 .../util/collections/ConcurrentLongPairSet.java    | 11 ++++---
 .../util/collections/ConcurrentOpenHashMap.java    | 19 ++++++++++++
 .../util/collections/ConcurrentOpenHashSet.java    | 18 +++++++++++
 .../collections/ConcurrentLongHashMapTest.java     | 19 ++++++++++++
 .../collections/ConcurrentLongPairSetTest.java     | 19 ++++++++++++
 .../collections/ConcurrentOpenHashMapTest.java     | 19 ++++++++++++
 .../collections/ConcurrentOpenHashSetTest.java     | 36 +++++++++++++++++-----
 8 files changed, 138 insertions(+), 13 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index a4779357a44..6f2794468c4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -451,6 +451,16 @@ public class ConcurrentLongHashMap<V> {
                             if (nextValueInArray == EmptyValue) {
                                 values[bucket] = (V) EmptyValue;
                                 --usedBuckets;
+
+                                // Cleanup all the buckets that were in `DeletedValue` state,
+                                // so that we can reduce unnecessary expansions
+                                int lastBucket = signSafeMod(bucket - 1, capacity);
+                                while (values[lastBucket] == DeletedValue) {
+                                    values[lastBucket] = (V) EmptyValue;
+                                    --usedBuckets;
+
+                                    lastBucket = signSafeMod(lastBucket - 1, capacity);
+                                }
                             } else {
                                 values[bucket] = (V) DeletedValue;
                             }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index abbe11576a9..66ecaee4bfa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -493,14 +493,15 @@ public class ConcurrentLongPairSet implements LongPairSet {
                 table[bucket + 1] = EmptyItem;
                 --usedBuckets;
 
-                // Cleanup all the buckets that were in `DeletedKey` state,
+                // Cleanup all the buckets that were in `DeletedItem` state,
                 // so that we can reduce unnecessary expansions
-                bucket = (bucket - 1) & (table.length - 1);
-                while (table[bucket] == DeletedItem) {
-                    table[bucket] = EmptyItem;
+                int lastBucket = (bucket - 2) & (table.length - 1);
+                while (table[lastBucket] == DeletedItem) {
+                    table[lastBucket] = EmptyItem;
+                    table[lastBucket + 1] = EmptyItem;
                     --usedBuckets;
 
-                    bucket = (bucket - 1) & (table.length - 1);
+                    lastBucket = (lastBucket - 2) & (table.length - 1);
                 }
             } else {
                 table[bucket] = DeletedItem;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 255844cf4ba..f82bf11a90e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -173,6 +173,14 @@ public class ConcurrentOpenHashMap<K, V> {
         }
     }
 
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section<K, V> s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
     public long size() {
         long size = 0;
         for (Section<K, V> s : sections) {
@@ -441,6 +449,17 @@ public class ConcurrentOpenHashMap<K, V> {
                                 table[bucket] = EmptyKey;
                                 table[bucket + 1] = null;
                                 --usedBuckets;
+
+                                // Cleanup all the buckets that were in `DeletedKey` state,
+                                // so that we can reduce unnecessary expansions
+                                int lastBucket = (bucket - 2) & (table.length - 1);
+                                while (table[lastBucket] == DeletedKey) {
+                                    table[lastBucket] = EmptyKey;
+                                    table[lastBucket + 1] = null;
+                                    --usedBuckets;
+
+                                    lastBucket = (lastBucket - 2) & (table.length - 1);
+                                }
                             } else {
                                 table[bucket] = DeletedKey;
                                 table[bucket + 1] = null;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 6dc8552174e..cf5ed7ccdc8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -152,6 +152,14 @@ public class ConcurrentOpenHashSet<V> {
         }
     }
 
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section<V> s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
     public long size() {
         long size = 0;
         for (int i = 0; i < sections.length; i++) {
@@ -477,6 +485,16 @@ public class ConcurrentOpenHashSet<V> {
             if (values[nextInArray] == EmptyValue) {
                 values[bucket] = (V) EmptyValue;
                 --usedBuckets;
+
+                // Cleanup all the buckets that were in `DeletedValue` state,
+                // so that we can reduce unnecessary expansions
+                int lastBucket = signSafeMod(bucket - 1, capacity);
+                while (values[lastBucket] == DeletedValue) {
+                    values[lastBucket] = (V) EmptyValue;
+                    --usedBuckets;
+
+                    lastBucket = signSafeMod(lastBucket - 1, capacity);
+                }
             } else {
                 values[bucket] = (V) DeletedValue;
             }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index 6cf126cf2ff..205cf91b47d 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -107,6 +107,25 @@ public class ConcurrentLongHashMapTest {
         assertEquals(map.size(), 3);
     }
 
+    @Test
+    public void testReduceUnnecessaryExpansions() {
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .build();
+        assertNull(map.put(1, "v1"));
+        assertNull(map.put(2, "v2"));
+        assertNull(map.put(3, "v3"));
+        assertNull(map.put(4, "v4"));
+
+        assertTrue(map.remove(1, "v1"));
+        assertTrue(map.remove(2, "v2"));
+        assertTrue(map.remove(3, "v3"));
+        assertTrue(map.remove(4, "v4"));
+
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
     @Test
     public void testClear() {
         ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
index a8d3e1d0603..86030f21619 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
@@ -74,6 +74,25 @@ public class ConcurrentLongPairSetTest {
         }
     }
 
+    @Test
+    public void testReduceUnnecessaryExpansions() {
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .build();
+        assertTrue(set.add(1, 1));
+        assertTrue(set.add(2, 2));
+        assertTrue(set.add(3, 3));
+        assertTrue(set.add(4, 4));
+
+        assertTrue(set.remove(1, 1));
+        assertTrue(set.remove(2, 2));
+        assertTrue(set.remove(3, 3));
+        assertTrue(set.remove(4, 4));
+
+        assertEquals(0, set.getUsedBucketCount());
+    }
+
     @Test
     public void simpleInsertions() {
         ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index 7919485d9b6..cec52ea3ded 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -109,6 +109,25 @@ public class ConcurrentOpenHashMapTest {
         assertEquals(map.size(), 3);
     }
 
+    @Test
+    public void testReduceUnnecessaryExpansions() {
+        ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .build();
+        assertNull(map.put("1", "1"));
+        assertNull(map.put("2", "2"));
+        assertNull(map.put("3", "3"));
+        assertNull(map.put("4", "4"));
+
+        assertEquals(map.remove("1"), "1");
+        assertEquals(map.remove("2"), "2");
+        assertEquals(map.remove("3"), "3");
+        assertEquals(map.remove("4"), "4");
+
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
     @Test
     public void testClear() {
         ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
index af62948b64a..6c82293bec2 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
@@ -91,24 +91,44 @@ public class ConcurrentOpenHashSetTest {
         assertEquals(set.size(), 3);
     }
 
+    @Test
+    public void testReduceUnnecessaryExpansions() {
+        ConcurrentOpenHashSet<String> set =
+                ConcurrentOpenHashSet.<String>newBuilder()
+                        .expectedItems(2)
+                        .concurrencyLevel(1)
+                        .build();
+
+        assertTrue(set.add("1"));
+        assertTrue(set.add("2"));
+        assertTrue(set.add("3"));
+        assertTrue(set.add("4"));
+
+        assertTrue(set.remove("1"));
+        assertTrue(set.remove("2"));
+        assertTrue(set.remove("3"));
+        assertTrue(set.remove("4"));
+        assertEquals(0, set.getUsedBucketCount());
+    }
+
     @Test
     public void testClear() {
-        ConcurrentOpenHashSet<String> map =
+        ConcurrentOpenHashSet<String> set =
                 ConcurrentOpenHashSet.<String>newBuilder()
                 .expectedItems(2)
                 .concurrencyLevel(1)
                 .autoShrink(true)
                 .mapIdleFactor(0.25f)
                 .build();
-        assertTrue(map.capacity() == 4);
+        assertTrue(set.capacity() == 4);
 
-        assertTrue(map.add("k1"));
-        assertTrue(map.add("k2"));
-        assertTrue(map.add("k3"));
+        assertTrue(set.add("k1"));
+        assertTrue(set.add("k2"));
+        assertTrue(set.add("k3"));
 
-        assertTrue(map.capacity() == 8);
-        map.clear();
-        assertTrue(map.capacity() == 4);
+        assertTrue(set.capacity() == 8);
+        set.clear();
+        assertTrue(set.capacity() == 4);
     }
 
     @Test


[pulsar] 16/17: [fix][test]: fix flaky test of ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 38057e42840f8becb4ae893133469b8cc26ee10f
Author: wuxuanqicn <89...@users.noreply.github.com>
AuthorDate: Thu Apr 28 11:57:39 2022 +0800

    [fix][test]: fix flaky test of ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)
    
    Fixes #9919
    
    ### Motivation
    
    we need make sure broker executed all ack command and updated metrics, then we can generate and check metric
    
    ### Modifications
    
    - enable AckReceipt
    - await until ack procedure complete(ACK and ACK_RESPONSE command)
    
    Co-authored-by: xuanqi.wu <xu...@weimob.com>
    (cherry picked from commit be7057a1fd878111e85ec112bac8f0b72350e744)
---
 .../broker/stats/ManagedCursorMetricsTest.java     | 26 +++++++++++++++++-----
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  1 +
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 5e20c09fed1..4648ae2fb8f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,22 +18,27 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarTestClient;
 import org.apache.pulsar.common.stats.Metrics;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 @Test(groups = "broker")
 public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
 
@@ -49,6 +54,11 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @Override
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
+        return PulsarTestClient.create(clientBuilder);
+    }
+
     @Test
     public void testManagedCursorMetrics() throws Exception {
         final String subName = "my-sub";
@@ -63,14 +73,18 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
         metricsList = metrics.generate();
         Assert.assertTrue(metricsList.isEmpty());
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+        PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient;
+        @Cleanup
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(1, TimeUnit.SECONDS)
                 .subscriptionName(subName)
+                .isAckReceiptEnabled(true)
                 .subscribe();
 
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        @Cleanup
+        Producer<byte[]> producer = this.pulsarClient.newProducer()
                 .topic(topicName)
                 .create();
 
@@ -83,6 +97,8 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
             producer.send(message.getBytes());
             consumer.acknowledge(consumer.receive().getMessageId());
         }
+
+        Awaitility.await().until(() -> pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0);
         metricsList = metrics.generate();
         Assert.assertFalse(metricsList.isEmpty());
         Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 00b8e4f7561..a8d1cf51c71 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -109,6 +109,7 @@ public class ClientCnx extends PulsarHandler {
     protected final Authentication authentication;
     private State state;
 
+    @Getter
     private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
             ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
                     .expectedItems(16)


[pulsar] 09/17: [Functions] Check executor null when closing the FileSource (#15247)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 76e1f740e8658fc841bc2a6b4569981c029c4592
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Wed Apr 20 23:23:44 2022 -0700

    [Functions] Check executor null when closing the FileSource (#15247)
    
    (cherry picked from commit 06ba587fb92eff81785f8d463c85aaa1095292e9)
---
 .../src/main/java/org/apache/pulsar/io/file/FileSource.java  | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
index bc09c978dd6..3a51736cc2a 100644
--- a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
@@ -58,13 +58,15 @@ public class FileSource extends PushSource<byte[]> {
 
     @Override
     public void close() throws Exception {
-        executor.shutdown();
-        try {
-            if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
+        if (executor != null) {
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
                 executor.shutdownNow();
             }
-        } catch (InterruptedException e) {
-            executor.shutdownNow();
         }
     }
 }


[pulsar] 12/17: [fix] [broker] Fix problem at RateLimiter#tryAcquire (#15306)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ce7af7779268371dd4fdbd9635e449377350feb0
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Mon Apr 25 22:17:52 2022 +0800

    [fix] [broker] Fix problem at RateLimiter#tryAcquire (#15306)
    
    (cherry picked from commit 84b65598481fd9bbb6e06e2deb335222a04b9c6b)
---
 .../org/apache/pulsar/common/util/RateLimiter.java   |  3 +--
 .../apache/pulsar/common/util/RateLimiterTest.java   | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 20ca181c400..8f02bcc0e5c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -189,8 +189,7 @@ public class RateLimiter implements AutoCloseable{
             canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
         } else {
             // acquired-permits can't be larger than the rate
-            if (acquirePermit > this.permits) {
-                acquiredPermits = this.permits;
+            if (acquirePermit + acquiredPermits > this.permits) {
                 return false;
             }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
index 788ab749390..57090fcc7b7 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
@@ -133,6 +133,24 @@ public class RateLimiterTest {
         rate.close();
     }
 
+    @Test
+    public void testTryAcquireMoreThanPermits() {
+        final long rateTimeMSec = 1000;
+        RateLimiter rate = RateLimiter.builder().permits(3).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
+                .build();
+        assertTrue(rate.tryAcquire(2));
+        assertEquals(rate.getAvailablePermits(), 1);
+
+        //try to acquire failed, not decrease availablePermits.
+        assertFalse(rate.tryAcquire(2));
+        assertEquals(rate.getAvailablePermits(), 1);
+
+        assertTrue(rate.tryAcquire(1));
+        assertEquals(rate.getAvailablePermits(), 0);
+
+        rate.close();
+    }
+
     @Test
     public void testMultipleTryAcquire() {
         final long rateTimeMSec = 1000;
@@ -189,7 +207,7 @@ public class RateLimiterTest {
 
         Thread.sleep(rateTimeMSec);
         // check after three rate-time: acquiredPermits is 0
-        assertEquals(rate.getAvailablePermits() > 0, true);
+        assertTrue(rate.getAvailablePermits() > 0);
 
         rate.close();
     }


[pulsar] 02/17: Optimize memory usage: support to shrink for pendingAcks map (#14515)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a052f938130bb3fa228887fca2a2b505c4266457
Author: lin chen <15...@qq.com>
AuthorDate: Sun Mar 6 04:58:14 2022 +0800

    Optimize memory usage: support to  shrink for pendingAcks map (#14515)
    
    (cherry picked from commit e747b8f16b0b660231ff27a8c2100d67ad7c79a6)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   8 +
 .../org/apache/pulsar/broker/service/Consumer.java |  11 +-
 .../collections/ConcurrentLongLongPairHashMap.java | 673 +++++++++++++++++++++
 .../ConcurrentLongLongPairHashMapTest.java         | 427 +++++++++++++
 4 files changed, 1116 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1e7eac5ef9a..dc310d5056f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -580,6 +580,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private boolean isAllowAutoUpdateSchemaEnabled = true;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Whether to enable the automatic shrink of pendingAcks map, "
+                    + "the default is false, which means it is not enabled. "
+                    + "When there are a large number of share or key share consumers in the cluster, "
+                    + "it can be enabled to reduce the memory consumption caused by pendingAcks.")
+    private boolean autoShrinkForConsumerPendingAcksMap = false;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index b8358d06f3a..f7ed211fb2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -37,8 +37,6 @@ import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -59,6 +57,8 @@ import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -177,7 +177,12 @@ public class Consumer {
         stats.metadata = this.metadata;
 
         if (Subscription.isIndividualAckMode(subType)) {
-            this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
+            this.pendingAcks = ConcurrentLongLongPairHashMap.newBuilder()
+                    .autoShrink(subscription.getTopic().getBrokerService()
+                            .getPulsar().getConfiguration().isAutoShrinkForConsumerPendingAcksMap())
+                    .expectedItems(256)
+                    .concurrencyLevel(1)
+                    .build();
         } else {
             // We don't need to keep track of pending acks if the subscription is not shared
             this.pendingAcks = null;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
new file mode 100644
index 00000000000..eac7268ba67
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * Concurrent hash map where both keys and values are composed of pairs of longs.
+ *
+ * <p>(long,long) --&gt; (long,long)
+ *
+ * <p>Provides similar methods as a {@code ConcurrentMap<K,V>} but since it's an open hash map with linear probing,
+ * no node allocations are required to store the keys and values, and no boxing is required.
+ *
+ * <p>Keys <strong>MUST</strong> be &gt;= 0.
+ */
+public class ConcurrentLongLongPairHashMap {
+
+    private static final long EmptyKey = -1L;
+    private static final long DeletedKey = -2L;
+
+    private static final long ValueNotFound = -1L;
+
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
+    private final Section[] sections;
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder of ConcurrentLongLongPairHashMap.
+     */
+    public static class Builder {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongLongPairHashMap build() {
+            return new ConcurrentLongLongPairHashMap(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+    /**
+     * A BiConsumer Long pair.
+     */
+    public interface BiConsumerLongPair {
+        void accept(long key1, long key2, long value1, long value2);
+    }
+
+    /**
+     * A Long pair function.
+     */
+    public interface LongLongPairFunction {
+        long apply(long key1, long key2);
+    }
+
+    /**
+     * A Long pair predicate.
+     */
+    public interface LongLongPairPredicate {
+        boolean test(long key1, long key2, long value1, long value2);
+    }
+
+    private ConcurrentLongLongPairHashMap(int expectedItems, int concurrencyLevel,
+                                          float mapFillFactor, float mapIdleFactor,
+                                         boolean autoShrink, float expandFactor, float shrinkFactor) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
+        this.sections = new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    /**
+     * @param key1
+     * @param key2
+     * @return the value or -1 if the key was not present.
+     */
+    public LongPair get(long key1, long key2) {
+        checkBiggerEqualZero(key1);
+        long h = hash(key1, key2);
+        return getSection(h).get(key1, key2, (int) h);
+    }
+
+    public boolean containsKey(long key1, long key2) {
+        return get(key1, key2) != null;
+    }
+
+    public boolean put(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).put(key1, key2, value1, value2, (int) h, false);
+    }
+
+    public boolean putIfAbsent(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).put(key1, key2, value1, value2, (int) h, true);
+    }
+
+    /**
+     * Remove an existing entry if found.
+     *
+     * @param key1
+     * @param key2
+     * @return the value associated with the key or -1 if key was not present.
+     */
+    public boolean remove(long key1, long key2) {
+        checkBiggerEqualZero(key1);
+        long h = hash(key1, key2);
+        return getSection(h).remove(key1, key2, ValueNotFound, ValueNotFound, (int) h);
+    }
+
+    public boolean remove(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).remove(key1, key2, value1, value2, (int) h);
+    }
+
+    private Section getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(BiConsumerLongPair processor) {
+        for (Section s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy).
+     */
+    public List<LongPair> keys() {
+        List<LongPair> keys = Lists.newArrayListWithExpectedSize((int) size());
+        forEach((key1, key2, value1, value2) -> keys.add(new LongPair(key1, key2)));
+        return keys;
+    }
+
+    public List<LongPair> values() {
+        List<LongPair> values = Lists.newArrayListWithExpectedSize((int) size());
+        forEach((key1, key2, value1, value2) -> values.add(new LongPair(value1, value2)));
+        return values;
+    }
+
+    public Map<LongPair, LongPair> asMap() {
+        Map<LongPair, LongPair> map = Maps.newHashMapWithExpectedSize((int) size());
+        forEach((key1, key2, value1, value2) -> map.put(new LongPair(key1, key2), new LongPair(value1, value2)));
+        return map;
+    }
+
+    // A section is a portion of the hash map that is covered by a single
+    @SuppressWarnings("serial")
+    private static final class Section extends StampedLock {
+        // Keys and values are stored interleaved in the table array
+        private volatile long[] table;
+
+        private volatile int capacity;
+        private final int initCapacity;
+        private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
+                AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
+
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
+            this.table = new long[4 * this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
+            Arrays.fill(table, EmptyKey);
+        }
+
+        LongPair get(long key1, long key2, int keyHash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (key1 == storedKey1 && key2 == storedKey2) {
+                            return new LongPair(storedValue1, storedValue2);
+                        } else if (storedKey1 == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(keyHash, capacity);
+                            storedKey1 = table[bucket];
+                            storedKey2 = table[bucket + 1];
+                            storedValue1 = table[bucket + 2];
+                            storedValue2 = table[bucket + 3];
+                        }
+
+                        if (key1 == storedKey1 && key2 == storedKey2) {
+                            return new LongPair(storedValue1, storedValue2);
+                        } else if (storedKey1 == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        boolean put(long key1, long key2, long value1, long value2, int keyHash, boolean onlyIfAbsent) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+
+                    if (key1 == storedKey1 && key2 == storedKey2) {
+                        if (!onlyIfAbsent) {
+                            // Over written an old value for same key
+                            table[bucket + 2] = value1;
+                            table[bucket + 3] = value2;
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey1 == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        table[bucket] = key1;
+                        table[bucket + 1] = key2;
+                        table[bucket + 2] = value1;
+                        table[bucket + 3] = value2;
+                        SIZE_UPDATER.incrementAndGet(this);
+                        return true;
+                    } else if (storedKey1 == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThresholdUp) {
+                    try {
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private boolean remove(long key1, long key2, long value1, long value2, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+                    if (key1 == storedKey1 && key2 == storedKey2) {
+                        if (value1 == ValueNotFound || (value1 == storedValue1 && value2 == storedValue2)) {
+                            SIZE_UPDATER.decrementAndGet(this);
+
+                            cleanBucket(bucket);
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey1 == EmptyKey) {
+                        // Key wasn't found
+                        return false;
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+
+            } finally {
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private void cleanBucket(int bucket) {
+            int nextInArray = (bucket + 4) & (table.length - 1);
+            if (table[nextInArray] == EmptyKey) {
+                table[bucket] = EmptyKey;
+                table[bucket + 1] = EmptyKey;
+                table[bucket + 2] = ValueNotFound;
+                table[bucket + 3] = ValueNotFound;
+                --usedBuckets;
+
+                // Cleanup all the buckets that were in `DeletedKey` state, so that we can reduce unnecessary expansions
+                bucket = (bucket - 4) & (table.length - 1);
+                while (table[bucket] == DeletedKey) {
+                    table[bucket] = EmptyKey;
+                    table[bucket + 1] = EmptyKey;
+                    table[bucket + 2] = ValueNotFound;
+                    table[bucket + 3] = ValueNotFound;
+                    --usedBuckets;
+
+                    bucket = (bucket - 4) & (table.length - 1);
+                }
+            } else {
+                table[bucket] = DeletedKey;
+                table[bucket + 1] = DeletedKey;
+                table[bucket + 2] = ValueNotFound;
+                table[bucket + 3] = ValueNotFound;
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyKey);
+                this.size = 0;
+                this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(BiConsumerLongPair processor) {
+            long stamp = tryOptimisticRead();
+
+            long[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 4) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey1 = table[bucket];
+                        storedKey2 = table[bucket + 1];
+                        storedValue1 = table[bucket + 2];
+                        storedValue2 = table[bucket + 3];
+                    }
+
+                    if (storedKey1 != DeletedKey && storedKey1 != EmptyKey) {
+                        processor.accept(storedKey1, storedKey2, storedValue1, storedValue2);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash(int newCapacity) {
+            long[] newTable = new long[4 * newCapacity];
+            Arrays.fill(newTable, EmptyKey);
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i += 4) {
+                long storedKey1 = table[i];
+                long storedKey2 = table[i + 1];
+                long storedValue1 = table[i + 2];
+                long storedValue2 = table[i + 3];
+                if (storedKey1 != EmptyKey && storedKey1 != DeletedKey) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedKey1, storedKey2, storedValue1, storedValue2);
+                }
+            }
+
+            table = newTable;
+            usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
+        }
+
+        private static void insertKeyValueNoLock(long[] table, int capacity, long key1, long key2, long value1,
+                long value2) {
+            int bucket = signSafeMod(hash(key1, key2), capacity);
+
+            while (true) {
+                long storedKey1 = table[bucket];
+
+                if (storedKey1 == EmptyKey) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = key1;
+                    table[bucket + 1] = key2;
+                    table[bucket + 2] = value1;
+                    table[bucket + 3] = value2;
+                    return;
+                }
+
+                bucket = (bucket + 4) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    static final long hash(long key1, long key2) {
+        long hash = key1 * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        hash += 31 + (key2 * HashMixer);
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
+    static final int signSafeMod(long n, int max) {
+        return (int) (n & (max - 1)) << 2;
+    }
+
+    private static int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+
+    private static void checkBiggerEqualZero(long n) {
+        if (n < 0L) {
+            throw new IllegalArgumentException("Keys and values must be >= 0");
+        }
+    }
+
+    /**
+     * A pair of long values.
+     */
+    public static class LongPair implements Comparable<LongPair> {
+        public final long first;
+        public final long second;
+
+        public LongPair(long first, long second) {
+            this.first = first;
+            this.second = second;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof LongPair) {
+                LongPair other = (LongPair) obj;
+                return first == other.first && second == other.second;
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return (int) hash(first, second);
+        }
+
+        @Override
+        public int compareTo(LongPair o) {
+            if (first != o.first) {
+                return Long.compare(first, o.first);
+            } else {
+                return Long.compare(second, o.second);
+            }
+        }
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMapTest.java
new file mode 100644
index 00000000000..98a96804d25
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMapTest.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.junit.Test;
+
+/**
+ * Test the concurrent long-long pair hashmap class.
+ */
+public class ConcurrentLongLongPairHashMapTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+             ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(0)
+                    .build();
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .build();
+        assertTrue(map.isEmpty());
+        assertTrue(map.put(1, 1, 11, 11));
+        assertFalse(map.isEmpty());
+
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+        assertEquals(map.size(), 3);
+
+        assertTrue(map.remove(1, 1));
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1, 1), null);
+        assertEquals(map.get(5, 5), null);
+        assertEquals(map.size(), 2);
+
+        assertTrue(map.put(1, 1, 11, 11));
+        assertEquals(map.size(), 3);
+        assertTrue(map.put(1, 1, 111, 111));
+        assertEquals(map.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap
+                .newBuilder()
+                .build();
+
+        assertTrue(map.isEmpty());
+        assertTrue(map.put(1, 1, 11, 11));
+        assertFalse(map.isEmpty());
+
+        assertFalse(map.remove(0, 0));
+        assertFalse(map.remove(1, 1, 111, 111));
+
+        assertFalse(map.isEmpty());
+        assertTrue(map.remove(1, 1, 11, 11));
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testClear() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.put(1, 1, 11, 11));
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.put(1, 1, 11, 11));
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1, 1, 11, 11));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2, 2, 22, 22));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.put(4, 4, 44, 44));
+        assertTrue(map.put(5, 5, 55, 55));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() operation
+        assertTrue(map.put(6, 6, 66, 66));
+        assertTrue(map.remove(6, 6, 66, 66));
+        assertTrue(map.capacity() == 8);
+    }
+
+    @Test
+    public void testNegativeUsedBucketCount() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+
+        map.put(0, 0, 0, 0);
+        assertEquals(1, map.getUsedBucketCount());
+        map.put(0, 0, 1, 1);
+        assertEquals(1, map.getUsedBucketCount());
+        map.remove(0, 0);
+        assertEquals(0, map.getUsedBucketCount());
+        map.remove(0, 0);
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            map.put(i, i, i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i, i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i, i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i, i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int n = 100_000;
+        long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < n; j++) {
+                    long key1 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key1 -= key1 % (threadIdx + 1);
+
+                    long key2 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key2 -= key2 % (threadIdx + 1);
+
+                    map.put(key1, key2, value, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), n * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int n = 100_000;
+        final long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < n; j++) {
+                    long key1 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key1 -= key1 % (threadIdx + 1);
+
+                    long key2 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key2 -= key2 % (threadIdx + 1);
+
+                    map.put(key1, key2, value, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), n * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0, 0, 0);
+
+        assertEquals(map.keys(), Lists.newArrayList(new LongPair(0, 0)));
+        assertEquals(map.values(), Lists.newArrayList(new LongPair(0, 0)));
+
+        map.remove(0, 0);
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0, 0, 0);
+        map.put(1, 1, 11, 11);
+        map.put(2, 2, 22, 22);
+
+        List<LongPair> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)));
+
+        List<LongPair> values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(11, 11), new LongPair(22, 22)));
+
+        map.put(1, 1, 111, 111);
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)));
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(22, 22), new LongPair(111, 111)));
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
+
+        assertTrue(map.putIfAbsent(1, 1, 11, 11));
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+
+        assertFalse(map.putIfAbsent(1, 1, 111, 111));
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+    }
+
+    @Test
+    public void testIvalidKeys() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+
+
+        try {
+            map.put(-5, 3, 4, 4);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.get(-1, 0);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.containsKey(-1, 0);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.putIfAbsent(-1, 1, 1, 1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testAsMap() {
+        ConcurrentLongLongPairHashMap lmap = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+        lmap.put(1, 1, 11, 11);
+        lmap.put(2, 2, 22, 22);
+        lmap.put(3, 3, 33, 33);
+
+        Map<LongPair, LongPair> map = new HashMap<>();
+        map.put(new LongPair(1, 1), new LongPair(11, 11));
+        map.put(new LongPair(2, 2), new LongPair(22, 22));
+        map.put(new LongPair(3, 3), new LongPair(33, 33));
+
+        assertEquals(map, lmap.asMap());
+    }
+}


[pulsar] 15/17: [improve][broker] Use shrink map for message redelivery. (#15342)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0eff93b8fdbee40caf269a430acbef617ed31fd
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 28 11:06:15 2022 +0800

    [improve][broker] Use shrink map for message redelivery. (#15342)
    
    (cherry picked from commit 615f05af3e7c72d83b3fe24f64566ed58244ea5d)
---
 .../broker/service/persistent/MessageRedeliveryController.java   | 9 ++++++---
 .../service/persistent/MessageRedeliveryControllerTest.java      | 2 +-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index be143565c48..c7f96fffcef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -26,8 +26,8 @@ import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
 import org.apache.pulsar.common.util.collections.LongPairSet;
 
@@ -37,7 +37,10 @@ public class MessageRedeliveryController {
 
     public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
         this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
-        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery
+                ? null
+                : ConcurrentLongLongPairHashMap
+                    .newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
     }
 
     public boolean add(long ledgerId, long entryId) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
index 9a785f6f95f..478677a25e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -30,7 +30,7 @@ import java.lang.reflect.Field;
 import java.util.Set;
 import java.util.TreeSet;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
 import org.apache.pulsar.common.util.collections.LongPairSet;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;


[pulsar] 10/17: [Fix][Broker] Fix race condition in `OpAddEntry` (#15233)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f95b98f1edba68ab22b121f77f7af96183f40e72
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Thu Apr 21 14:58:01 2022 +0800

    [Fix][Broker] Fix race condition in `OpAddEntry` (#15233)
    
    (cherry picked from commit b083e9a72227a3360d1ec33b5f239d82f0804e65)
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 62edcd5ca9d..d03d98a3f06 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3743,12 +3743,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
         OpAddEntry opAddEntry = pendingAddEntries.peek();
         if (opAddEntry != null) {
+            final long finalAddOpCount = opAddEntry.addOpCount;
             boolean isTimedOut = opAddEntry.lastInitTime != -1
                     && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec;
             if (isTimedOut) {
                 log.error("Failed to add entry for ledger {} in time-out {} sec",
                         (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
-                opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger, opAddEntry.addOpCount);
+                opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger, finalAddOpCount);
             }
         }
     }


[pulsar] 07/17: Improve skipping of DNS resolution when creating AuthenticationDataHttp instance (#15228)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f91a920bad0d6ab6d335061e4d5e44efed61d552
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Apr 20 11:54:51 2022 +0300

    Improve skipping of DNS resolution when creating AuthenticationDataHttp instance (#15228)
    
    - improves solution added in #15221
      - It's better to use the JDK provided InetSocketAddress.createUnresolved method
        to prevent unnecessary DNS resolution
    
    (cherry picked from commit e71b98ae157c4c108802661eaa72913e9c9e0bef)
---
 .../apache/pulsar/broker/authentication/AuthenticationDataHttp.java  | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
index 9ffb29c0376..8a8dda2f177 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.authentication;
 
-import io.netty.util.NetUtil;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
@@ -36,9 +35,7 @@ public class AuthenticationDataHttp implements AuthenticationDataSource {
             throw new IllegalArgumentException();
         }
         this.request = request;
-        this.remoteAddress =
-                new InetSocketAddress(NetUtil.createInetAddressFromIpAddressString(request.getRemoteAddr()),
-                        request.getRemotePort());
+        this.remoteAddress = InetSocketAddress.createUnresolved(request.getRemoteAddr(), request.getRemotePort());
     }
 
     /*