You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/10/24 18:29:06 UTC

[pulsar-client-cpp] branch main updated: [refactor] Apply forward declaration as much as possible (#64)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 872f8ab  [refactor] Apply forward declaration as much as possible (#64)
872f8ab is described below

commit 872f8abaade7ecd346d3f59e2f6b3901c65ef7de
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Oct 25 02:29:00 2022 +0800

    [refactor] Apply forward declaration as much as possible (#64)
    
    * [refactor] Apply forward declaration as much as possible
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/60
    
    ### Motivation
    
    The includes in pulsar-client-cpp are very casual. There are a lot of
    implicit includes, and forward declarations are rarely used. For
    example, if `lib/ClientConnection.h` is modified, 27 files have to be
    recompiled.
    
    The other problem is that the `SortIncludes` attribute in the
    `.clang-format` file is false. That might have been okay in the early
    days. However, as the project grows, including many headers without
    any ordering makes for a very bad experience. Combined with the rare
    use of forward declarations, it's hard to determine whether a header
    is still required after a change.
    
    ### Modifications
    
    Apply forward declarations as much as possible and change
    `SortIncludes` to true in the `.clang-format` file.
    
    The `PulsarApi.pb.h` file is a special case because the size of this
    header is over 1 MiB. For classes we can use forward declarations, but
    for enumerations we have to include this header.
    
    To solve this problem, `ProtoApiEnums.h` is added to define some
    constant integers that can be cast implicitly from the enumerations. If
    we want to use enumerations from `PulsarApi.pb.h`, we can include
    `ProtoApiEnums.h` instead.
    
    Finally, to unify the include rules, in `lib/*.h`, if we want to include
    a header (e.g. `xxx.h`) from the same directory, just use `#include
    "xxx.h"`. Don't use `#include "lib/xxx.h"` or `#include <lib/xxx.h>` in
    headers of `lib/` directory.
    
    ### Improvements
    
    Forward declarations are now applied everywhere. Take
    `lib/ClientConnection.h` for example: after this patch, only 10 files
    need to be recompiled when it changes, while 27 files were recompiled
    before.
    
    This patch also reduces the binary size and shortens the compilation
    time.
    
    Binary size:
    - `libpulsar.a`: 319234696 (305 MiB) -> 286716564 (274 MiB)
    - `libpulsar.so`: 110162496 (106 MiB) -> 102428456 (98 MiB)
    
    Compilation time with the following commands:
    
    ```bash
    cmake -B build -DBUILD_TESTS=OFF
    cmake --build build -j8
    ```
    
    * Fix MSVC build
---
 .clang-format                             |   2 +-
 examples/SampleAsyncProducer.cc           |   6 +-
 examples/SampleConsumer.cc                |   6 +-
 examples/SampleConsumerListener.cc        |   6 +-
 examples/SampleProducer.cc                |   6 +-
 include/pulsar/Authentication.h           |   9 +-
 include/pulsar/BatchReceivePolicy.h       |   1 +
 include/pulsar/BrokerConsumerStats.h      |   8 +-
 include/pulsar/Client.h                   |  13 +--
 include/pulsar/ClientConfiguration.h      |   2 +-
 include/pulsar/Consumer.h                 |   5 +-
 include/pulsar/ConsumerConfiguration.h    |  18 ++--
 include/pulsar/CryptoKeyReader.h          |   4 +-
 include/pulsar/DeprecatedException.h      |   3 +-
 include/pulsar/EncryptionKeyInfo.h        |   5 +-
 include/pulsar/KeySharedPolicy.h          |   1 -
 include/pulsar/Logger.h                   |   3 +-
 include/pulsar/Message.h                  |   6 +-
 include/pulsar/MessageBatch.h             |   6 +-
 include/pulsar/MessageBuilder.h           |   6 +-
 include/pulsar/MessageId.h                |   5 +-
 include/pulsar/MessageRoutingPolicy.h     |   3 +-
 include/pulsar/Producer.h                 |   5 +-
 include/pulsar/ProducerConfiguration.h    |  10 +-
 include/pulsar/ProtobufNativeSchema.h     |   2 +-
 include/pulsar/Reader.h                   |   2 +-
 include/pulsar/ReaderConfiguration.h      |  13 +--
 include/pulsar/Result.h                   |   3 +-
 include/pulsar/Schema.h                   |   4 +-
 include/pulsar/c/client.h                 |   8 +-
 include/pulsar/c/consumer.h               |   3 +-
 include/pulsar/c/consumer_configuration.h |   1 +
 include/pulsar/c/message.h                |   2 +-
 include/pulsar/c/message_id.h             |   2 +-
 include/pulsar/c/message_router.h         |   2 +-
 include/pulsar/c/producer.h               |   5 +-
 include/pulsar/c/producer_configuration.h |   5 +-
 include/pulsar/c/reader.h                 |   4 +-
 include/pulsar/c/reader_configuration.h   |   2 +-
 lib/AckGroupingTracker.cc                 |  22 ++---
 lib/AckGroupingTracker.h                  |  13 +--
 lib/AckGroupingTrackerDisabled.cc         |   8 +-
 lib/AckGroupingTrackerDisabled.h          |   4 +-
 lib/AckGroupingTrackerEnabled.cc          |  11 +--
 lib/AckGroupingTrackerEnabled.h           |  18 +++-
 lib/Authentication.cc                     |  23 ++---
 lib/Backoff.cc                            |   6 +-
 lib/Backoff.h                             |   5 +-
 lib/BatchAcknowledgementTracker.cc        |  15 +--
 lib/BatchAcknowledgementTracker.h         |  18 ++--
 lib/BatchMessageContainer.cc              |   9 +-
 lib/BatchMessageContainerBase.cc          |  32 +++++-
 lib/BatchMessageContainerBase.h           |  35 +------
 lib/BatchMessageKeyBasedContainer.cc      |   8 +-
 lib/BatchReceivePolicy.cc                 |   1 +
 lib/BinaryProtoLookupService.cc           |  10 +-
 lib/BinaryProtoLookupService.h            |  14 +--
 lib/BlockingQueue.h                       |   5 +-
 lib/BoostHash.h                           |   5 +-
 lib/BrokerConsumerStats.cc                |   5 +-
 lib/BrokerConsumerStatsImpl.cc            |   3 +-
 lib/BrokerConsumerStatsImpl.h             |  12 +--
 lib/BrokerConsumerStatsImplBase.h         |   1 -
 lib/Client.cc                             |   7 +-
 lib/ClientConfiguration.cc                |   2 +-
 lib/ClientConnection.cc                   | 121 +++++++++++------------
 lib/ClientConnection.h                    |  54 ++++++-----
 lib/ClientImpl.cc                         |  32 +++---
 lib/ClientImpl.h                          |  30 ++++--
 lib/Commands.cc                           |  81 +++++++++++-----
 lib/Commands.h                            |  28 +++---
 lib/CompressionCodec.cc                   |  37 +------
 lib/CompressionCodec.h                    |   9 +-
 lib/CompressionCodecLZ4.cc                |   3 +-
 lib/CompressionCodecSnappy.cc             |   2 +-
 lib/CompressionCodecZLib.cc               |   4 +-
 lib/CompressionCodecZLib.h                |   2 -
 lib/ConnectionPool.cc                     |   9 +-
 lib/ConnectionPool.h                      |  14 ++-
 lib/ConsoleLoggerFactory.cc               |   3 +-
 lib/ConsoleLoggerFactoryImpl.h            |   3 +-
 lib/Consumer.cc                           |   6 +-
 lib/ConsumerConfiguration.cc              |   5 +-
 lib/ConsumerImpl.cc                       |  76 ++++++++-------
 lib/ConsumerImpl.h                        |  61 ++++++------
 lib/ConsumerImplBase.cc                   |  13 +--
 lib/ConsumerImplBase.h                    |  11 ++-
 lib/CryptoKeyReader.cc                    |   9 +-
 lib/DeprecatedException.cc                |   2 +-
 lib/EncryptionKeyInfoImpl.h               |   5 +-
 lib/ExecutorService.cc                    |   6 +-
 lib/ExecutorService.h                     |  15 +--
 lib/FileLoggerFactory.cc                  |   3 +-
 lib/FileLoggerFactoryImpl.h               |   5 +-
 lib/Future.h                              |   9 +-
 lib/GetLastMessageIdResponse.h            |   1 +
 lib/HTTPLookupService.cc                  |   9 +-
 lib/HTTPLookupService.h                   |  13 ++-
 lib/HandlerBase.cc                        |   7 +-
 lib/HandlerBase.h                         |  20 +++-
 lib/JavaStringHash.cc                     |   1 +
 lib/JavaStringHash.h                      |   3 +-
 lib/KeySharedPolicy.cc                    |   4 +-
 lib/Latch.h                               |   5 +-
 lib/Log4CxxLogger.h                       |   2 +-
 lib/Log4cxxLogger.cc                      |   5 +-
 lib/LogUtils.cc                           |   3 +-
 lib/LogUtils.h                            |  10 +-
 lib/LookupDataResult.h                    |   7 +-
 lib/LookupService.h                       |  18 ++--
 lib/MemoryLimitController.h               |   2 +-
 lib/Message.cc                            |   7 +-
 lib/MessageAndCallbackBatch.cc            |   1 +
 lib/MessageAndCallbackBatch.h             |   5 +-
 lib/MessageBuilder.cc                     |   5 +-
 lib/MessageCrypto.cc                      |  20 ++--
 lib/MessageCrypto.h                       |  33 ++++---
 lib/MessageId.cc                          |  11 +--
 lib/MessageIdImpl.h                       |   1 +
 lib/MessageIdUtil.h                       |   6 --
 lib/MessageImpl.h                         |   3 +-
 lib/MessageRouterBase.cc                  |   2 +-
 lib/MessageRouterBase.h                   |   7 +-
 lib/MessagesImpl.cc                       |   3 +-
 lib/MessagesImpl.h                        |   3 +-
 lib/MultiTopicsBrokerConsumerStatsImpl.cc |   5 +-
 lib/MultiTopicsBrokerConsumerStatsImpl.h  |   9 +-
 lib/MultiTopicsConsumerImpl.cc            |  25 ++++-
 lib/MultiTopicsConsumerImpl.h             |  46 +++++----
 lib/Murmur3_32Hash.h                      |   3 +-
 lib/NamespaceName.cc                      |  10 +-
 lib/NamespaceName.h                       |   3 +-
 lib/NegativeAcksTracker.cc                |   7 +-
 lib/NegativeAcksTracker.h                 |  16 ++-
 lib/ObjectPool.h                          |   3 +-
 lib/OpSendMsg.h                           |   4 +-
 lib/PartitionedProducerImpl.cc            |  14 ++-
 lib/PartitionedProducerImpl.h             |  30 ++++--
 lib/PatternMultiTopicsConsumerImpl.cc     |   5 +
 lib/PatternMultiTopicsConsumerImpl.h      |  16 +--
 lib/PeriodicTask.cc                       |   3 +-
 lib/PeriodicTask.h                        |   4 +-
 lib/Producer.cc                           |   5 +-
 lib/ProducerConfiguration.cc              |   4 +-
 lib/ProducerConfigurationImpl.h           |   1 +
 lib/ProducerImpl.cc                       |  27 ++++--
 lib/ProducerImpl.h                        |  42 ++++----
 lib/ProducerImplBase.h                    |   2 +
 lib/ProtoApiEnums.h                       | 156 ++++++++++++++++++++++++++++++
 lib/ProtobufNativeSchema.cc               |   6 +-
 lib/Reader.cc                             |   2 +-
 lib/ReaderConfiguration.cc                |   2 +-
 lib/ReaderImpl.cc                         |   6 +-
 lib/ReaderImpl.h                          |  23 ++++-
 lib/Result.cc                             |   2 +-
 lib/RetryableLookupService.h              |  13 ++-
 lib/RoundRobinMessageRouter.cc            |   5 +-
 lib/RoundRobinMessageRouter.h             |   8 +-
 lib/Schema.cc                             |   2 +-
 lib/Semaphore.h                           |   2 +-
 lib/ServiceNameResolver.h                 |   2 +
 lib/ServiceURI.cc                         |   1 +
 lib/ServiceURI.h                          |   1 +
 lib/SharedBuffer.h                        |   4 +-
 lib/SimpleLogger.h                        |   6 +-
 lib/SinglePartitionMessageRouter.cc       |   2 +
 lib/SinglePartitionMessageRouter.h        |   6 +-
 lib/SynchronizedHashMap.h                 |   1 +
 lib/TimeUtils.h                           |   6 +-
 lib/TopicMetadataImpl.cc                  |   2 +-
 lib/TopicMetadataImpl.h                   |   1 -
 lib/TopicName.cc                          |  20 ++--
 lib/TopicName.h                           |  13 ++-
 lib/UnAckedMessageTrackerDisabled.h       |   2 +-
 lib/UnAckedMessageTrackerEnabled.cc       |   5 +
 lib/UnAckedMessageTrackerEnabled.h        |  20 +++-
 lib/UnAckedMessageTrackerInterface.h      |  14 +--
 lib/UnboundedBlockingQueue.h              |   4 +-
 lib/Url.cc                                |   1 -
 lib/Url.h                                 |   3 +-
 lib/UtilAllocator.h                       |   1 +
 lib/Utils.h                               |   6 +-
 lib/auth/AuthAthenz.cc                    |  10 +-
 lib/auth/AuthAthenz.h                     |   4 +-
 lib/auth/AuthBasic.cc                     |   7 +-
 lib/auth/AuthBasic.h                      |   2 -
 lib/auth/AuthOauth2.cc                    |   9 +-
 lib/auth/AuthOauth2.h                     |   2 +-
 lib/auth/AuthTls.cc                       |   2 +-
 lib/auth/AuthTls.h                        |   1 -
 lib/auth/AuthToken.cc                     |   6 +-
 lib/auth/AuthToken.h                      |   2 -
 lib/auth/athenz/ZTSClient.cc              |  16 +--
 lib/auth/athenz/ZTSClient.h               |   6 +-
 lib/c/cStringMap.cc                       |   2 +-
 lib/c/c_Authentication.cc                 |   5 +-
 lib/c/c_Message.cc                        |   1 +
 lib/c/c_MessageId.cc                      |   4 +-
 lib/c/c_ProducerConfiguration.cc          |   2 +-
 lib/c/c_Reader.cc                         |   2 +-
 lib/c/c_ReaderConfiguration.cc            |   8 +-
 lib/c/c_Result.cc                         |   2 +-
 lib/c/c_structs.h                         |   4 +-
 lib/checksum/ChecksumProvider.h           |   2 +-
 lib/checksum/crc32c_arm.cc                |   1 +
 lib/checksum/crc32c_sse42.cc              |   3 +-
 lib/checksum/crc32c_sw.cc                 |   1 +
 lib/lz4/lz4.h                             |   2 +-
 lib/stats/ConsumerStatsBase.h             |   6 +-
 lib/stats/ConsumerStatsDisabled.h         |   4 +-
 lib/stats/ConsumerStatsImpl.cc            |  15 ++-
 lib/stats/ConsumerStatsImpl.h             |  25 +++--
 lib/stats/ProducerStatsBase.h             |   1 +
 lib/stats/ProducerStatsDisabled.h         |   2 +-
 lib/stats/ProducerStatsImpl.cc            |   8 +-
 lib/stats/ProducerStatsImpl.h             |  20 ++--
 perf/PerfConsumer.cc                      |  15 ++-
 perf/PerfProducer.cc                      |  19 ++--
 perf/RateLimiter.h                        |   2 +-
 tests/AuthBasicTest.cc                    |   3 +-
 tests/AuthPluginTest.cc                   |  11 ++-
 tests/AuthTokenTest.cc                    |  11 +--
 tests/BackoffTest.cc                      |   6 +-
 tests/BasicEndToEndTest.cc                |  57 ++++++-----
 tests/BatchMessageTest.cc                 |  34 +++----
 tests/BlockingQueueTest.cc                |   5 +-
 tests/ClientDeduplicationTest.cc          |   7 +-
 tests/ClientTest.cc                       |  10 +-
 tests/CompressionCodecSnappyTest.cc       |   2 +-
 tests/ConsumerConfigurationTest.cc        |   3 +-
 tests/ConsumerStatsTest.cc                |  17 ++--
 tests/ConsumerTest.cc                     |  21 ++--
 tests/ConsumerTest.h                      |   3 +-
 tests/CustomLoggerTest.cc                 |   6 +-
 tests/CustomRoutingPolicy.h               |   5 +-
 tests/HashTest.cc                         |  11 ++-
 tests/KeyBasedBatchingTest.cc             |   7 +-
 tests/KeySharedConsumerTest.cc            |  10 +-
 tests/KeySharedPolicyTest.cc              |  12 +--
 tests/LatchTest.cc                        |   6 +-
 tests/LoggerTest.cc                       |   3 +-
 tests/LookupServiceTest.cc                |  22 +++--
 tests/MapCacheTest.cc                     |   3 +-
 tests/MemoryLimitControllerTest.cc        |   7 +-
 tests/MemoryLimitTest.cc                  |  14 +--
 tests/MessageChunkingTest.cc              |   7 +-
 tests/MessageIdTest.cc                    |   8 +-
 tests/MessageTest.cc                      |   8 +-
 tests/MessagesImplTest.cc                 |   5 +-
 tests/NamespaceNameTest.cc                |   4 +-
 tests/PartitionsUpdateTest.cc             |   6 +-
 tests/PeriodicTaskTest.cc                 |   2 +
 tests/ProducerConfigurationTest.cc        |   1 +
 tests/ProducerTest.cc                     |   6 +-
 tests/PromiseTest.cc                      |   4 +-
 tests/ProtobufNativeSchemaTest.cc         |   4 +-
 tests/PulsarFriend.h                      |   8 +-
 tests/ReaderConfigurationTest.cc          |   3 +-
 tests/ReaderTest.cc                       |  16 +--
 tests/RoundRobinMessageRouterTest.cc      |   7 +-
 tests/SemaphoreTest.cc                    |   5 +-
 tests/ServiceURITest.cc                   |   1 +
 tests/ShutdownTest.cc                     |   8 +-
 tests/SinglePartitionMessageRouterTest.cc |  12 +--
 tests/SynchronizedHashMapTest.cc          |   2 +
 tests/TopicMetadataImplTest.cc            |   4 +-
 tests/TopicNameTest.cc                    |   4 +-
 tests/UnboundedBlockingQueueTest.cc       |   5 +-
 tests/UrlTest.cc                          |   3 +-
 tests/VersionTest.cc                      |   2 +-
 tests/ZLibCompressionTest.cc              |   3 +-
 tests/ZTSClientTest.cc                    |   3 +-
 tests/ZeroQueueSizeTest.cc                |   7 +-
 tests/c/c_BasicEndToEndTest.cc            |   6 +-
 tests/c/c_ConsumerConfigurationTest.cc    |   2 +-
 tests/c/c_ProducerConfigurationTest.cc    |   2 +-
 tests/main.cc                             |   1 -
 wireshark/pulsarDissector.cc              |   6 +-
 278 files changed, 1557 insertions(+), 1123 deletions(-)

diff --git a/.clang-format b/.clang-format
index cb40b50..8519646 100644
--- a/.clang-format
+++ b/.clang-format
@@ -19,7 +19,7 @@
 BasedOnStyle: Google
 IndentWidth: 4
 ColumnLimit: 110
-SortIncludes: false
+SortIncludes: true
 BreakBeforeBraces: Custom
 BraceWrapping:
   AfterEnum: true
diff --git a/examples/SampleAsyncProducer.cc b/examples/SampleAsyncProducer.cc
index 9701ccb..b1bd0f7 100644
--- a/examples/SampleAsyncProducer.cc
+++ b/examples/SampleAsyncProducer.cc
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <pulsar/Client.h>
+
 #include <iostream>
 #include <thread>
 
-#include <pulsar/Client.h>
-
-#include <lib/LogUtils.h>
+#include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/examples/SampleConsumer.cc b/examples/SampleConsumer.cc
index 1dcc550..bbf210d 100644
--- a/examples/SampleConsumer.cc
+++ b/examples/SampleConsumer.cc
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <iostream>
-
 #include <pulsar/Client.h>
 
-#include <lib/LogUtils.h>
+#include <iostream>
+
+#include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/examples/SampleConsumerListener.cc b/examples/SampleConsumerListener.cc
index 9ce2291..a3a90cc 100644
--- a/examples/SampleConsumerListener.cc
+++ b/examples/SampleConsumerListener.cc
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <iostream>
-
 #include <pulsar/Client.h>
 
-#include <lib/LogUtils.h>
+#include <iostream>
+
+#include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/examples/SampleProducer.cc b/examples/SampleProducer.cc
index ff50487..0a3c693 100644
--- a/examples/SampleProducer.cc
+++ b/examples/SampleProducer.cc
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <iostream>
-
 #include <pulsar/Client.h>
 
-#include <lib/LogUtils.h>
+#include <iostream>
+
+#include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/include/pulsar/Authentication.h b/include/pulsar/Authentication.h
index 7f8f7d2..fb74df6 100644
--- a/include/pulsar/Authentication.h
+++ b/include/pulsar/Authentication.h
@@ -19,13 +19,14 @@
 #ifndef PULSAR_AUTHENTICATION_H_
 #define PULSAR_AUTHENTICATION_H_
 
+#include <pulsar/Result.h>
 #include <pulsar/defines.h>
-#include <vector>
-#include <string>
+
+#include <functional>
 #include <map>
 #include <memory>
-#include <pulsar/Result.h>
-#include <functional>
+#include <string>
+#include <vector>
 
 namespace pulsar {
 
diff --git a/include/pulsar/BatchReceivePolicy.h b/include/pulsar/BatchReceivePolicy.h
index 3c66da2..bc8b791 100644
--- a/include/pulsar/BatchReceivePolicy.h
+++ b/include/pulsar/BatchReceivePolicy.h
@@ -20,6 +20,7 @@
 #define BATCH_RECEIVE_POLICY_HPP_
 
 #include <pulsar/defines.h>
+
 #include <memory>
 
 namespace pulsar {
diff --git a/include/pulsar/BrokerConsumerStats.h b/include/pulsar/BrokerConsumerStats.h
index b4fe9e0..e179e4b 100644
--- a/include/pulsar/BrokerConsumerStats.h
+++ b/include/pulsar/BrokerConsumerStats.h
@@ -19,13 +19,13 @@
 #ifndef PULSAR_CPP_BROKERCONSUMERSTATS_H
 #define PULSAR_CPP_BROKERCONSUMERSTATS_H
 
-#include <pulsar/defines.h>
-#include <string.h>
-#include <iostream>
+#include <pulsar/ConsumerType.h>
 #include <pulsar/Result.h>
+#include <pulsar/defines.h>
+
 #include <functional>
+#include <iostream>
 #include <memory>
-#include <pulsar/ConsumerType.h>
 
 namespace pulsar {
 class BrokerConsumerStatsImplBase;
diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index 3edb03b..c189a20 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -19,17 +19,18 @@
 #ifndef PULSAR_CLIENT_HPP_
 #define PULSAR_CLIENT_HPP_
 
-#include <pulsar/defines.h>
+#include <pulsar/ClientConfiguration.h>
+#include <pulsar/ConsoleLoggerFactory.h>
 #include <pulsar/Consumer.h>
+#include <pulsar/FileLoggerFactory.h>
+#include <pulsar/Message.h>
+#include <pulsar/MessageBuilder.h>
 #include <pulsar/Producer.h>
 #include <pulsar/Reader.h>
 #include <pulsar/Result.h>
-#include <pulsar/Message.h>
-#include <pulsar/MessageBuilder.h>
-#include <pulsar/ClientConfiguration.h>
 #include <pulsar/Schema.h>
-#include <pulsar/ConsoleLoggerFactory.h>
-#include <pulsar/FileLoggerFactory.h>
+#include <pulsar/defines.h>
+
 #include <string>
 
 namespace pulsar {
diff --git a/include/pulsar/ClientConfiguration.h b/include/pulsar/ClientConfiguration.h
index 32ad32b..9df92ac 100644
--- a/include/pulsar/ClientConfiguration.h
+++ b/include/pulsar/ClientConfiguration.h
@@ -19,9 +19,9 @@
 #ifndef PULSAR_CLIENTCONFIGURATION_H_
 #define PULSAR_CLIENTCONFIGURATION_H_
 
-#include <pulsar/defines.h>
 #include <pulsar/Authentication.h>
 #include <pulsar/Logger.h>
+#include <pulsar/defines.h>
 
 namespace pulsar {
 class PulsarWrapper;
diff --git a/include/pulsar/Consumer.h b/include/pulsar/Consumer.h
index c7911b9..d37c7f9 100644
--- a/include/pulsar/Consumer.h
+++ b/include/pulsar/Consumer.h
@@ -19,10 +19,11 @@
 #ifndef CONSUMER_HPP_
 #define CONSUMER_HPP_
 
-#include <iostream>
-#include <pulsar/defines.h>
 #include <pulsar/BrokerConsumerStats.h>
 #include <pulsar/ConsumerConfiguration.h>
+#include <pulsar/defines.h>
+
+#include <iostream>
 
 namespace pulsar {
 class PulsarWrapper;
diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h
index 13d5cc0..0418cfa 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -19,18 +19,20 @@
 #ifndef PULSAR_CONSUMERCONFIGURATION_H_
 #define PULSAR_CONSUMERCONFIGURATION_H_
 
-#include <functional>
-#include <memory>
-#include <pulsar/defines.h>
-#include <pulsar/Result.h>
-#include <pulsar/ConsumerType.h>
-#include <pulsar/Message.h>
-#include <pulsar/Schema.h>
 #include <pulsar/ConsumerCryptoFailureAction.h>
+#include <pulsar/ConsumerEventListener.h>
+#include <pulsar/ConsumerType.h>
 #include <pulsar/CryptoKeyReader.h>
 #include <pulsar/InitialPosition.h>
 #include <pulsar/KeySharedPolicy.h>
-#include <pulsar/ConsumerEventListener.h>
+#include <pulsar/Message.h>
+#include <pulsar/Result.h>
+#include <pulsar/Schema.h>
+#include <pulsar/defines.h>
+
+#include <functional>
+#include <memory>
+
 #include "BatchReceivePolicy.h"
 
 namespace pulsar {
diff --git a/include/pulsar/CryptoKeyReader.h b/include/pulsar/CryptoKeyReader.h
index e0b2a77..d11e1f8 100644
--- a/include/pulsar/CryptoKeyReader.h
+++ b/include/pulsar/CryptoKeyReader.h
@@ -19,9 +19,9 @@
 #ifndef CRYPTOKEYREADER_H_
 #define CRYPTOKEYREADER_H_
 
-#include <pulsar/defines.h>
-#include <pulsar/Result.h>
 #include <pulsar/EncryptionKeyInfo.h>
+#include <pulsar/Result.h>
+#include <pulsar/defines.h>
 
 namespace pulsar {
 
diff --git a/include/pulsar/DeprecatedException.h b/include/pulsar/DeprecatedException.h
index 9680591..affdf6f 100644
--- a/include/pulsar/DeprecatedException.h
+++ b/include/pulsar/DeprecatedException.h
@@ -19,9 +19,10 @@
 #ifndef DEPRECATED_EXCEPTION_HPP_
 #define DEPRECATED_EXCEPTION_HPP_
 
+#include <pulsar/defines.h>
+
 #include <stdexcept>
 #include <string>
-#include <pulsar/defines.h>
 
 namespace pulsar {
 class PULSAR_PUBLIC DeprecatedException : public std::runtime_error {
diff --git a/include/pulsar/EncryptionKeyInfo.h b/include/pulsar/EncryptionKeyInfo.h
index 0357622..401d7be 100644
--- a/include/pulsar/EncryptionKeyInfo.h
+++ b/include/pulsar/EncryptionKeyInfo.h
@@ -19,10 +19,11 @@
 #ifndef ENCRYPTIONKEYINFO_H_
 #define ENCRYPTIONKEYINFO_H_
 
-#include <memory>
+#include <pulsar/defines.h>
+
 #include <iostream>
 #include <map>
-#include <pulsar/defines.h>
+#include <memory>
 
 namespace pulsar {
 
diff --git a/include/pulsar/KeySharedPolicy.h b/include/pulsar/KeySharedPolicy.h
index 53efc4c..08eee77 100644
--- a/include/pulsar/KeySharedPolicy.h
+++ b/include/pulsar/KeySharedPolicy.h
@@ -21,7 +21,6 @@
 #include <pulsar/defines.h>
 
 #include <memory>
-
 #include <utility>
 #include <vector>
 
diff --git a/include/pulsar/Logger.h b/include/pulsar/Logger.h
index e9487a7..710cdc2 100644
--- a/include/pulsar/Logger.h
+++ b/include/pulsar/Logger.h
@@ -18,9 +18,10 @@
  */
 #pragma once
 
+#include <pulsar/defines.h>
+
 #include <memory>
 #include <string>
-#include <pulsar/defines.h>
 
 namespace pulsar {
 
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 935236b..0c4afc2 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -19,12 +19,12 @@
 #ifndef MESSAGE_HPP_
 #define MESSAGE_HPP_
 
-#include <map>
-#include <string>
+#include <pulsar/defines.h>
 
+#include <map>
 #include <memory>
+#include <string>
 
-#include <pulsar/defines.h>
 #include "MessageId.h"
 
 namespace pulsar {
diff --git a/include/pulsar/MessageBatch.h b/include/pulsar/MessageBatch.h
index be94358..952feb7 100644
--- a/include/pulsar/MessageBatch.h
+++ b/include/pulsar/MessageBatch.h
@@ -19,10 +19,10 @@
 
 #ifndef LIB_MESSAGE_BATCH_H
 #define LIB_MESSAGE_BATCH_H
-#include <vector>
-
-#include <pulsar/defines.h>
 #include <pulsar/Message.h>
+#include <pulsar/defines.h>
+
+#include <vector>
 
 namespace pulsar {
 
diff --git a/include/pulsar/MessageBuilder.h b/include/pulsar/MessageBuilder.h
index 71dafaa..2b84d20 100644
--- a/include/pulsar/MessageBuilder.h
+++ b/include/pulsar/MessageBuilder.h
@@ -19,13 +19,13 @@
 #ifndef MESSAGE_BUILDER_H
 #define MESSAGE_BUILDER_H
 
+#include <pulsar/Message.h>
+#include <pulsar/defines.h>
+
 #include <chrono>
 #include <string>
 #include <vector>
 
-#include <pulsar/Message.h>
-#include <pulsar/defines.h>
-
 namespace pulsar {
 class PulsarWrapper;
 
diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h
index 06be790..fd17df6 100644
--- a/include/pulsar/MessageId.h
+++ b/include/pulsar/MessageId.h
@@ -19,11 +19,12 @@
 #ifndef MESSAGE_ID_H
 #define MESSAGE_ID_H
 
-#include <iosfwd>
+#include <pulsar/defines.h>
 #include <stdint.h>
+
+#include <iosfwd>
 #include <memory>
 #include <string>
-#include <pulsar/defines.h>
 
 namespace pulsar {
 
diff --git a/include/pulsar/MessageRoutingPolicy.h b/include/pulsar/MessageRoutingPolicy.h
index bc76259..26ab75e 100644
--- a/include/pulsar/MessageRoutingPolicy.h
+++ b/include/pulsar/MessageRoutingPolicy.h
@@ -19,10 +19,11 @@
 #ifndef PULSAR_MESSAGE_ROUTING_POLICY_HEADER_
 #define PULSAR_MESSAGE_ROUTING_POLICY_HEADER_
 
-#include <pulsar/defines.h>
 #include <pulsar/DeprecatedException.h>
 #include <pulsar/Message.h>
 #include <pulsar/TopicMetadata.h>
+#include <pulsar/defines.h>
+
 #include <memory>
 
 /*
diff --git a/include/pulsar/Producer.h b/include/pulsar/Producer.h
index f414b76..955d985 100644
--- a/include/pulsar/Producer.h
+++ b/include/pulsar/Producer.h
@@ -19,11 +19,12 @@
 #ifndef PRODUCER_HPP_
 #define PRODUCER_HPP_
 
-#include <pulsar/defines.h>
 #include <pulsar/ProducerConfiguration.h>
-#include <memory>
+#include <pulsar/defines.h>
 #include <stdint.h>
 
+#include <memory>
+
 namespace pulsar {
 class ProducerImplBase;
 class PulsarWrapper;
diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h
index fb331ea..39ecbe0 100644
--- a/include/pulsar/ProducerConfiguration.h
+++ b/include/pulsar/ProducerConfiguration.h
@@ -18,16 +18,16 @@
  */
 #ifndef PULSAR_PRODUCERCONFIGURATION_H_
 #define PULSAR_PRODUCERCONFIGURATION_H_
-#include <pulsar/defines.h>
 #include <pulsar/CompressionType.h>
-#include <pulsar/MessageRoutingPolicy.h>
-#include <pulsar/Result.h>
+#include <pulsar/CryptoKeyReader.h>
 #include <pulsar/Message.h>
-#include <functional>
+#include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/ProducerCryptoFailureAction.h>
-#include <pulsar/CryptoKeyReader.h>
+#include <pulsar/Result.h>
 #include <pulsar/Schema.h>
+#include <pulsar/defines.h>
 
+#include <functional>
 #include <set>
 
 namespace pulsar {
diff --git a/include/pulsar/ProtobufNativeSchema.h b/include/pulsar/ProtobufNativeSchema.h
index ef9a7b1..2feff5a 100644
--- a/include/pulsar/ProtobufNativeSchema.h
+++ b/include/pulsar/ProtobufNativeSchema.h
@@ -18,8 +18,8 @@
  */
 #pragma once
 
-#include <pulsar/Schema.h>
 #include <google/protobuf/descriptor.h>
+#include <pulsar/Schema.h>
 
 namespace pulsar {
 
diff --git a/include/pulsar/Reader.h b/include/pulsar/Reader.h
index 554788e..233da4f 100644
--- a/include/pulsar/Reader.h
+++ b/include/pulsar/Reader.h
@@ -19,9 +19,9 @@
 #ifndef PULSAR_READER_HPP_
 #define PULSAR_READER_HPP_
 
-#include <pulsar/defines.h>
 #include <pulsar/Message.h>
 #include <pulsar/ReaderConfiguration.h>
+#include <pulsar/defines.h>
 
 namespace pulsar {
 class PulsarWrapper;
diff --git a/include/pulsar/ReaderConfiguration.h b/include/pulsar/ReaderConfiguration.h
index 5b88553..9ae8e1c 100644
--- a/include/pulsar/ReaderConfiguration.h
+++ b/include/pulsar/ReaderConfiguration.h
@@ -19,14 +19,15 @@
 #ifndef PULSAR_READER_CONFIGURATION_H_
 #define PULSAR_READER_CONFIGURATION_H_
 
-#include <functional>
-#include <memory>
-#include <pulsar/defines.h>
-#include <pulsar/Result.h>
+#include <pulsar/ConsumerCryptoFailureAction.h>
+#include <pulsar/CryptoKeyReader.h>
 #include <pulsar/Message.h>
+#include <pulsar/Result.h>
 #include <pulsar/Schema.h>
-#include <pulsar/CryptoKeyReader.h>
-#include <pulsar/ConsumerCryptoFailureAction.h>
+#include <pulsar/defines.h>
+
+#include <functional>
+#include <memory>
 
 namespace pulsar {
 
diff --git a/include/pulsar/Result.h b/include/pulsar/Result.h
index cc7b457..0f7d8a8 100644
--- a/include/pulsar/Result.h
+++ b/include/pulsar/Result.h
@@ -19,9 +19,10 @@
 #ifndef ERROR_HPP_
 #define ERROR_HPP_
 
-#include <iosfwd>
 #include <pulsar/defines.h>
 
+#include <iosfwd>
+
 namespace pulsar {
 
 /**
diff --git a/include/pulsar/Schema.h b/include/pulsar/Schema.h
index 7e7a5ae..ec0802e 100644
--- a/include/pulsar/Schema.h
+++ b/include/pulsar/Schema.h
@@ -18,12 +18,12 @@
  */
 #pragma once
 
-#include <map>
+#include <pulsar/defines.h>
 
 #include <iosfwd>
+#include <map>
 #include <memory>
 #include <string>
-#include <pulsar/defines.h>
 
 namespace pulsar {
 
diff --git a/include/pulsar/c/client.h b/include/pulsar/c/client.h
index f5da826..3a53c01 100644
--- a/include/pulsar/c/client.h
+++ b/include/pulsar/c/client.h
@@ -19,18 +19,18 @@
 
 #pragma once
 
-#include <pulsar/defines.h>
 #include <pulsar/c/client_configuration.h>
+#include <pulsar/c/consumer.h>
+#include <pulsar/c/consumer_configuration.h>
 #include <pulsar/c/message.h>
 #include <pulsar/c/message_id.h>
 #include <pulsar/c/producer.h>
-#include <pulsar/c/consumer.h>
-#include <pulsar/c/reader.h>
-#include <pulsar/c/consumer_configuration.h>
 #include <pulsar/c/producer_configuration.h>
+#include <pulsar/c/reader.h>
 #include <pulsar/c/reader_configuration.h>
 #include <pulsar/c/result.h>
 #include <pulsar/c/string_list.h>
+#include <pulsar/defines.h>
 
 #ifdef __cplusplus
 extern "C" {
diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h
index 52610d2..99ff8c8 100644
--- a/include/pulsar/c/consumer.h
+++ b/include/pulsar/c/consumer.h
@@ -24,9 +24,8 @@
 extern "C" {
 #endif
 
-#include <pulsar/c/result.h>
 #include <pulsar/c/message.h>
-
+#include <pulsar/c/result.h>
 #include <stdint.h>
 
 typedef struct _pulsar_consumer pulsar_consumer_t;
diff --git a/include/pulsar/c/consumer_configuration.h b/include/pulsar/c/consumer_configuration.h
index 128fa24..96abc5c 100644
--- a/include/pulsar/c/consumer_configuration.h
+++ b/include/pulsar/c/consumer_configuration.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include <pulsar/defines.h>
+
 #include "consumer.h"
 #include "producer_configuration.h"
 
diff --git a/include/pulsar/c/message.h b/include/pulsar/c/message.h
index f54d025..353e609 100644
--- a/include/pulsar/c/message.h
+++ b/include/pulsar/c/message.h
@@ -23,10 +23,10 @@
 extern "C" {
 #endif
 
+#include <pulsar/defines.h>
 #include <stddef.h>
 #include <stdint.h>
 
-#include <pulsar/defines.h>
 #include "string_map.h"
 
 typedef struct _pulsar_message pulsar_message_t;
diff --git a/include/pulsar/c/message_id.h b/include/pulsar/c/message_id.h
index 289c3bd..1367934 100644
--- a/include/pulsar/c/message_id.h
+++ b/include/pulsar/c/message_id.h
@@ -23,9 +23,9 @@
 extern "C" {
 #endif
 
+#include <pulsar/defines.h>
 #include <stddef.h>
 #include <stdint.h>
-#include <pulsar/defines.h>
 
 typedef struct _pulsar_message_id pulsar_message_id_t;
 
diff --git a/include/pulsar/c/message_router.h b/include/pulsar/c/message_router.h
index ed74f07..309a5b2 100644
--- a/include/pulsar/c/message_router.h
+++ b/include/pulsar/c/message_router.h
@@ -19,8 +19,8 @@
 
 #pragma once
 
-#include <pulsar/defines.h>
 #include <pulsar/c/message.h>
+#include <pulsar/defines.h>
 
 #ifdef __cplusplus
 extern "C" {
diff --git a/include/pulsar/c/producer.h b/include/pulsar/c/producer.h
index bf51f56..1de8134 100644
--- a/include/pulsar/c/producer.h
+++ b/include/pulsar/c/producer.h
@@ -23,10 +23,9 @@
 extern "C" {
 #endif
 
-#include <pulsar/defines.h>
-#include <pulsar/c/result.h>
 #include <pulsar/c/message.h>
-
+#include <pulsar/c/result.h>
+#include <pulsar/defines.h>
 #include <stdint.h>
 
 typedef struct _pulsar_producer pulsar_producer_t;
diff --git a/include/pulsar/c/producer_configuration.h b/include/pulsar/c/producer_configuration.h
index 9e5e5b0..f8f74c2 100644
--- a/include/pulsar/c/producer_configuration.h
+++ b/include/pulsar/c/producer_configuration.h
@@ -19,10 +19,9 @@
 
 #pragma once
 
-#include <stdint.h>
-
-#include <pulsar/defines.h>
 #include <pulsar/c/message_router.h>
+#include <pulsar/defines.h>
+#include <stdint.h>
 
 #ifdef __cplusplus
 extern "C" {
diff --git a/include/pulsar/c/reader.h b/include/pulsar/c/reader.h
index 4c09ff5..4c546f8 100644
--- a/include/pulsar/c/reader.h
+++ b/include/pulsar/c/reader.h
@@ -18,9 +18,9 @@
  */
 #pragma once
 
-#include <pulsar/defines.h>
-#include <pulsar/c/result.h>
 #include <pulsar/c/message.h>
+#include <pulsar/c/result.h>
+#include <pulsar/defines.h>
 
 #ifdef __cplusplus
 extern "C" {
diff --git a/include/pulsar/c/reader_configuration.h b/include/pulsar/c/reader_configuration.h
index cc8436c..66ce8ef 100644
--- a/include/pulsar/c/reader_configuration.h
+++ b/include/pulsar/c/reader_configuration.h
@@ -19,9 +19,9 @@
 
 #pragma once
 
-#include <pulsar/defines.h>
 #include <pulsar/c/message.h>
 #include <pulsar/c/reader.h>
+#include <pulsar/defines.h>
 
 #ifdef __cplusplus
 extern "C" {
diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc
index 7d1d706..4abfcb1 100644
--- a/lib/AckGroupingTracker.cc
+++ b/lib/AckGroupingTracker.cc
@@ -19,33 +19,23 @@
 
 #include "AckGroupingTracker.h"
 
-#include <cstdint>
-
-#include <set>
-
+#include "ClientConnection.h"
 #include "Commands.h"
 #include "LogUtils.h"
-#include "PulsarApi.pb.h"
-#include "ClientConnection.h"
-#include <pulsar/MessageId.h>
 
 namespace pulsar {
 
 DECLARE_LOG_OBJECT();
 
 inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const MessageId& msgId,
-                    proto::CommandAck_AckType ackType) {
-    proto::MessageIdData msgIdData;
-    msgIdData.set_ledgerid(msgId.ledgerId());
-    msgIdData.set_entryid(msgId.entryId());
-    auto cmd = Commands::newAck(consumerId, msgIdData, ackType, -1);
+                    CommandAck_AckType ackType) {
+    auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), ackType, -1);
     cnx->sendCommand(cmd);
-    LOG_DEBUG("ACK request is sent for message - [" << msgIdData.ledgerid() << ", " << msgIdData.entryid()
-                                                    << "]");
+    LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ", " << msgId.entryId() << "]");
 }
 
 bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
-                                        const MessageId& msgId, proto::CommandAck_AckType ackType) {
+                                        const MessageId& msgId, CommandAck_AckType ackType) {
     auto cnx = connWeakPtr.lock();
     if (cnx == nullptr) {
         LOG_DEBUG("Connection is not ready, ACK failed for message - [" << msgId.ledgerId() << ", "
@@ -65,7 +55,7 @@ bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uin
     }
 
     for (const auto& msgId : msgIds) {
-        sendAck(cnx, consumerId, msgId, proto::CommandAck::Individual);
+        sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
     }
     return true;
 }
diff --git a/lib/AckGroupingTracker.h b/lib/AckGroupingTracker.h
index f4410e4..d0e800d 100644
--- a/lib/AckGroupingTracker.h
+++ b/lib/AckGroupingTracker.h
@@ -19,17 +19,18 @@
 #ifndef LIB_ACKGROUPINGTRACKER_H_
 #define LIB_ACKGROUPINGTRACKER_H_
 
-#include <cstdint>
+#include <pulsar/MessageId.h>
 
+#include <cstdint>
 #include <set>
-#include <memory>
 
-#include "PulsarApi.pb.h"
-#include "ClientConnection.h"
-#include <pulsar/MessageId.h>
+#include "ProtoApiEnums.h"
 
 namespace pulsar {
 
+class ClientConnection;
+using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
+
 /**
  * @class AckGroupingTracker
  * Default ACK grouping tracker, it actually neither tracks ACK requests nor sends them to brokers.
@@ -93,7 +94,7 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
      * @return true if the ACK is sent successfully, otherwise false.
      */
     bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId, const MessageId& msgId,
-                        proto::CommandAck_AckType ackType);
+                        CommandAck_AckType ackType);
 
     /**
      * Immediately send a set of ACK requests one by one to the broker, it only supports individual
diff --git a/lib/AckGroupingTrackerDisabled.cc b/lib/AckGroupingTrackerDisabled.cc
index 4c2b11f..ca53792 100644
--- a/lib/AckGroupingTrackerDisabled.cc
+++ b/lib/AckGroupingTrackerDisabled.cc
@@ -20,8 +20,8 @@
 #include "AckGroupingTrackerDisabled.h"
 
 #include "HandlerBase.h"
-#include "PulsarApi.pb.h"
-#include <pulsar/MessageId.h>
+#include "LogUtils.h"
+#include "ProtoApiEnums.h"
 
 namespace pulsar {
 
@@ -33,11 +33,11 @@ AckGroupingTrackerDisabled::AckGroupingTrackerDisabled(HandlerBase& handler, uin
 }
 
 void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
-    this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, proto::CommandAck::Individual);
+    this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual);
 }
 
 void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
-    this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, proto::CommandAck::Cumulative);
+    this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative);
 }
 
 }  // namespace pulsar
diff --git a/lib/AckGroupingTrackerDisabled.h b/lib/AckGroupingTrackerDisabled.h
index 6e66718..ef6bfbe 100644
--- a/lib/AckGroupingTrackerDisabled.h
+++ b/lib/AckGroupingTrackerDisabled.h
@@ -21,12 +21,12 @@
 
 #include <cstdint>
 
-#include "HandlerBase.h"
-#include <pulsar/MessageId.h>
 #include "AckGroupingTracker.h"
 
 namespace pulsar {
 
+class HandlerBase;
+
 /**
  * @class AckGroupingTrackerDisabled
  * ACK grouping tracker that does not tracker or group ACK requests. The ACK requests are diretly
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index 5b6fe4e..2683d28 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -20,14 +20,13 @@
 #include "AckGroupingTrackerEnabled.h"
 
 #include <mutex>
-#include <algorithm>
 
-#include "Commands.h"
-#include "LogUtils.h"
+#include "ClientConnection.h"
 #include "ClientImpl.h"
+#include "Commands.h"
+#include "ExecutorService.h"
 #include "HandlerBase.h"
-#include "PulsarApi.pb.h"
-#include <pulsar/MessageId.h>
+#include "LogUtils.h"
 
 namespace pulsar {
 
@@ -111,7 +110,7 @@ void AckGroupingTrackerEnabled::flush() {
         std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
         if (this->requireCumulativeAck_) {
             if (!this->doImmediateAck(cnx, this->consumerId_, this->nextCumulativeAckMsgId_,
-                                      proto::CommandAck::Cumulative)) {
+                                      CommandAck_AckType_Cumulative)) {
                 // Failed to send ACK.
                 LOG_WARN("Failed to send cumulative ACK.");
                 return;
diff --git a/lib/AckGroupingTrackerEnabled.h b/lib/AckGroupingTrackerEnabled.h
index c3926aa..89b13f8 100644
--- a/lib/AckGroupingTrackerEnabled.h
+++ b/lib/AckGroupingTrackerEnabled.h
@@ -19,18 +19,26 @@
 #ifndef LIB_ACKGROUPINGTRACKERENABLED_H_
 #define LIB_ACKGROUPINGTRACKERENABLED_H_
 
-#include <cstdint>
+#include <pulsar/MessageId.h>
 
-#include <set>
+#include <boost/asio/deadline_timer.hpp>
+#include <cstdint>
 #include <mutex>
+#include <set>
 
-#include "ClientImpl.h"
-#include "HandlerBase.h"
-#include <pulsar/MessageId.h>
 #include "AckGroupingTracker.h"
 
 namespace pulsar {
 
+class ClientImpl;
+using ClientImplPtr = std::shared_ptr<ClientImpl>;
+using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
+class ExecutorService;
+using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
+class HandlerBase;
+using HandlerBasePtr = std::shared_ptr<HandlerBase>;
+using HandlerBaseWeakPtr = std::weak_ptr<HandlerBase>;
+
 /**
  * @class AckGroupingTrackerEnabled
  * Ack grouping tracker for consumers of persistent topics that enabled ACK grouping.
diff --git a/lib/Authentication.cc b/lib/Authentication.cc
index 4695a03..1bdac05 100644
--- a/lib/Authentication.cc
+++ b/lib/Authentication.cc
@@ -16,23 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <stdio.h>
-
+#include <dlfcn.h>
 #include <pulsar/Authentication.h>
-#include "auth/AuthTls.h"
-#include "auth/AuthAthenz.h"
-#include "auth/AuthToken.h"
-#include "auth/AuthOauth2.h"
-#include "auth/AuthBasic.h"
-#include <lib/LogUtils.h>
 
+#include <boost/algorithm/string.hpp>
+#include <mutex>
 #include <string>
 #include <vector>
-#include <iostream>
-#include <dlfcn.h>
-#include <cstdlib>
-#include <mutex>
-#include <boost/algorithm/string.hpp>
+
+#include "LogUtils.h"
+#include "auth/AuthAthenz.h"
+#include "auth/AuthBasic.h"
+#include "auth/AuthOauth2.h"
+#include "auth/AuthTls.h"
+#include "auth/AuthToken.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/lib/Backoff.cc b/lib/Backoff.cc
index 790d3f8..4d95422 100644
--- a/lib/Backoff.cc
+++ b/lib/Backoff.cc
@@ -17,10 +17,12 @@
  * under the License.
  */
 #include "Backoff.h"
-#include <boost/random/uniform_int_distribution.hpp>
-#include <algorithm>
+
 #include <time.h> /* time */
 
+#include <algorithm>
+#include <boost/random/uniform_int_distribution.hpp>
+
 namespace pulsar {
 
 Backoff::Backoff(const TimeDuration& initial, const TimeDuration& max, const TimeDuration& mandatoryStop)
diff --git a/lib/Backoff.h b/lib/Backoff.h
index 93b97ad..4bcebc7 100644
--- a/lib/Backoff.h
+++ b/lib/Backoff.h
@@ -18,13 +18,14 @@
  */
 #ifndef _PULSAR_BACKOFF_HEADER_
 #define _PULSAR_BACKOFF_HEADER_
+#include <pulsar/defines.h>
+
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/random/mersenne_twister.hpp>
-#include <pulsar/defines.h>
 
 namespace pulsar {
 
-typedef boost::posix_time::time_duration TimeDuration;
+using TimeDuration = boost::posix_time::time_duration;
 
 class PULSAR_PUBLIC Backoff {
    public:
diff --git a/lib/BatchAcknowledgementTracker.cc b/lib/BatchAcknowledgementTracker.cc
index 3d6d920..1df4984 100644
--- a/lib/BatchAcknowledgementTracker.cc
+++ b/lib/BatchAcknowledgementTracker.cc
@@ -18,6 +18,9 @@
  */
 #include "BatchAcknowledgementTracker.h"
 
+#include "LogUtils.h"
+#include "MessageImpl.h"
+
 namespace pulsar {
 DECLARE_LOG_OBJECT()
 
@@ -62,10 +65,9 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
         TrackerPair(msgID, boost::dynamic_bitset<>(message.impl_->metadata.num_messages_in_batch()).set()));
 }
 
-void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
-                                                     proto::CommandAck_AckType ackType) {
+void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId, CommandAck_AckType ackType) {
     // Not a batch message and a individual ack
-    if (messageId.batchIndex() == -1 && ackType == proto::CommandAck_AckType_Individual) {
+    if (messageId.batchIndex() == -1 && ackType == CommandAck_AckType_Individual) {
         return;
     }
 
@@ -73,7 +75,7 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
         MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */);
 
     Lock lock(mutex_);
-    if (ackType == proto::CommandAck_AckType_Cumulative) {
+    if (ackType == CommandAck_AckType_Cumulative) {
         // delete from trackerMap and sendList all messageIDs less than or equal to this one
         // equal to - since getGreatestCumulativeAckReady already gives us the exact message id to be acked
 
@@ -110,8 +112,7 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
     }
 }
 
-bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID,
-                                               const proto::CommandAck_AckType ackType) {
+bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID, CommandAck_AckType ackType) {
     Lock lock(mutex_);
     // Remove batch index
     MessageId batchMessageId =
@@ -130,7 +131,7 @@ bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID,
     assert(batchIndex < pos->second.size());
     pos->second.set(batchIndex, false);
 
-    if (ackType == proto::CommandAck_AckType_Cumulative) {
+    if (ackType == CommandAck_AckType_Cumulative) {
         for (int i = 0; i < batchIndex; i++) {
             pos->second.set(i, false);
         }
diff --git a/lib/BatchAcknowledgementTracker.h b/lib/BatchAcknowledgementTracker.h
index 6a709b3..6cbe753 100644
--- a/lib/BatchAcknowledgementTracker.h
+++ b/lib/BatchAcknowledgementTracker.h
@@ -19,15 +19,17 @@
 #ifndef LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
 #define LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
 
-#include "MessageImpl.h"
+#include <pulsar/Message.h>
+#include <pulsar/MessageId.h>
+
+#include <boost/dynamic_bitset.hpp>
 #include <map>
 #include <mutex>
-#include <boost/dynamic_bitset.hpp>
-#include <lib/PulsarApi.pb.h>
-#include <algorithm>
-#include "LogUtils.h"
+#include <ostream>
 #include <string>
-#include <sstream>
+
+#include "ProtoApiEnums.h"
+
 namespace pulsar {
 
 class ConsumerImpl;
@@ -57,10 +59,10 @@ class BatchAcknowledgementTracker {
     BatchAcknowledgementTracker(const std::string topic, const std::string subscription,
                                 const long consumerId);
 
-    bool isBatchReady(const MessageId& msgID, const proto::CommandAck_AckType ackType);
+    bool isBatchReady(const MessageId& msgID, CommandAck_AckType ackType);
     const MessageId getGreatestCumulativeAckReady(const MessageId& messageId);
 
-    void deleteAckedMessage(const MessageId& messageId, proto::CommandAck_AckType ackType);
+    void deleteAckedMessage(const MessageId& messageId, CommandAck_AckType ackType);
     void receivedMessage(const Message& message);
 
     void clear();
diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc
index e25b72e..ae0425e 100644
--- a/lib/BatchMessageContainer.cc
+++ b/lib/BatchMessageContainer.cc
@@ -17,14 +17,11 @@
  * under the License.
  */
 #include "BatchMessageContainer.h"
-#include "ClientConnection.h"
-#include "Commands.h"
-#include "LogUtils.h"
-#include "MessageImpl.h"
-#include "ProducerImpl.h"
-#include "TimeUtils.h"
+
 #include <stdexcept>
 
+#include "LogUtils.h"
+
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc
index e9e6b98..0cf338f 100644
--- a/lib/BatchMessageContainerBase.cc
+++ b/lib/BatchMessageContainerBase.cc
@@ -17,11 +17,16 @@
  * under the License.
  */
 #include "BatchMessageContainerBase.h"
+
+#include "ClientConnection.h"
+#include "CompressionCodec.h"
+#include "MessageAndCallbackBatch.h"
 #include "MessageCrypto.h"
 #include "MessageImpl.h"
+#include "OpSendMsg.h"
 #include "ProducerImpl.h"
-#include "SharedBuffer.h"
 #include "PulsarApi.pb.h"
+#include "SharedBuffer.h"
 
 namespace pulsar {
 
@@ -55,7 +60,7 @@ Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
     impl->metadata.set_num_messages_in_batch(batch.size());
     auto compressionType = producerConfig_.getCompressionType();
     if (compressionType != CompressionNone) {
-        impl->metadata.set_compression(CompressionCodecProvider::convertType(compressionType));
+        impl->metadata.set_compression(static_cast<proto::CompressionType>(compressionType));
         impl->metadata.set_uncompressed_size(impl->payload.readableBytes());
     }
     impl->payload = CompressionCodecProvider::getCodec(compressionType).encode(impl->payload);
@@ -83,4 +88,27 @@ Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
     return ResultOk;
 }
 
+void BatchMessageContainerBase::processAndClear(
+    std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
+    if (isEmpty()) {
+        if (flushCallback) {
+            flushCallback(ResultOk);
+        }
+    } else {
+        const auto numBatches = getNumBatches();
+        if (numBatches == 1) {
+            OpSendMsg opSendMsg;
+            Result result = createOpSendMsg(opSendMsg, flushCallback);
+            opSendMsgCallback(result, opSendMsg);
+        } else if (numBatches > 1) {
+            std::vector<OpSendMsg> opSendMsgs;
+            std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback);
+            for (size_t i = 0; i < results.size(); i++) {
+                opSendMsgCallback(results[i], opSendMsgs[i]);
+            }
+        }  // else numBatches is 0, do nothing
+    }
+    clear();
+}
+
 }  // namespace pulsar
diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h
index 71eef5f..e9cf7ef 100644
--- a/lib/BatchMessageContainerBase.h
+++ b/lib/BatchMessageContainerBase.h
@@ -19,24 +19,22 @@
 #ifndef LIB_BATCHMESSAGECONTAINERBASE_H_
 #define LIB_BATCHMESSAGECONTAINERBASE_H_
 
-#include <pulsar/Result.h>
 #include <pulsar/Message.h>
-#include <pulsar/ProducerConfiguration.h>
 #include <pulsar/Producer.h>
+#include <pulsar/ProducerConfiguration.h>
+#include <pulsar/Result.h>
 
+#include <boost/noncopyable.hpp>
 #include <memory>
 #include <vector>
 
-#include <boost/noncopyable.hpp>
-
-#include "MessageAndCallbackBatch.h"
-#include "OpSendMsg.h"
-
 namespace pulsar {
 
 class MessageCrypto;
 class ProducerImpl;
 class SharedBuffer;
+struct OpSendMsg;
+class MessageAndCallbackBatch;
 
 namespace proto {
 class MessageMetadata;
@@ -160,29 +158,6 @@ inline void BatchMessageContainerBase::resetStats() {
     sizeInBytes_ = 0;
 }
 
-inline void BatchMessageContainerBase::processAndClear(
-    std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
-    if (isEmpty()) {
-        if (flushCallback) {
-            flushCallback(ResultOk);
-        }
-    } else {
-        const auto numBatches = getNumBatches();
-        if (numBatches == 1) {
-            OpSendMsg opSendMsg;
-            Result result = createOpSendMsg(opSendMsg, flushCallback);
-            opSendMsgCallback(result, opSendMsg);
-        } else if (numBatches > 1) {
-            std::vector<OpSendMsg> opSendMsgs;
-            std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback);
-            for (size_t i = 0; i < results.size(); i++) {
-                opSendMsgCallback(results[i], opSendMsgs[i]);
-            }
-        }  // else numBatches is 0, do nothing
-    }
-    clear();
-}
-
 inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainerBase& container) {
     container.serialize(os);
     return os;
diff --git a/lib/BatchMessageKeyBasedContainer.cc b/lib/BatchMessageKeyBasedContainer.cc
index 7441a3e..05baf34 100644
--- a/lib/BatchMessageKeyBasedContainer.cc
+++ b/lib/BatchMessageKeyBasedContainer.cc
@@ -17,16 +17,18 @@
  * under the License.
  */
 #include "BatchMessageKeyBasedContainer.h"
+
+#include <algorithm>
+#include <map>
+
 #include "ClientConnection.h"
 #include "Commands.h"
 #include "LogUtils.h"
 #include "MessageImpl.h"
+#include "OpSendMsg.h"
 #include "ProducerImpl.h"
 #include "TimeUtils.h"
 
-#include <algorithm>
-#include <map>
-
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
diff --git a/lib/BatchReceivePolicy.cc b/lib/BatchReceivePolicy.cc
index 08aa368..8487656 100644
--- a/lib/BatchReceivePolicy.cc
+++ b/lib/BatchReceivePolicy.cc
@@ -18,6 +18,7 @@
  */
 
 #include <pulsar/BatchReceivePolicy.h>
+
 #include "BatchReceivePolicyImpl.h"
 #include "LogUtils.h"
 
diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index ff42b91..b863d52 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -17,13 +17,13 @@
  * under the License.
  */
 #include "BinaryProtoLookupService.h"
-#include "SharedBuffer.h"
-
-#include <lib/TopicName.h>
 
+#include "ClientConnection.h"
 #include "ConnectionPool.h"
-
-#include <string>
+#include "LogUtils.h"
+#include "NamespaceName.h"
+#include "ServiceNameResolver.h"
+#include "TopicName.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index d068c3d..9adb648 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -19,17 +19,19 @@
 #ifndef _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_
 #define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_
 
-#include <iostream>
-#include <pulsar/defines.h>
 #include <pulsar/Authentication.h>
-#include "ConnectionPool.h"
-#include "Backoff.h"
-#include <lib/LookupService.h>
+
 #include <mutex>
-#include "ServiceNameResolver.h"
+
+#include "LookupService.h"
 
 namespace pulsar {
+class ClientConnection;
+using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
+class ConnectionPool;
 class LookupDataResult;
+class ServiceNameResolver;
+using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>;
 
 class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
    public:
diff --git a/lib/BlockingQueue.h b/lib/BlockingQueue.h
index d09166f..ab1902b 100644
--- a/lib/BlockingQueue.h
+++ b/lib/BlockingQueue.h
@@ -19,10 +19,9 @@
 #ifndef LIB_BLOCKINGQUEUE_H_
 #define LIB_BLOCKINGQUEUE_H_
 
-#include <assert.h>
-#include <mutex>
-#include <condition_variable>
 #include <boost/circular_buffer.hpp>
+#include <condition_variable>
+#include <mutex>
 
 /**
  * Following structs are defined for holding a predicate in wait() call on condition variables.
diff --git a/lib/BoostHash.h b/lib/BoostHash.h
index 10d62e1..be48ce3 100644
--- a/lib/BoostHash.h
+++ b/lib/BoostHash.h
@@ -20,11 +20,12 @@
 #define BOOST_HASH_HPP_
 
 #include <pulsar/defines.h>
-#include "Hash.h"
 
+#include <boost/functional/hash.hpp>
 #include <cstdint>
 #include <string>
-#include <boost/functional/hash.hpp>
+
+#include "Hash.h"
 
 namespace pulsar {
 class PULSAR_PUBLIC BoostHash : public Hash {
diff --git a/lib/BrokerConsumerStats.cc b/lib/BrokerConsumerStats.cc
index e3e1dea..a403772 100644
--- a/lib/BrokerConsumerStats.cc
+++ b/lib/BrokerConsumerStats.cc
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/defines.h>
 #include <pulsar/BrokerConsumerStats.h>
-#include <lib/BrokerConsumerStatsImplBase.h>
+#include <pulsar/defines.h>
+
+#include "BrokerConsumerStatsImplBase.h"
 
 namespace pulsar {
 BrokerConsumerStats::BrokerConsumerStats(std::shared_ptr<BrokerConsumerStatsImplBase> impl) : impl_(impl) {}
diff --git a/lib/BrokerConsumerStatsImpl.cc b/lib/BrokerConsumerStatsImpl.cc
index 220415a..2f7b6ad 100644
--- a/lib/BrokerConsumerStatsImpl.cc
+++ b/lib/BrokerConsumerStatsImpl.cc
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/BrokerConsumerStatsImpl.h>
+#include "BrokerConsumerStatsImpl.h"
+
 #include <boost/date_time/local_time/local_time.hpp>
 
 namespace pulsar {
diff --git a/lib/BrokerConsumerStatsImpl.h b/lib/BrokerConsumerStatsImpl.h
index eb238c6..721af59 100644
--- a/lib/BrokerConsumerStatsImpl.h
+++ b/lib/BrokerConsumerStatsImpl.h
@@ -19,14 +19,14 @@
 #ifndef PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H
 #define PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H
 
-#include <string.h>
-#include <iostream>
+#include <pulsar/BrokerConsumerStats.h>
 #include <pulsar/defines.h>
-#include <pulsar/Result.h>
+
+#include <boost/date_time/posix_time/posix_time.hpp>
 #include <functional>
-#include <boost/date_time/microsec_time_clock.hpp>
-#include <pulsar/BrokerConsumerStats.h>
-#include <lib/BrokerConsumerStatsImplBase.h>
+#include <iostream>
+
+#include "BrokerConsumerStatsImplBase.h"
 
 namespace pulsar {
 class PULSAR_PUBLIC BrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
diff --git a/lib/BrokerConsumerStatsImplBase.h b/lib/BrokerConsumerStatsImplBase.h
index 282dfc0..f2094a6 100644
--- a/lib/BrokerConsumerStatsImplBase.h
+++ b/lib/BrokerConsumerStatsImplBase.h
@@ -20,7 +20,6 @@
 #define PULSAR_CPP_BROKERCONSUMERSTATSIMPLBASE_H
 
 #include <pulsar/BrokerConsumerStats.h>
-#include <boost/date_time/posix_time/ptime.hpp>
 
 namespace pulsar {
 class BrokerConsumerStatsImplBase {
diff --git a/lib/Client.cc b/lib/Client.cc
index c72232a..03823fb 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -16,16 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <iostream>
 #include <pulsar/Client.h>
-#include <utility>
 
+#include <iostream>
 #include <memory>
+#include <utility>
 
 #include "ClientImpl.h"
-#include "Utils.h"
-#include "ExecutorService.h"
 #include "LogUtils.h"
+#include "Utils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index 1a161c3..70d85cb 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/ClientConfigurationImpl.h>
+#include "ClientConfigurationImpl.h"
 
 namespace pulsar {
 
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index a037ff3..b3df831 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -18,38 +18,35 @@
  */
 #include "ClientConnection.h"
 
-#include "PulsarApi.pb.h"
-
-#include <iostream>
-#include <algorithm>
-#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/date_time/gregorian/gregorian.hpp>
-#include <climits>
+#include <fstream>
 
-#include "ExecutorService.h"
 #include "Commands.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
 #include "LogUtils.h"
-#include "Url.h"
-
-#include <functional>
-#include <string>
-
+#include "OpSendMsg.h"
 #include "ProducerImpl.h"
-#include "ConsumerImpl.h"
+#include "PulsarApi.pb.h"
+#include "Url.h"
 #include "checksum/ChecksumProvider.h"
-#include "MessageIdUtil.h"
 
 DECLARE_LOG_OBJECT()
 
-using namespace pulsar::proto;
 using namespace boost::asio::ip;
 
 namespace pulsar {
 
+using proto::BaseCommand;
+
 static const uint32_t DefaultBufferSize = 64 * 1024;
 
 static const int KeepAliveIntervalInSeconds = 30;
 
+static MessageId toMessageId(const proto::MessageIdData& messageIdData) {
+    return MessageId{messageIdData.partition(), static_cast<int64_t>(messageIdData.ledgerid()),
+                     static_cast<int64_t>(messageIdData.entryid()), messageIdData.batch_index()};
+}
+
 // Convert error codes from protobuf to client API Result
 static Result getResult(ServerError serverError, const std::string& message) {
     switch (serverError) {
@@ -139,7 +136,7 @@ static Result getResult(ServerError serverError, const std::string& message) {
     return ResultUnknownError;
 }
 
-inline std::ostream& operator<<(std::ostream& os, ServerError error) {
+inline std::ostream& operator<<(std::ostream& os, proto::ServerError error) {
     os << getResult(error, "");
     return os;
 }
@@ -160,7 +157,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
                                    const AuthenticationPtr& authentication)
     : operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
       authentication_(authentication),
-      serverProtocolVersion_(ProtocolVersion_MIN),
+      serverProtocolVersion_(proto::ProtocolVersion_MIN),
       executor_(executor),
       resolver_(executor_->createTcpResolver()),
 #if BOOST_VERSION >= 107000
@@ -268,7 +265,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
 
 ClientConnection::~ClientConnection() { LOG_INFO(cnxString_ << "Destroyed connection"); }
 
-void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnected) {
+void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) {
     if (!cmdConnected.has_server_version()) {
         LOG_ERROR(cnxString_ << "Server version is not set");
         close();
@@ -286,7 +283,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
     serverProtocolVersion_ = cmdConnected.protocol_version();
     connectPromise_.setValue(shared_from_this());
 
-    if (serverProtocolVersion_ >= v1) {
+    if (serverProtocolVersion_ >= proto::v1) {
         // Only send keep-alive probes if the broker supports it
         keepAliveTimer_ = executor_->createDeadlineTimer();
         Lock lock(mutex_);
@@ -298,7 +295,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
         lock.unlock();
     }
 
-    if (serverProtocolVersion_ >= v8) {
+    if (serverProtocolVersion_ >= proto::v8) {
         startConsumerStatsTimer(std::vector<uint64_t>());
     }
 }
@@ -659,7 +656,8 @@ void ClientConnection::processIncomingBuffer() {
 
         // At this point,  we have at least one complete frame available in the buffer
         uint32_t cmdSize = incomingBuffer_.readUnsignedInt();
-        if (!incomingCmd_.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
+        proto::BaseCommand incomingCmd;
+        if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
             LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
             close();
             return;
@@ -667,20 +665,20 @@ void ClientConnection::processIncomingBuffer() {
 
         incomingBuffer_.consume(cmdSize);
 
-        if (incomingCmd_.type() == BaseCommand::MESSAGE) {
+        if (incomingCmd.type() == BaseCommand::MESSAGE) {
             // Parse message metadata and extract payload
-            MessageMetadata msgMetadata;
+            proto::MessageMetadata msgMetadata;
 
             // read checksum
             uint32_t remainingBytes = frameSize - (cmdSize + 4);
-            bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd_);
+            bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);
 
             uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
             if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
-                LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd_.message().consumer_id()  //
+                LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id()  //
                                      << ", message ledger id "
-                                     << incomingCmd_.message().message_id().ledgerid()  //
-                                     << ", entry id " << incomingCmd_.message().message_id().entryid()
+                                     << incomingCmd.message().message_id().ledgerid()  //
+                                     << ", entry id " << incomingCmd.message().message_id().entryid()
                                      << "] Error parsing message metadata");
                 close();
                 return;
@@ -692,9 +690,9 @@ void ClientConnection::processIncomingBuffer() {
             uint32_t payloadSize = remainingBytes;
             SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
             incomingBuffer_.consume(payloadSize);
-            handleIncomingMessage(incomingCmd_.message(), isChecksumValid, msgMetadata, payload);
+            handleIncomingMessage(incomingCmd.message(), isChecksumValid, msgMetadata, payload);
         } else {
-            handleIncomingCommand();
+            handleIncomingCommand(incomingCmd);
         }
     }
     if (incomingBuffer_.readableBytes() > 0) {
@@ -722,7 +720,7 @@ void ClientConnection::processIncomingBuffer() {
 }
 
 bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes,
-                                      proto::BaseCommand& incomingCmd_) {
+                                      proto::BaseCommand& incomingCmd) {
     int readerIndex = incomingBuffer_.readerIndex();
     bool isChecksumValid = true;
 
@@ -738,9 +736,9 @@ bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& r
 
         if (!isChecksumValid) {
             LOG_ERROR("[consumer id "
-                      << incomingCmd_.message().consumer_id()                                           //
-                      << ", message ledger id " << incomingCmd_.message().message_id().ledgerid()       //
-                      << ", entry id " << incomingCmd_.message().message_id().entryid()                 //
+                      << incomingCmd.message().consumer_id()                                            //
+                      << ", message ledger id " << incomingCmd.message().message_id().ledgerid()        //
+                      << ", entry id " << incomingCmd.message().message_id().entryid()                  //
                       << "stored-checksum" << storedChecksum << "computedChecksum" << computedChecksum  //
                       << "] Checksum verification failed");
         }
@@ -795,8 +793,8 @@ void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, b
     }
 }
 
-void ClientConnection::handleIncomingCommand() {
-    LOG_DEBUG(cnxString_ << "Handling incoming command: " << Commands::messageType(incomingCmd_.type()));
+void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
+    LOG_DEBUG(cnxString_ << "Handling incoming command: " << Commands::messageType(incomingCmd.type()));
 
     switch (state_) {
         case Pending: {
@@ -806,11 +804,11 @@ void ClientConnection::handleIncomingCommand() {
 
         case TcpConnected: {
             // Handle Pulsar Connected
-            if (incomingCmd_.type() != BaseCommand::CONNECTED) {
+            if (incomingCmd.type() != BaseCommand::CONNECTED) {
                 // Wrong cmd
                 close();
             } else {
-                handlePulsarConnected(incomingCmd_.connected());
+                handlePulsarConnected(incomingCmd.connected());
             }
             break;
         }
@@ -826,9 +824,9 @@ void ClientConnection::handleIncomingCommand() {
             havePendingPingRequest_ = false;
 
             // Handle normal commands
-            switch (incomingCmd_.type()) {
+            switch (incomingCmd.type()) {
                 case BaseCommand::SEND_RECEIPT: {
-                    const CommandSendReceipt& sendReceipt = incomingCmd_.send_receipt();
+                    const auto& sendReceipt = incomingCmd.send_receipt();
                     int producerId = sendReceipt.producer_id();
                     uint64_t sequenceId = sendReceipt.sequence_id();
                     const proto::MessageIdData& messageIdData = sendReceipt.message_id();
@@ -860,7 +858,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::SEND_ERROR: {
-                    const CommandSendError& error = incomingCmd_.send_error();
+                    const auto& error = incomingCmd.send_error();
                     LOG_WARN(cnxString_ << "Received send error from server: " << error.message());
                     if (ChecksumError == error.error()) {
                         long producerId = error.producer_id();
@@ -886,7 +884,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::SUCCESS: {
-                    const CommandSuccess& success = incomingCmd_.success();
+                    const auto& success = incomingCmd.success();
                     LOG_DEBUG(cnxString_ << "Received success response from server. req_id: "
                                          << success.request_id());
 
@@ -904,8 +902,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::PARTITIONED_METADATA_RESPONSE: {
-                    const CommandPartitionedTopicMetadataResponse& partitionMetadataResponse =
-                        incomingCmd_.partitionmetadataresponse();
+                    const auto& partitionMetadataResponse = incomingCmd.partitionmetadataresponse();
                     LOG_DEBUG(cnxString_ << "Received partition-metadata response from server. req_id: "
                                          << partitionMetadataResponse.request_id());
 
@@ -921,7 +918,7 @@ void ClientConnection::handleIncomingCommand() {
 
                         if (!partitionMetadataResponse.has_response() ||
                             (partitionMetadataResponse.response() ==
-                             CommandPartitionedTopicMetadataResponse::Failed)) {
+                             proto::CommandPartitionedTopicMetadataResponse::Failed)) {
                             if (partitionMetadataResponse.has_error()) {
                                 LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
                                                      << partitionMetadataResponse.request_id()
@@ -950,8 +947,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::CONSUMER_STATS_RESPONSE: {
-                    const CommandConsumerStatsResponse& consumerStatsResponse =
-                        incomingCmd_.consumerstatsresponse();
+                    const auto& consumerStatsResponse = incomingCmd.consumerstatsresponse();
                     LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer stats "
                                             "response from server. req_id: "
                                          << consumerStatsResponse.request_id());
@@ -994,8 +990,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::LOOKUP_RESPONSE: {
-                    const CommandLookupTopicResponse& lookupTopicResponse =
-                        incomingCmd_.lookuptopicresponse();
+                    const auto& lookupTopicResponse = incomingCmd.lookuptopicresponse();
                     LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
                                          << lookupTopicResponse.request_id());
 
@@ -1010,7 +1005,7 @@ void ClientConnection::handleIncomingCommand() {
                         lock.unlock();
 
                         if (!lookupTopicResponse.has_response() ||
-                            (lookupTopicResponse.response() == CommandLookupTopicResponse::Failed)) {
+                            (lookupTopicResponse.response() == proto::CommandLookupTopicResponse::Failed)) {
                             if (lookupTopicResponse.has_error()) {
                                 LOG_ERROR(cnxString_
                                           << "Failed lookup req_id: " << lookupTopicResponse.request_id()
@@ -1045,7 +1040,7 @@ void ClientConnection::handleIncomingCommand() {
                             lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls());
                             lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
                             lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
-                                                         CommandLookupTopicResponse::Redirect);
+                                                         proto::CommandLookupTopicResponse::Redirect);
                             lookupResultPtr->setShouldProxyThroughServiceUrl(
                                 lookupTopicResponse.proxy_through_service_url());
                             lookupDataPromise->setValue(lookupResultPtr);
@@ -1059,7 +1054,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::PRODUCER_SUCCESS: {
-                    const CommandProducerSuccess& producerSuccess = incomingCmd_.producer_success();
+                    const auto& producerSuccess = incomingCmd.producer_success();
                     LOG_DEBUG(cnxString_ << "Received success producer response from server. req_id: "
                                          << producerSuccess.request_id()  //
                                          << " -- producer name: " << producerSuccess.producer_name());
@@ -1089,7 +1084,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::ERROR: {
-                    const CommandError& error = incomingCmd_.error();
+                    const auto& error = incomingCmd.error();
                     Result result = getResult(error.error(), error.message());
                     LOG_WARN(cnxString_ << "Received error response from server: " << result
                                         << (error.has_message() ? (" (" + error.message() + ")") : "")
@@ -1132,7 +1127,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::CLOSE_PRODUCER: {
-                    const CommandCloseProducer& closeProducer = incomingCmd_.close_producer();
+                    const auto& closeProducer = incomingCmd.close_producer();
                     int producerId = closeProducer.producer_id();
 
                     LOG_DEBUG("Broker notification of Closed producer: " << producerId);
@@ -1156,7 +1151,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::CLOSE_CONSUMER: {
-                    const CommandCloseConsumer& closeconsumer = incomingCmd_.close_consumer();
+                    const auto& closeconsumer = incomingCmd.close_consumer();
                     int consumerId = closeconsumer.consumer_id();
 
                     LOG_DEBUG("Broker notification of Closed consumer: " << consumerId);
@@ -1208,7 +1203,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
-                    const CommandActiveConsumerChange& change = incomingCmd_.active_consumer_change();
+                    const auto& change = incomingCmd.active_consumer_change();
                     LOG_DEBUG(cnxString_
                               << "Received notification about active consumer change, consumer_id: "
                               << change.consumer_id() << " isActive: " << change.is_active());
@@ -1217,8 +1212,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE: {
-                    const CommandGetLastMessageIdResponse& getLastMessageIdResponse =
-                        incomingCmd_.getlastmessageidresponse();
+                    const auto& getLastMessageIdResponse = incomingCmd.getlastmessageidresponse();
                     LOG_DEBUG(cnxString_ << "Received getLastMessageIdResponse from server. req_id: "
                                          << getLastMessageIdResponse.request_id());
 
@@ -1249,8 +1243,7 @@ void ClientConnection::handleIncomingCommand() {
                 }
 
                 case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: {
-                    const CommandGetTopicsOfNamespaceResponse& response =
-                        incomingCmd_.gettopicsofnamespaceresponse();
+                    const auto& response = incomingCmd.gettopicsofnamespaceresponse();
 
                     LOG_DEBUG(cnxString_ << "Received GetTopicsOfNamespaceResponse from server. req_id: "
                                          << response.request_id() << " topicsSize" << response.topics_size());
@@ -1405,8 +1398,9 @@ void ClientConnection::sendMessage(const OpSendMsg& opSend) {
 }
 
 void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) {
+    BaseCommand outgoingCmd;
     PairSharedBuffer buffer =
-        Commands::newSend(outgoingBuffer_, outgoingCmd_, opSend.producerId_, opSend.sequenceId_,
+        Commands::newSend(outgoingBuffer_, outgoingCmd, opSend.producerId_, opSend.sequenceId_,
                           getChecksumType(), opSend.metadata_, opSend.payload_);
 
     asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
@@ -1448,8 +1442,9 @@ void ClientConnection::sendPendingCommands() {
             assert(any.type() == typeid(OpSendMsg));
 
             const OpSendMsg& op = boost::any_cast<const OpSendMsg&>(any);
+            BaseCommand outgoingCmd;
             PairSharedBuffer buffer =
-                Commands::newSend(outgoingBuffer_, outgoingCmd_, op.producerId_, op.sequenceId_,
+                Commands::newSend(outgoingBuffer_, outgoingCmd, op.producerId_, op.sequenceId_,
                                   getChecksumType(), op.metadata_, op.payload_);
 
             asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
@@ -1697,7 +1692,7 @@ void ClientConnection::closeSocket() {
     }
 }
 
-void ClientConnection::checkServerError(const proto::ServerError& error) {
+void ClientConnection::checkServerError(ServerError error) {
     switch (error) {
         case proto::ServerError::ServiceNotReady:
             closeSocket();
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 8a48408..ad5c3ad 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -19,42 +19,39 @@
 #ifndef _PULSAR_CLIENT_CONNECTION_HEADER_
 #define _PULSAR_CLIENT_CONNECTION_HEADER_
 
+#include <pulsar/ClientConfiguration.h>
 #include <pulsar/defines.h>
-#include <pulsar/Result.h>
 
-#include <boost/asio.hpp>
-#include <boost/asio/ssl.hpp>
-#include <boost/asio/strand.hpp>
 #include <boost/any.hpp>
-#include <mutex>
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/ssl/stream.hpp>
+#include <boost/asio/strand.hpp>
+#include <deque>
 #include <functional>
+#include <memory>
 #include <string>
 #include <vector>
-#include <deque>
-#include <atomic>
 
-#include "ExecutorService.h"
-#include "Future.h"
-#include "PulsarApi.pb.h"
-#include <pulsar/Result.h>
-#include "SharedBuffer.h"
-#include "Backoff.h"
 #include "Commands.h"
+#include "GetLastMessageIdResponse.h"
 #include "LookupDataResult.h"
+#include "SharedBuffer.h"
 #include "UtilAllocator.h"
-#include <pulsar/Client.h>
-#include <set>
-#include <lib/BrokerConsumerStatsImpl.h>
-#include "lib/PeriodicTask.h"
-#include "lib/GetLastMessageIdResponse.h"
-
-using namespace pulsar;
+#include "Utils.h"
 
 namespace pulsar {
 
 class PulsarFriend;
 
+using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
+using TimeDuration = boost::posix_time::time_duration;
+using TcpResolverPtr = std::shared_ptr<boost::asio::ip::tcp::resolver>;
+
 class ExecutorService;
+using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
 
 class ClientConnection;
 typedef std::shared_ptr<ClientConnection> ClientConnectionPtr;
@@ -69,9 +66,18 @@ typedef std::shared_ptr<ConsumerImpl> ConsumerImplPtr;
 typedef std::weak_ptr<ConsumerImpl> ConsumerImplWeakPtr;
 
 class LookupDataResult;
+class BrokerConsumerStatsImpl;
+class PeriodicTask;
 
 struct OpSendMsg;
 
+namespace proto {
+class BaseCommand;
+class CommandActiveConsumerChange;
+class CommandMessage;
+class CommandConnected;
+}  // namespace proto
+
 // Data returned on the request operation. Mostly used on create-producer command
 struct ResponseData {
     std::string producerName;
@@ -193,10 +199,10 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
 
     void processIncomingBuffer();
     bool verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes,
-                        proto::BaseCommand& incomingCmd_);
+                        proto::BaseCommand& incomingCmd);
 
     void handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change);
-    void handleIncomingCommand();
+    void handleIncomingCommand(proto::BaseCommand& incomingCmd);
     void handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
                                proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
 
@@ -288,7 +294,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     boost::system::error_code error_;
 
     SharedBuffer incomingBuffer_;
-    proto::BaseCommand incomingCmd_;
 
     Promise<Result, ClientConnectionWeakPtr> connectPromise_;
     std::shared_ptr<PeriodicTask> connectTimeoutTask_;
@@ -322,7 +327,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     int pendingWriteOperations_ = 0;
 
     SharedBuffer outgoingBuffer_;
-    proto::BaseCommand outgoingCmd_;
 
     HandlerAllocator readHandlerAllocator_;
     HandlerAllocator writeHandlerAllocator_;
@@ -343,7 +347,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     bool isTlsAllowInsecureConnection_ = false;
 
     void closeSocket();
-    void checkServerError(const proto::ServerError& error);
+    void checkServerError(ServerError error);
 };
 }  // namespace pulsar
 
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 025727a..a9c1653 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -17,27 +17,27 @@
  * under the License.
  */
 #include "ClientImpl.h"
+
+#include <pulsar/ClientConfiguration.h>
+
+#include <random>
+
+#include "BinaryProtoLookupService.h"
 #include "ClientConfigurationImpl.h"
-#include "LogUtils.h"
+#include "Commands.h"
 #include "ConsumerImpl.h"
-#include "ProducerImpl.h"
-#include "ReaderImpl.h"
-#include "PartitionedProducerImpl.h"
+#include "ExecutorService.h"
+#include "HTTPLookupService.h"
+#include "LogUtils.h"
 #include "MultiTopicsConsumerImpl.h"
+#include "PartitionedProducerImpl.h"
 #include "PatternMultiTopicsConsumerImpl.h"
+#include "ProducerImpl.h"
+#include "ReaderImpl.h"
+#include "RetryableLookupService.h"
 #include "TimeUtils.h"
-#include <pulsar/ConsoleLoggerFactory.h>
-#include <boost/algorithm/string/predicate.hpp>
-#include <sstream>
-#include <stdexcept>
-#include <lib/BinaryProtoLookupService.h>
-#include <lib/HTTPLookupService.h>
-#include <lib/RetryableLookupService.h>
-#include <lib/TopicName.h>
-#include <algorithm>
-#include <random>
-#include <mutex>
-#include <thread>
+#include "TopicName.h"
+
 #ifdef USE_LOG4CXX
 #include "Log4CxxLogger.h"
 #endif
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 50ddeff..8a39396 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -20,22 +20,21 @@
 #define LIB_CLIENTIMPL_H_
 
 #include <pulsar/Client.h>
-#include "ExecutorService.h"
-#include "LookupService.h"
-#include "MemoryLimitController.h"
+
+#include <atomic>
+#include <memory>
+
 #include "ConnectionPool.h"
+#include "Future.h"
 #include "LookupDataResult.h"
-#include <mutex>
-#include <lib/TopicName.h>
-#include "ProducerImplBase.h"
-#include <atomic>
-#include <vector>
+#include "MemoryLimitController.h"
 #include "ServiceNameResolver.h"
 #include "SynchronizedHashMap.h"
 
 namespace pulsar {
 
 class PulsarFriend;
+class ClientImpl;
 typedef std::shared_ptr<ClientImpl> ClientImplPtr;
 typedef std::weak_ptr<ClientImpl> ClientImplWeakPtr;
 
@@ -46,6 +45,21 @@ typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;
 class ConsumerImplBase;
 typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
 
+class ClientConnection;
+using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
+
+class LookupService;
+using LookupServicePtr = std::shared_ptr<LookupService>;
+
+class ProducerImplBase;
+using ProducerImplBaseWeakPtr = std::weak_ptr<ProducerImplBase>;
+class ConsumerImplBase;
+using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;
+class TopicName;
+using TopicNamePtr = std::shared_ptr<TopicName>;
+
+using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
+
 std::string generateRandomName();
 
 class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 26288d1..13febd0 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -17,25 +17,51 @@
  * under the License.
  */
 #include "Commands.h"
-#include "MessageImpl.h"
-#include "pulsar/Version.h"
-#include "pulsar/MessageBuilder.h"
+
+#include <pulsar/MessageBuilder.h>
+#include <pulsar/Schema.h>
+#include <pulsar/Version.h>
+
+#include <algorithm>
+#include <mutex>
+
 #include "LogUtils.h"
+#include "MessageImpl.h"
 #include "PulsarApi.pb.h"
-#include "Utils.h"
 #include "Url.h"
-#include <pulsar/Schema.h>
 #include "checksum/ChecksumProvider.h"
-#include <algorithm>
-#include <mutex>
 
 using namespace pulsar;
 namespace pulsar {
 
-using namespace pulsar::proto;
-
 DECLARE_LOG_OBJECT();
 
+using proto::AuthData;
+using proto::BaseCommand;
+using proto::CommandAck;
+using proto::CommandAuthResponse;
+using proto::CommandCloseConsumer;
+using proto::CommandCloseProducer;
+using proto::CommandConnect;
+using proto::CommandConsumerStats;
+using proto::CommandFlow;
+using proto::CommandGetLastMessageId;
+using proto::CommandGetTopicsOfNamespace;
+using proto::CommandLookupTopic;
+using proto::CommandPartitionedTopicMetadata;
+using proto::CommandProducer;
+using proto::CommandRedeliverUnacknowledgedMessages;
+using proto::CommandSeek;
+using proto::CommandSend;
+using proto::CommandSubscribe;
+using proto::CommandUnsubscribe;
+using proto::FeatureFlags;
+using proto::IntRange;
+using proto::KeySharedMeta;
+using proto::MessageIdData;
+using proto::ProtocolVersion_MAX;
+using proto::SingleMessageMetadata;
+
 static inline bool isBuiltInSchema(SchemaType schemaType) {
     switch (schemaType) {
         case STRING:
@@ -53,19 +79,19 @@ static inline bool isBuiltInSchema(SchemaType schemaType) {
 static inline proto::Schema_Type getSchemaType(SchemaType type) {
     switch (type) {
         case SchemaType::NONE:
-            return Schema_Type_None;
+            return proto::Schema_Type_None;
         case STRING:
-            return Schema_Type_String;
+            return proto::Schema_Type_String;
         case JSON:
-            return Schema_Type_Json;
+            return proto::Schema_Type_Json;
         case PROTOBUF:
-            return Schema_Type_Protobuf;
+            return proto::Schema_Type_Protobuf;
         case AVRO:
-            return Schema_Type_Avro;
+            return proto::Schema_Type_Avro;
         case PROTOBUF_NATIVE:
-            return Schema_Type_ProtobufNative;
+            return proto::Schema_Type_ProtobufNative;
         default:
-            return Schema_Type_None;
+            return proto::Schema_Type_None;
     }
 }
 
@@ -278,13 +304,14 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
     CommandSubscribe* subscribe = cmd.mutable_subscribe();
     subscribe->set_topic(topic);
     subscribe->set_subscription(subscription);
-    subscribe->set_subtype(subType);
+    subscribe->set_subtype(static_cast<proto::CommandSubscribe_SubType>(subType));
     subscribe->set_consumer_id(consumerId);
     subscribe->set_request_id(requestId);
     subscribe->set_consumer_name(consumerName);
     subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
     subscribe->set_read_compacted(readCompacted);
-    subscribe->set_initialposition(subscriptionInitialPosition);
+    subscribe->set_initialposition(
+        static_cast<proto::CommandSubscribe_InitialPosition>(subscriptionInitialPosition));
     subscribe->set_replicate_subscription_state(replicateSubscriptionState);
     subscribe->set_priority_level(priorityLevel);
 
@@ -362,7 +389,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
     producer->set_epoch(epoch);
     producer->set_user_provided_producer_name(userProvidedProducerName);
     producer->set_encrypted(encrypted);
-    producer->set_producer_access_mode(accessMode);
+    producer->set_producer_access_mode(static_cast<proto::ProducerAccessMode>(accessMode));
     if (topicEpoch.is_present()) {
         producer->set_topic_epoch(topicEpoch.value());
     }
@@ -386,17 +413,19 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
     return writeMessageWithSize(cmd);
 }
 
-SharedBuffer Commands::newAck(uint64_t consumerId, const MessageIdData& messageId, CommandAck_AckType ackType,
-                              int validationError) {
+SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
+                              CommandAck_AckType ackType, CommandAck_ValidationError validationError) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::ACK);
     CommandAck* ack = cmd.mutable_ack();
     ack->set_consumer_id(consumerId);
-    ack->set_ack_type(ackType);
-    if (CommandAck_AckType_IsValid(validationError)) {
-        ack->set_validation_error((CommandAck_ValidationError)validationError);
+    ack->set_ack_type(static_cast<proto::CommandAck_AckType>(ackType));
+    if (proto::CommandAck_AckType_IsValid(validationError)) {
+        ack->set_validation_error((proto::CommandAck_ValidationError)validationError);
     }
-    *(ack->add_message_id()) = messageId;
+    auto* msgId = ack->add_message_id();
+    msgId->set_ledgerid(ledgerId);
+    msgId->set_entryid(entryId);
     return writeMessageWithSize(cmd);
 }
 
@@ -405,7 +434,7 @@ SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const std::set<Me
     cmd.set_type(BaseCommand::ACK);
     CommandAck* ack = cmd.mutable_ack();
     ack->set_consumer_id(consumerId);
-    ack->set_ack_type(CommandAck_AckType_Individual);
+    ack->set_ack_type(proto::CommandAck_AckType_Individual);
     for (const auto& msgId : msgIds) {
         auto newMsgId = ack->add_message_id();
         newMsgId->set_ledgerid(msgId.ledgerId());
diff --git a/lib/Commands.h b/lib/Commands.h
index 4ff8674..09f6f8b 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -20,21 +20,27 @@
 #define LIB_COMMANDS_H_
 
 #include <pulsar/Authentication.h>
-#include <pulsar/defines.h>
+#include <pulsar/KeySharedPolicy.h>
 #include <pulsar/Message.h>
 #include <pulsar/Schema.h>
-#include <pulsar/KeySharedPolicy.h>
+#include <pulsar/defines.h>
 
-#include "PulsarApi.pb.h"
+#include <set>
+
+#include "ProtoApiEnums.h"
 #include "SharedBuffer.h"
 #include "Utils.h"
 
-#include <set>
-
 using namespace pulsar;
 
 namespace pulsar {
 
+namespace proto {
+class BaseCommand;
+class MessageIdData;
+class MessageMetadata;
+}  // namespace proto
+
 typedef std::shared_ptr<proto::MessageMetadata> MessageMetadataPtr;
 
 /**
@@ -85,12 +91,12 @@ class Commands {
 
     static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
                                      uint64_t consumerId, uint64_t requestId,
-                                     proto::CommandSubscribe_SubType subType, const std::string& consumerName,
+                                     CommandSubscribe_SubType subType, const std::string& consumerName,
                                      SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
                                      bool readCompacted, const std::map<std::string, std::string>& metadata,
                                      const std::map<std::string, std::string>& subscriptionProperties,
                                      const SchemaInfo& schemaInfo,
-                                     proto::CommandSubscribe_InitialPosition subscriptionInitialPosition,
+                                     CommandSubscribe_InitialPosition subscriptionInitialPosition,
                                      bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
                                      int priorityLevel = 0);
 
@@ -101,10 +107,10 @@ class Commands {
                                     const std::map<std::string, std::string>& metadata,
                                     const SchemaInfo& schemaInfo, uint64_t epoch,
                                     bool userProvidedProducerName, bool encrypted,
-                                    proto::ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
+                                    ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
 
-    static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
-                               proto::CommandAck_AckType ackType, int validationError);
+    static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
+                               CommandAck_AckType ackType, CommandAck_ValidationError validationError);
     static SharedBuffer newMultiMessageAck(uint64_t consumerId, const std::set<MessageId>& msgIds);
 
     static SharedBuffer newFlow(uint64_t consumerId, uint32_t messagePermits);
@@ -119,7 +125,7 @@ class Commands {
     static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId,
                                                            const std::set<MessageId>& messageIds);
 
-    static std::string messageType(proto::BaseCommand::Type type);
+    static std::string messageType(BaseCommand_Type type);
 
     static void initBatchMessageMetadata(const Message& msg, pulsar::proto::MessageMetadata& batchMetadata);
 
diff --git a/lib/CompressionCodec.cc b/lib/CompressionCodec.cc
index c17b534..991d52c 100644
--- a/lib/CompressionCodec.cc
+++ b/lib/CompressionCodec.cc
@@ -17,12 +17,11 @@
  * under the License.
  */
 #include "CompressionCodec.h"
+
 #include "CompressionCodecLZ4.h"
+#include "CompressionCodecSnappy.h"
 #include "CompressionCodecZLib.h"
 #include "CompressionCodecZstd.h"
-#include "CompressionCodecSnappy.h"
-
-#include <cassert>
 
 using namespace pulsar;
 namespace pulsar {
@@ -49,38 +48,6 @@ CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compression
     BOOST_THROW_EXCEPTION(std::logic_error("Invalid CompressionType enumeration value"));
 }
 
-CompressionType CompressionCodecProvider::convertType(proto::CompressionType type) {
-    switch (type) {
-        case proto::NONE:
-            return CompressionNone;
-        case proto::LZ4:
-            return CompressionLZ4;
-        case proto::ZLIB:
-            return CompressionZLib;
-        case proto::ZSTD:
-            return CompressionZSTD;
-        case proto::SNAPPY:
-            return CompressionSNAPPY;
-    }
-    BOOST_THROW_EXCEPTION(std::logic_error("Invalid proto::CompressionType enumeration value"));
-}
-
-proto::CompressionType CompressionCodecProvider::convertType(CompressionType type) {
-    switch (type) {
-        case CompressionNone:
-            return proto::NONE;
-        case CompressionLZ4:
-            return proto::LZ4;
-        case CompressionZLib:
-            return proto::ZLIB;
-        case CompressionZSTD:
-            return proto::ZSTD;
-        case CompressionSNAPPY:
-            return proto::SNAPPY;
-    }
-    BOOST_THROW_EXCEPTION(std::logic_error("Invalid CompressionType enumeration value"));
-}
-
 SharedBuffer CompressionCodecNone::encode(const SharedBuffer& raw) { return raw; }
 
 bool CompressionCodecNone::decode(const SharedBuffer& encoded, uint32_t uncompressedSize,
diff --git a/lib/CompressionCodec.h b/lib/CompressionCodec.h
index fd65f9c..9fe3680 100644
--- a/lib/CompressionCodec.h
+++ b/lib/CompressionCodec.h
@@ -19,14 +19,12 @@
 #ifndef LIB_COMPRESSIONCODEC_H_
 #define LIB_COMPRESSIONCODEC_H_
 
-#include <pulsar/defines.h>
 #include <pulsar/Producer.h>
 
-#include "SharedBuffer.h"
-#include "PulsarApi.pb.h"
-
 #include <map>
 
+#include "SharedBuffer.h"
+
 using namespace pulsar;
 namespace pulsar {
 
@@ -39,9 +37,6 @@ class CompressionCodecSnappy;
 
 class PULSAR_PUBLIC CompressionCodecProvider {
    public:
-    static CompressionType convertType(proto::CompressionType type);
-    static proto::CompressionType convertType(CompressionType type);
-
     static CompressionCodec& getCodec(CompressionType compressionType);
 
    private:
diff --git a/lib/CompressionCodecLZ4.cc b/lib/CompressionCodecLZ4.cc
index 508e4f4..587849b 100644
--- a/lib/CompressionCodecLZ4.cc
+++ b/lib/CompressionCodecLZ4.cc
@@ -18,9 +18,10 @@
  */
 #include "CompressionCodecLZ4.h"
 
-#include "lz4/lz4.h"
 #include <cassert>
 
+#include "lz4/lz4.h"
+
 namespace pulsar {
 
 SharedBuffer CompressionCodecLZ4::encode(const SharedBuffer& raw) {
diff --git a/lib/CompressionCodecSnappy.cc b/lib/CompressionCodecSnappy.cc
index 04b0d97..efd9f38 100644
--- a/lib/CompressionCodecSnappy.cc
+++ b/lib/CompressionCodecSnappy.cc
@@ -19,8 +19,8 @@
 #include "CompressionCodecSnappy.h"
 
 #if HAS_SNAPPY
-#include <snappy.h>
 #include <snappy-sinksource.h>
+#include <snappy.h>
 
 namespace pulsar {
 
diff --git a/lib/CompressionCodecZLib.cc b/lib/CompressionCodecZLib.cc
index 657c548..3c42f0a 100644
--- a/lib/CompressionCodecZLib.cc
+++ b/lib/CompressionCodecZLib.cc
@@ -19,9 +19,7 @@
 #include "CompressionCodecZLib.h"
 
 #include <zlib.h>
-#include <cstring>
-#include <cstdlib>
-#include <cmath>
+
 #include "LogUtils.h"
 
 DECLARE_LOG_OBJECT()
diff --git a/lib/CompressionCodecZLib.h b/lib/CompressionCodecZLib.h
index cd4380b..d03d880 100644
--- a/lib/CompressionCodecZLib.h
+++ b/lib/CompressionCodecZLib.h
@@ -19,9 +19,7 @@
 #ifndef LIB_COMPRESSIONCODECZLIB_H_
 #define LIB_COMPRESSIONCODECZLIB_H_
 
-#include <pulsar/defines.h>
 #include "CompressionCodec.h"
-#include <zlib.h>
 
 // Make symbol visible to unit tests
 
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index e03697f..1c246d6 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -18,12 +18,13 @@
  */
 #include "ConnectionPool.h"
 
-#include "LogUtils.h"
-#include "Url.h"
-
-#include <boost/asio.hpp>
+#include <boost/asio/ip/tcp.hpp>
 #include <boost/asio/ssl.hpp>
 
+#include "ClientConnection.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+
 using boost::asio::ip::tcp;
 namespace ssl = boost::asio::ssl;
 typedef ssl::stream<tcp::socket> ssl_socket;
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index 996df54..cd55fc2 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -19,18 +19,24 @@
 #ifndef _PULSAR_CONNECTION_POOL_HEADER_
 #define _PULSAR_CONNECTION_POOL_HEADER_
 
-#include <pulsar/defines.h>
+#include <pulsar/ClientConfiguration.h>
 #include <pulsar/Result.h>
-
-#include "ClientConnection.h"
+#include <pulsar/defines.h>
 
 #include <atomic>
-#include <string>
 #include <map>
+#include <memory>
 #include <mutex>
+#include <string>
+
+#include "Future.h"
 namespace pulsar {
 
+class ClientConnection;
+using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
 class ExecutorService;
+class ExecutorServiceProvider;
+using ExecutorServiceProviderPtr = std::shared_ptr<ExecutorServiceProvider>;
 
 class PULSAR_PUBLIC ConnectionPool {
    public:
diff --git a/lib/ConsoleLoggerFactory.cc b/lib/ConsoleLoggerFactory.cc
index 397c7fe..7c5df5e 100644
--- a/lib/ConsoleLoggerFactory.cc
+++ b/lib/ConsoleLoggerFactory.cc
@@ -18,7 +18,8 @@
  */
 
 #include <pulsar/ConsoleLoggerFactory.h>
-#include "lib/ConsoleLoggerFactoryImpl.h"
+
+#include "ConsoleLoggerFactoryImpl.h"
 
 namespace pulsar {
 
diff --git a/lib/ConsoleLoggerFactoryImpl.h b/lib/ConsoleLoggerFactoryImpl.h
index 61c1d90..7808547 100644
--- a/lib/ConsoleLoggerFactoryImpl.h
+++ b/lib/ConsoleLoggerFactoryImpl.h
@@ -20,7 +20,8 @@
 #pragma once
 
 #include <pulsar/Logger.h>
-#include "lib/SimpleLogger.h"
+
+#include "SimpleLogger.h"
 
 namespace pulsar {
 
diff --git a/lib/Consumer.cc b/lib/Consumer.cc
index 13fb9f4..8e5c7dd 100644
--- a/lib/Consumer.cc
+++ b/lib/Consumer.cc
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <pulsar/BrokerConsumerStats.h>
 #include <pulsar/Consumer.h>
+#include <pulsar/ConsumerConfiguration.h>
 #include <pulsar/MessageBuilder.h>
+
 #include "ConsumerImpl.h"
+#include "GetLastMessageIdResponse.h"
 #include "Utils.h"
-#include <lib/BrokerConsumerStatsImpl.h>
-#include <lib/Latch.h>
 
 namespace pulsar {
 
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 0705cca..6298245 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/ConsumerConfigurationImpl.h>
+#include <pulsar/ConsumerConfiguration.h>
 
 #include <stdexcept>
-#include <pulsar/ConsumerConfiguration.h>
+
+#include "ConsumerConfigurationImpl.h"
 
 namespace pulsar {
 
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 7be5a6a..155c5bf 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -17,21 +17,30 @@
  * under the License.
  */
 #include "ConsumerImpl.h"
-#include "MessageImpl.h"
-#include "MessagesImpl.h"
+
+#include <algorithm>
+
+#include "AckGroupingTracker.h"
+#include "AckGroupingTrackerDisabled.h"
+#include "AckGroupingTrackerEnabled.h"
+#include "ClientConnection.h"
+#include "ClientImpl.h"
 #include "Commands.h"
+#include "ExecutorService.h"
+#include "GetLastMessageIdResponse.h"
 #include "LogUtils.h"
+#include "MessageCrypto.h"
+#include "MessageIdUtil.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "PulsarApi.pb.h"
 #include "TimeUtils.h"
-#include <lib/TopicName.h>
-#include "pulsar/Result.h"
-#include "pulsar/MessageId.h"
+#include "TopicName.h"
+#include "UnAckedMessageTrackerDisabled.h"
+#include "UnAckedMessageTrackerEnabled.h"
 #include "Utils.h"
-#include "MessageIdUtil.h"
-#include "AckGroupingTracker.h"
-#include "AckGroupingTrackerEnabled.h"
-#include "AckGroupingTrackerDisabled.h"
-#include <exception>
-#include <algorithm>
+#include "stats/ConsumerStatsDisabled.h"
+#include "stats/ConsumerStatsImpl.h"
 
 namespace pulsar {
 
@@ -396,7 +405,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
 
     if (!isChecksumValid) {
         // Message discarded for checksum error
-        discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::ChecksumMismatch);
+        discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch);
         return;
     }
 
@@ -613,7 +622,7 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
         } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
             LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config "
                                   "is set to discard");
-            discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::DecryptionError);
+            discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
         } else {
             LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to "
                                    "consume encrypted message");
@@ -634,7 +643,7 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
         return true;
     } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
         LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard");
-        discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::DecryptionError);
+        discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
     } else {
         LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message");
     }
@@ -649,7 +658,7 @@ bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,
         return true;
     }
 
-    CompressionType compressionType = CompressionCodecProvider::convertType(metadata.compression());
+    CompressionType compressionType = static_cast<CompressionType>(metadata.compression());
 
     uint32_t uncompressedSize = metadata.uncompressed_size();
     uint32_t payloadSize = payload.readableBytes();
@@ -658,7 +667,8 @@ bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,
             // Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
             LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize  //
                                 << " at  " << messageIdData.ledgerid() << ":" << messageIdData.entryid());
-            discardCorruptedMessage(cnx, messageIdData, proto::CommandAck::UncompressedSizeCorruption);
+            discardCorruptedMessage(cnx, messageIdData,
+                                    CommandAck_ValidationError_UncompressedSizeCorruption);
             return false;
         }
     } else {
@@ -669,7 +679,7 @@ bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,
     if (!CompressionCodecProvider::getCodec(compressionType).decode(payload, uncompressedSize, payload)) {
         LOG_ERROR(getName() << "Failed to decompress message with " << uncompressedSize  //
                             << " at  " << messageIdData.ledgerid() << ":" << messageIdData.entryid());
-        discardCorruptedMessage(cnx, messageIdData, proto::CommandAck::DecompressionError);
+        discardCorruptedMessage(cnx, messageIdData, CommandAck_ValidationError_DecompressionError);
         return false;
     }
 
@@ -678,12 +688,12 @@ bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,
 
 void ConsumerImpl::discardCorruptedMessage(const ClientConnectionPtr& cnx,
                                            const proto::MessageIdData& messageId,
-                                           proto::CommandAck::ValidationError validationError) {
+                                           CommandAck_ValidationError validationError) {
     LOG_ERROR(getName() << "Discarding corrupted message at " << messageId.ledgerid() << ":"
                         << messageId.entryid());
 
-    SharedBuffer cmd =
-        Commands::newAck(consumerId_, messageId, proto::CommandAck::Individual, validationError);
+    SharedBuffer cmd = Commands::newAck(consumerId_, messageId.ledgerid(), messageId.entryid(),
+                                        CommandAck_AckType_Individual, validationError);
 
     cnx->sendCommand(cmd);
     increaseAvailablePermits(cnx);
@@ -899,37 +909,37 @@ void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCn
     }
 }
 
-inline proto::CommandSubscribe_SubType ConsumerImpl::getSubType() {
+inline CommandSubscribe_SubType ConsumerImpl::getSubType() {
     ConsumerType type = config_.getConsumerType();
     switch (type) {
         case ConsumerExclusive:
-            return proto::CommandSubscribe::Exclusive;
+            return CommandSubscribe_SubType_Exclusive;
 
         case ConsumerShared:
-            return proto::CommandSubscribe::Shared;
+            return CommandSubscribe_SubType_Shared;
 
         case ConsumerFailover:
-            return proto::CommandSubscribe::Failover;
+            return CommandSubscribe_SubType_Failover;
 
         case ConsumerKeyShared:
-            return proto::CommandSubscribe_SubType_Key_Shared;
+            return CommandSubscribe_SubType_Key_Shared;
     }
     BOOST_THROW_EXCEPTION(std::logic_error("Invalid ConsumerType enumeration value"));
 }
 
-inline proto::CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition() {
+inline CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition() {
     InitialPosition initialPosition = config_.getSubscriptionInitialPosition();
     switch (initialPosition) {
         case InitialPositionLatest:
-            return proto::CommandSubscribe_InitialPosition::CommandSubscribe_InitialPosition_Latest;
+            return CommandSubscribe_InitialPosition_Latest;
 
         case InitialPositionEarliest:
-            return proto::CommandSubscribe_InitialPosition::CommandSubscribe_InitialPosition_Earliest;
+            return CommandSubscribe_InitialPosition_Earliest;
     }
     BOOST_THROW_EXCEPTION(std::logic_error("Invalid InitialPosition enumeration value"));
 }
 
-void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::CommandAck_AckType ackType) {
+void ConsumerImpl::statsCallback(Result res, ResultCallback callback, CommandAck_AckType ackType) {
     consumerStatsBasePtr_->messageAcknowledged(res, ackType);
     if (callback) {
         callback(res);
@@ -938,9 +948,9 @@ void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::Com
 
 void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
     ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
-                                  callback, proto::CommandAck_AckType_Individual);
+                                  callback, CommandAck_AckType_Individual);
     if (msgId.batchIndex() != -1 &&
-        !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Individual)) {
+        !batchAcknowledgementTracker_.isBatchReady(msgId, CommandAck_AckType_Individual)) {
         cb(ResultOk);
         return;
     }
@@ -949,13 +959,13 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
 
 void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
     ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
-                                  callback, proto::CommandAck_AckType_Cumulative);
+                                  callback, CommandAck_AckType_Cumulative);
     if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) {
         cb(ResultCumulativeAcknowledgementNotAllowedError);
         return;
     }
     if (msgId.batchIndex() != -1 &&
-        !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Cumulative)) {
+        !batchAcknowledgementTracker_.isBatchReady(msgId, CommandAck_AckType_Cumulative)) {
         MessageId messageId = batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId);
         if (messageId == MessageId()) {
             // Nothing to ACK, because the batch that msgId belongs to is NOT completely consumed.
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 3aa632a..d65676b 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -19,46 +19,45 @@
 #ifndef LIB_CONSUMERIMPL_H_
 #define LIB_CONSUMERIMPL_H_
 
-#include <string>
+#include <pulsar/Reader.h>
 
-#include "pulsar/Result.h"
-#include "UnboundedBlockingQueue.h"
-#include "HandlerBase.h"
-#include "ClientConnection.h"
-#include "lib/UnAckedMessageTrackerEnabled.h"
-#include "NegativeAcksTracker.h"
-#include "Commands.h"
-#include "ExecutorService.h"
-#include "ConsumerImplBase.h"
-#include "lib/UnAckedMessageTrackerDisabled.h"
-#include "MessageCrypto.h"
-#include "AckGroupingTracker.h"
-#include "GetLastMessageIdResponse.h"
+#include <functional>
+#include <memory>
 
-#include "CompressionCodec.h"
-#include <boost/dynamic_bitset.hpp>
-#include <map>
 #include "BatchAcknowledgementTracker.h"
-#include <limits>
-#include <lib/BrokerConsumerStatsImpl.h>
-#include <lib/MapCache.h>
-#include <lib/stats/ConsumerStatsImpl.h>
-#include <lib/stats/ConsumerStatsDisabled.h>
-#include <queue>
-#include <atomic>
+#include "BrokerConsumerStatsImpl.h"
+#include "Commands.h"
+#include "CompressionCodec.h"
+#include "ConsumerImplBase.h"
+#include "MapCache.h"
+#include "NegativeAcksTracker.h"
 #include "Synchronized.h"
-
-using namespace pulsar;
+#include "TestUtil.h"
+#include "UnboundedBlockingQueue.h"
 
 namespace pulsar {
-class UnAckedMessageTracker;
+class UnAckedMessageTrackerInterface;
 class ExecutorService;
 class ConsumerImpl;
 class BatchAcknowledgementTracker;
+class MessageCrypto;
+class GetLastMessageIdResponse;
 typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
 typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;
 typedef std::shared_ptr<Backoff> BackoffPtr;
 
+class AckGroupingTracker;
+using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;
+class ConsumerStatsBase;
+using ConsumerStatsBasePtr = std::shared_ptr<ConsumerStatsBase>;
+class UnAckedMessageTracker;
+using UnAckedMessageTrackerPtr = std::shared_ptr<UnAckedMessageTrackerInterface>;
+
+namespace proto {
+class CommandMessage;
+class MessageMetadata;
+}  // namespace proto
+
 enum ConsumerTopicType
 {
     NonPartitioned,
@@ -82,8 +81,8 @@ class ConsumerImpl : public ConsumerImplBase {
                          bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
     void messageProcessed(Message& msg, bool track = true);
     void activeConsumerChanged(bool isActive);
-    inline proto::CommandSubscribe_SubType getSubType();
-    inline proto::CommandSubscribe_InitialPosition getInitialPosition();
+    inline CommandSubscribe_SubType getSubType();
+    inline CommandSubscribe_InitialPosition getInitialPosition();
 
     /**
      * Send individual ACK request of given message ID to broker.
@@ -167,7 +166,7 @@ class ConsumerImpl : public ConsumerImplBase {
                                    const proto::MessageMetadata& metadata, SharedBuffer& payload,
                                    bool checkMaxMessageSize);
     void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
-                                 proto::CommandAck::ValidationError validationError);
+                                 CommandAck_ValidationError validationError);
     void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
     void drainIncomingMessageQueue(size_t count);
     uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
@@ -182,7 +181,7 @@ class ConsumerImpl : public ConsumerImplBase {
     // TODO - Convert these functions to lambda when we move to C++11
     Result receiveHelper(Message& msg);
     Result receiveHelper(Message& msg, int timeout);
-    void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
+    void statsCallback(Result, ResultCallback, CommandAck_AckType);
     void executeNotifyCallback(Message& msg);
     void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
     void failPendingReceiveCallback();
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
index 4a8c027..6c86aa4 100644
--- a/lib/ConsumerImplBase.cc
+++ b/lib/ConsumerImplBase.cc
@@ -16,18 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "ConsumerImpl.h"
-#include "MessageImpl.h"
-#include "MessagesImpl.h"
-#include "LogUtils.h"
-#include "TimeUtils.h"
-#include "pulsar/Result.h"
-#include "MessageIdUtil.h"
-#include "AckGroupingTracker.h"
 #include "ConsumerImplBase.h"
 
 #include <algorithm>
 
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 18b8bc1..37b6646 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -18,17 +18,18 @@
  */
 #ifndef PULSAR_CONSUMER_IMPL_BASE_HEADER
 #define PULSAR_CONSUMER_IMPL_BASE_HEADER
-#include <pulsar/Message.h>
 #include <pulsar/Consumer.h>
-#include "HandlerBase.h"
+#include <pulsar/Message.h>
+
 #include <queue>
 #include <set>
 
+#include "Future.h"
+#include "HandlerBase.h"
+
 namespace pulsar {
 class ConsumerImplBase;
-class HandlerBase;
-
-typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
+using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;
 
 class OpBatchReceive {
    public:
diff --git a/lib/CryptoKeyReader.cc b/lib/CryptoKeyReader.cc
index 1eb73e8..b64e865 100644
--- a/lib/CryptoKeyReader.cc
+++ b/lib/CryptoKeyReader.cc
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <fstream>
-#include <sstream>
-#include <pulsar/EncryptionKeyInfo.h>
 #include <pulsar/CryptoKeyReader.h>
+#include <pulsar/EncryptionKeyInfo.h>
 #include <pulsar/Result.h>
 
+#include <fstream>
+#include <sstream>
+
 using namespace pulsar;
 
 CryptoKeyReader::CryptoKeyReader() {}
@@ -77,4 +78,4 @@ Result DefaultCryptoKeyReader::getPrivateKey(const std::string& keyName,
 CryptoKeyReaderPtr DefaultCryptoKeyReader::create(const std::string& publicKeyPath,
                                                   const std::string& privateKeyPath) {
     return CryptoKeyReaderPtr(new DefaultCryptoKeyReader(publicKeyPath, privateKeyPath));
-}
\ No newline at end of file
+}
diff --git a/lib/DeprecatedException.cc b/lib/DeprecatedException.cc
index 283d8bb..4a5b7bd 100644
--- a/lib/DeprecatedException.cc
+++ b/lib/DeprecatedException.cc
@@ -23,4 +23,4 @@ const std::string DeprecatedException::message_prefix = "Deprecated: ";
 
 DeprecatedException::DeprecatedException(const std::string& __arg)
     : std::runtime_error(message_prefix + __arg) {}
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/lib/EncryptionKeyInfoImpl.h b/lib/EncryptionKeyInfoImpl.h
index 0470d1c..5ff4ceb 100644
--- a/lib/EncryptionKeyInfoImpl.h
+++ b/lib/EncryptionKeyInfoImpl.h
@@ -19,15 +19,16 @@
 #ifndef LIB_ENCRYPTIONKEYINFOIMPL_H_
 #define LIB_ENCRYPTIONKEYINFOIMPL_H_
 
+#include <pulsar/defines.h>
+
 #include <map>
 #include <string>
-#include <pulsar/defines.h>
 
 namespace pulsar {
 
 class PULSAR_PUBLIC EncryptionKeyInfoImpl {
    public:
-    typedef std::map<std::string, std::string> StringMap;
+    using StringMap = std::map<std::string, std::string>;
 
     EncryptionKeyInfoImpl() = default;
 
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index a7390f1..3a1d35d 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -18,12 +18,8 @@
  */
 #include "ExecutorService.h"
 
-#include <boost/asio.hpp>
-#include <functional>
-#include <memory>
-#include "TimeUtils.h"
-
 #include "LogUtils.h"
+#include "TimeUtils.h"
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index e4cbb3c..5a32c1b 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -19,16 +19,19 @@
 #ifndef _PULSAR_EXECUTOR_SERVICE_HEADER_
 #define _PULSAR_EXECUTOR_SERVICE_HEADER_
 
+#include <pulsar/defines.h>
+
 #include <atomic>
-#include <condition_variable>
-#include <chrono>
-#include <memory>
-#include <boost/asio.hpp>
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/asio/ip/tcp.hpp>
 #include <boost/asio/ssl.hpp>
+#include <chrono>
+#include <condition_variable>
 #include <functional>
-#include <thread>
+#include <memory>
 #include <mutex>
-#include <pulsar/defines.h>
+#include <thread>
 
 namespace pulsar {
 typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
diff --git a/lib/FileLoggerFactory.cc b/lib/FileLoggerFactory.cc
index a82613f..41ea24a 100644
--- a/lib/FileLoggerFactory.cc
+++ b/lib/FileLoggerFactory.cc
@@ -17,7 +17,8 @@
  * under the License.
  */
 #include <pulsar/FileLoggerFactory.h>
-#include "lib/FileLoggerFactoryImpl.h"
+
+#include "FileLoggerFactoryImpl.h"
 
 namespace pulsar {
 
diff --git a/lib/FileLoggerFactoryImpl.h b/lib/FileLoggerFactoryImpl.h
index 75329c6..2b877a2 100644
--- a/lib/FileLoggerFactoryImpl.h
+++ b/lib/FileLoggerFactoryImpl.h
@@ -18,12 +18,13 @@
  */
 #pragma once
 
+#include <pulsar/Logger.h>
+
 #include <fstream>
 #include <ios>
 #include <string>
-#include <pulsar/Logger.h>
 
-#include "lib/SimpleLogger.h"
+#include "SimpleLogger.h"
 
 namespace pulsar {
 
diff --git a/lib/Future.h b/lib/Future.h
index 6754c89..3593057 100644
--- a/lib/Future.h
+++ b/lib/Future.h
@@ -19,14 +19,13 @@
 #ifndef LIB_FUTURE_H_
 #define LIB_FUTURE_H_
 
-#include <functional>
-#include <mutex>
-#include <memory>
 #include <condition_variable>
-
+#include <functional>
 #include <list>
+#include <memory>
+#include <mutex>
 
-typedef std::unique_lock<std::mutex> Lock;
+using Lock = std::unique_lock<std::mutex>;
 
 namespace pulsar {
 
diff --git a/lib/GetLastMessageIdResponse.h b/lib/GetLastMessageIdResponse.h
index 0acb783..1ff7933 100644
--- a/lib/GetLastMessageIdResponse.h
+++ b/lib/GetLastMessageIdResponse.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include <pulsar/MessageId.h>
+
 #include <iostream>
 
 namespace pulsar {
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 91f5d79..8167b64 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -16,12 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/HTTPLookupService.h>
+#include "HTTPLookupService.h"
 
 #include <curl/curl.h>
+#include <pulsar/Version.h>
 
 #include <boost/property_tree/json_parser.hpp>
 #include <boost/property_tree/ptree.hpp>
+
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "NamespaceName.h"
+#include "ServiceNameResolver.h"
+#include "TopicName.h"
 namespace ptree = boost::property_tree;
 
 DECLARE_LOG_OBJECT()
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index c9dfc57..929d7ab 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -19,13 +19,16 @@
 #ifndef PULSAR_CPP_HTTPLOOKUPSERVICE_H
 #define PULSAR_CPP_HTTPLOOKUPSERVICE_H
 
-#include <lib/LookupService.h>
-#include <lib/ClientImpl.h>
-#include <lib/Url.h>
-#include <pulsar/Version.h>
-#include <lib/ServiceNameResolver.h>
+#include "ClientImpl.h"
+#include "LookupService.h"
+#include "Url.h"
 
 namespace pulsar {
+
+class ServiceNameResolver;
+using NamespaceTopicsPromise = Promise<Result, NamespaceTopicsPtr>;
+using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>;
+
 class HTTPLookupService : public LookupService, public std::enable_shared_from_this<HTTPLookupService> {
     class CurlInitializer {
        public:
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 1f4ce6e..0989eac 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -17,11 +17,12 @@
  * under the License.
  */
 #include "HandlerBase.h"
-#include "TimeUtils.h"
-
-#include <cassert>
 
+#include "ClientConnection.h"
+#include "ClientImpl.h"
+#include "ExecutorService.h"
 #include "LogUtils.h"
+#include "TimeUtils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 6616ec4..4a5df5c 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -18,13 +18,14 @@
  */
 #ifndef _PULSAR_HANDLER_BASE_HEADER_
 #define _PULSAR_HANDLER_BASE_HEADER_
-#include "Backoff.h"
-#include "ClientImpl.h"
-#include "ClientConnection.h"
+#include <pulsar/Result.h>
+
+#include <boost/asio/deadline_timer.hpp>
 #include <memory>
-#include <boost/asio.hpp>
+#include <mutex>
 #include <string>
-#include <boost/date_time/local_time/local_time.hpp>
+
+#include "Backoff.h"
 
 namespace pulsar {
 
@@ -35,6 +36,15 @@ using boost::posix_time::seconds;
 class HandlerBase;
 typedef std::weak_ptr<HandlerBase> HandlerBaseWeakPtr;
 typedef std::shared_ptr<HandlerBase> HandlerBasePtr;
+class ClientImpl;
+using ClientImplPtr = std::shared_ptr<ClientImpl>;
+using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
+class ClientConnection;
+using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
+using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
+class ExecutorService;
+using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
+using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
 
 class HandlerBase {
    public:
diff --git a/lib/JavaStringHash.cc b/lib/JavaStringHash.cc
index bf809bf..2dcbfc2 100644
--- a/lib/JavaStringHash.cc
+++ b/lib/JavaStringHash.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include "JavaStringHash.h"
+
 #include <limits>
 
 namespace pulsar {
diff --git a/lib/JavaStringHash.h b/lib/JavaStringHash.h
index 6059b1a..1a66110 100644
--- a/lib/JavaStringHash.h
+++ b/lib/JavaStringHash.h
@@ -20,11 +20,12 @@
 #define JAVA_DEFAULT_HASH_HPP_
 
 #include <pulsar/defines.h>
-#include "Hash.h"
 
 #include <cstdint>
 #include <string>
 
+#include "Hash.h"
+
 namespace pulsar {
 class PULSAR_PUBLIC JavaStringHash : public Hash {
    public:
diff --git a/lib/KeySharedPolicy.cc b/lib/KeySharedPolicy.cc
index e23a942..6c3e36a 100644
--- a/lib/KeySharedPolicy.cc
+++ b/lib/KeySharedPolicy.cc
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/KeySharedPolicyImpl.h>
-
 #include <algorithm>
 #include <stdexcept>
 
+#include "KeySharedPolicyImpl.h"
+
 namespace pulsar {
 
 static const int DefaultHashRangeSize = 2 << 15;
diff --git a/lib/Latch.h b/lib/Latch.h
index f5b711b..749ce29 100644
--- a/lib/Latch.h
+++ b/lib/Latch.h
@@ -19,10 +19,11 @@
 #ifndef LIB_LATCH_H_
 #define LIB_LATCH_H_
 
+#include <pulsar/defines.h>
+
+#include <condition_variable>
 #include <memory>
 #include <mutex>
-#include <condition_variable>
-#include <pulsar/defines.h>
 
 namespace pulsar {
 
diff --git a/lib/Log4CxxLogger.h b/lib/Log4CxxLogger.h
index cd2fe9e..2e3f819 100644
--- a/lib/Log4CxxLogger.h
+++ b/lib/Log4CxxLogger.h
@@ -19,8 +19,8 @@
 
 #pragma once
 
-#include <pulsar/defines.h>
 #include <pulsar/Logger.h>
+#include <pulsar/defines.h>
 
 #ifdef USE_LOG4CXX
 
diff --git a/lib/Log4cxxLogger.cc b/lib/Log4cxxLogger.cc
index fdd0395..fc5ae5b 100644
--- a/lib/Log4cxxLogger.cc
+++ b/lib/Log4cxxLogger.cc
@@ -18,15 +18,16 @@
  */
 
 #include "Log4CxxLogger.h"
+
 #include <iostream>
 
 #ifdef USE_LOG4CXX
 
+#include <log4cxx/consoleappender.h>
 #include <log4cxx/logger.h>
 #include <log4cxx/logmanager.h>
-#include <log4cxx/consoleappender.h>
-#include <log4cxx/propertyconfigurator.h>
 #include <log4cxx/patternlayout.h>
+#include <log4cxx/propertyconfigurator.h>
 
 using namespace log4cxx;
 
diff --git a/lib/LogUtils.cc b/lib/LogUtils.cc
index 3174608..6e8a866 100644
--- a/lib/LogUtils.cc
+++ b/lib/LogUtils.cc
@@ -18,9 +18,10 @@
  */
 #include "LogUtils.h"
 
+#include <pulsar/ConsoleLoggerFactory.h>
+
 #include <atomic>
 #include <iostream>
-#include <pulsar/ConsoleLoggerFactory.h>
 
 #include "Log4CxxLogger.h"
 
diff --git a/lib/LogUtils.h b/lib/LogUtils.h
index 67ddf43..7cfad5b 100644
--- a/lib/LogUtils.h
+++ b/lib/LogUtils.h
@@ -19,12 +19,12 @@
 
 #pragma once
 
-#include <string>
-#include <sstream>
-#include <memory>
-
-#include <pulsar/defines.h>
 #include <pulsar/Logger.h>
+#include <pulsar/defines.h>
+
+#include <memory>
+#include <sstream>
+#include <string>
 
 namespace pulsar {
 
diff --git a/lib/LookupDataResult.h b/lib/LookupDataResult.h
index b48b854..81e50cc 100644
--- a/lib/LookupDataResult.h
+++ b/lib/LookupDataResult.h
@@ -18,12 +18,13 @@
  */
 #ifndef _PULSAR_LOOKUP_DATA_RESULT_HEADER_
 #define _PULSAR_LOOKUP_DATA_RESULT_HEADER_
-#include <string>
-#include <lib/Future.h>
 #include <pulsar/Result.h>
 
 #include <iostream>
 #include <memory>
+#include <string>
+
+#include "Future.h"
 
 namespace pulsar {
 class LookupDataResult;
@@ -67,7 +68,7 @@ class LookupDataResult {
     bool proxyThroughServiceUrl_;
 };
 
-std::ostream& operator<<(std::ostream& os, const LookupDataResult& b) {
+inline std::ostream& operator<<(std::ostream& os, const LookupDataResult& b) {
     os << "{ LookupDataResult [brokerUrl_ = " << b.brokerUrl_ << "] [brokerUrlTls_ = " << b.brokerUrlTls_
        << "] [partitions = " << b.partitions << "] [authoritative = " << b.authoritative
        << "] [redirect = " << b.redirect << "] proxyThroughServiceUrl = " << b.proxyThroughServiceUrl_
diff --git a/lib/LookupService.h b/lib/LookupService.h
index 50f2d84..6af290c 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -19,19 +19,21 @@
 #ifndef PULSAR_CPP_LOOKUPSERVICE_H
 #define PULSAR_CPP_LOOKUPSERVICE_H
 
-#include <lib/LookupDataResult.h>
 #include <pulsar/Result.h>
-#include <lib/Future.h>
-#include <lib/LogUtils.h>
-#include <lib/TopicName.h>
 
-#include <iostream>
+#include <memory>
+#include <ostream>
 #include <vector>
 
+#include "Future.h"
+#include "LookupDataResult.h"
+
 namespace pulsar {
-typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
-typedef Promise<Result, NamespaceTopicsPtr> NamespaceTopicsPromise;
-typedef std::shared_ptr<Promise<Result, NamespaceTopicsPtr>> NamespaceTopicsPromisePtr;
+using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
+class TopicName;
+using TopicNamePtr = std::shared_ptr<TopicName>;
+class NamespaceName;
+using NamespaceNamePtr = std::shared_ptr<NamespaceName>;
 
 class LookupService {
    public:
diff --git a/lib/MemoryLimitController.h b/lib/MemoryLimitController.h
index 38987ea..30a1836 100644
--- a/lib/MemoryLimitController.h
+++ b/lib/MemoryLimitController.h
@@ -44,4 +44,4 @@ class MemoryLimitController {
     bool isClosed_ = false;
 };
 
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/lib/Message.cc b/lib/Message.cc
index b928945..cb7a75e 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -16,17 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/defines.h>
 #include <pulsar/Message.h>
 #include <pulsar/MessageBuilder.h>
+#include <pulsar/defines.h>
 
-#include "PulsarApi.pb.h"
+#include <iostream>
 
 #include "MessageImpl.h"
+#include "PulsarApi.pb.h"
 #include "SharedBuffer.h"
 
-#include <iostream>
-
 using namespace pulsar;
 
 namespace pulsar {
diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc
index 3e229b0..3f50dc0 100644
--- a/lib/MessageAndCallbackBatch.cc
+++ b/lib/MessageAndCallbackBatch.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include "MessageAndCallbackBatch.h"
+
 #include "ClientConnection.h"
 #include "Commands.h"
 #include "LogUtils.h"
diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h
index 38c0d12..3d107c6 100644
--- a/lib/MessageAndCallbackBatch.h
+++ b/lib/MessageAndCallbackBatch.h
@@ -19,13 +19,12 @@
 #ifndef LIB_MESSAGEANDCALLBACK_BATCH_H_
 #define LIB_MESSAGEANDCALLBACK_BATCH_H_
 
-#include <atomic>
-#include <vector>
-
 #include <pulsar/Message.h>
 #include <pulsar/ProducerConfiguration.h>
 
+#include <atomic>
 #include <boost/noncopyable.hpp>
+#include <vector>
 
 namespace pulsar {
 
diff --git a/lib/MessageBuilder.cc b/lib/MessageBuilder.cc
index 977331b..7d8d8cb 100644
--- a/lib/MessageBuilder.cc
+++ b/lib/MessageBuilder.cc
@@ -25,14 +25,13 @@
 
 #include "LogUtils.h"
 #include "MessageImpl.h"
+#include "ObjectPool.h"
 #include "PulsarApi.pb.h"
 #include "SharedBuffer.h"
+#include "TimeUtils.h"
 
 DECLARE_LOG_OBJECT()
 
-#include "ObjectPool.h"
-#include "TimeUtils.h"
-
 using namespace pulsar;
 
 namespace pulsar {
diff --git a/lib/MessageCrypto.cc b/lib/MessageCrypto.cc
index 8798dbf..bab96d1 100644
--- a/lib/MessageCrypto.cc
+++ b/lib/MessageCrypto.cc
@@ -17,9 +17,13 @@
  * under the License.
  */
 
-#include "LogUtils.h"
 #include "MessageCrypto.h"
 
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "LogUtils.h"
+#include "PulsarApi.pb.h"
+
 namespace pulsar {
 
 DECLARE_LOG_OBJECT()
@@ -335,9 +339,10 @@ bool MessageCrypto::encrypt(const std::set<std::string>& encKeys, const CryptoKe
     return true;
 }
 
-bool MessageCrypto::decryptDataKey(const std::string& keyName, const std::string& encryptedDataKey,
-                                   const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta,
-                                   const CryptoKeyReaderPtr keyReader) {
+bool MessageCrypto::decryptDataKey(const proto::EncryptionKeys& encKeys, const CryptoKeyReader& keyReader) {
+    const auto& keyName = encKeys.key();
+    const auto& encryptedDataKey = encKeys.value();
+    const auto& encKeyMeta = encKeys.metadata();
     StringMap keyMeta;
     for (auto iter = encKeyMeta.begin(); iter != encKeyMeta.end(); iter++) {
         keyMeta[iter->key()] = iter->value();
@@ -345,7 +350,7 @@ bool MessageCrypto::decryptDataKey(const std::string& keyName, const std::string
 
     // Read the private key info using callback
     EncryptionKeyInfo keyInfo;
-    keyReader->getPrivateKey(keyName, keyMeta, keyInfo);
+    keyReader.getPrivateKey(keyName, keyMeta, keyInfo);
 
     // Convert key from string to RSA key
     RSA* privKey = loadPrivateKey(keyInfo.getKey());
@@ -498,10 +503,7 @@ bool MessageCrypto::decrypt(const proto::MessageMetadata& msgMetadata, SharedBuf
     bool isDataKeyDecrypted = false;
     for (int index = 0; index < msgMetadata.encryption_keys_size(); index++) {
         const proto::EncryptionKeys& encKeys = msgMetadata.encryption_keys(index);
-
-        const std::string& encDataKey = encKeys.value();
-        const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta = encKeys.metadata();
-        if (decryptDataKey(encKeys.key(), encDataKey, encKeyMeta, keyReader)) {
+        if (decryptDataKey(encKeys, *keyReader)) {
             isDataKeyDecrypted = true;
             break;
         }
diff --git a/lib/MessageCrypto.h b/lib/MessageCrypto.h
index 2172066..fd139c1 100644
--- a/lib/MessageCrypto.h
+++ b/lib/MessageCrypto.h
@@ -19,26 +19,31 @@
 #ifndef LIB_MESSAGECRYPTO_H_
 #define LIB_MESSAGECRYPTO_H_
 
-#include <iostream>
-#include <map>
-#include <set>
-#include <mutex>
-#include <boost/scoped_array.hpp>
-
-#include <openssl/ssl.h>
-#include <openssl/rand.h>
 #include <openssl/bio.h>
+#include <openssl/engine.h>
 #include <openssl/evp.h>
+#include <openssl/rand.h>
 #include <openssl/rsa.h>
-#include <openssl/engine.h>
+#include <openssl/ssl.h>
+#include <pulsar/CryptoKeyReader.h>
+
+#include <boost/date_time/posix_time/ptime.hpp>
+#include <boost/scoped_array.hpp>
+#include <iostream>
+#include <map>
+#include <mutex>
+#include <set>
 
 #include "SharedBuffer.h"
-#include "ExecutorService.h"
-#include "pulsar/CryptoKeyReader.h"
-#include "PulsarApi.pb.h"
 
 namespace pulsar {
 
+namespace proto {
+class EncryptionKeys;
+class MessageMetadata;
+class KeyValue;
+}  // namespace proto
+
 class MessageCrypto {
    public:
     typedef std::map<std::string, std::string> StringMap;
@@ -128,9 +133,7 @@ class MessageCrypto {
 
     Result addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader);
 
-    bool decryptDataKey(const std::string& keyName, const std::string& encryptedDataKey,
-                        const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta,
-                        const CryptoKeyReaderPtr keyReader);
+    bool decryptDataKey(const proto::EncryptionKeys& encKeys, const CryptoKeyReader& keyReader);
     bool decryptData(const std::string& dataKeySecret, const proto::MessageMetadata& msgMetadata,
                      SharedBuffer& payload, SharedBuffer& decPayload);
     bool getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index 31b0154..5b13328 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -17,18 +17,15 @@
  * under the License.
  */
 
-#include <pulsar/defines.h>
 #include <pulsar/MessageId.h>
 
-#include "PulsarApi.pb.h"
-#include "MessageIdImpl.h"
-
 #include <iostream>
 #include <limits>
-#include <stdexcept>
-#include <tuple>
-#include <math.h>
 #include <memory>
+#include <stdexcept>
+
+#include "MessageIdImpl.h"
+#include "PulsarApi.pb.h"
 
 namespace pulsar {
 
diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h
index ae33da4..9db758c 100644
--- a/lib/MessageIdImpl.h
+++ b/lib/MessageIdImpl.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <cstdint>
+#include <string>
 
 namespace pulsar {
 
diff --git a/lib/MessageIdUtil.h b/lib/MessageIdUtil.h
index d6f80a1..1f4ffd3 100644
--- a/lib/MessageIdUtil.h
+++ b/lib/MessageIdUtil.h
@@ -17,15 +17,9 @@
  * under the License.
  */
 #include <pulsar/MessageId.h>
-#include "PulsarApi.pb.h"
 
 namespace pulsar {
 
-inline MessageId toMessageId(const proto::MessageIdData& messageIdData) {
-    return MessageId{messageIdData.partition(), static_cast<int64_t>(messageIdData.ledgerid()),
-                     static_cast<int64_t>(messageIdData.entryid()), messageIdData.batch_index()};
-}
-
 namespace internal {
 template <typename T>
 static int compare(T lhs, T rhs) {
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index c9a37f4..587b663 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -21,8 +21,9 @@
 
 #include <pulsar/Message.h>
 #include <pulsar/MessageId.h>
-#include "SharedBuffer.h"
+
 #include "PulsarApi.pb.h"
+#include "SharedBuffer.h"
 
 using namespace pulsar;
 namespace pulsar {
diff --git a/lib/MessageRouterBase.cc b/lib/MessageRouterBase.cc
index c0824f9..8338c57 100644
--- a/lib/MessageRouterBase.cc
+++ b/lib/MessageRouterBase.cc
@@ -37,4 +37,4 @@ MessageRouterBase::MessageRouterBase(ProducerConfiguration::HashingScheme hashin
             break;
     }
 }
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/lib/MessageRouterBase.h b/lib/MessageRouterBase.h
index 39374a1..0b290fd 100644
--- a/lib/MessageRouterBase.h
+++ b/lib/MessageRouterBase.h
@@ -19,13 +19,14 @@
 #ifndef PULSAR_CPP_MESSAGEROUTERBASE_H
 #define PULSAR_CPP_MESSAGEROUTERBASE_H
 
-#include <memory>
-
 #include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/ProducerConfiguration.h>
-#include "Hash.h"
+
+#include <memory>
 
 namespace pulsar {
+class Hash;
+using HashPtr = std::unique_ptr<Hash>;
 typedef std::unique_ptr<Hash> HashPtr;
 
 class MessageRouterBase : public MessageRoutingPolicy {
diff --git a/lib/MessagesImpl.cc b/lib/MessagesImpl.cc
index 7d45cdd..022f25f 100644
--- a/lib/MessagesImpl.cc
+++ b/lib/MessagesImpl.cc
@@ -17,7 +17,8 @@
  * under the License.
  */
 #include "MessagesImpl.h"
-#include "stdexcept"
+
+#include <stdexcept>
 
 MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages)
     : maxNumberOfMessages_(maxNumberOfMessages),
diff --git a/lib/MessagesImpl.h b/lib/MessagesImpl.h
index 0c12768..3ffaf6b 100644
--- a/lib/MessagesImpl.h
+++ b/lib/MessagesImpl.h
@@ -19,9 +19,10 @@
 #ifndef PULSAR_CPP_MESSAGESIMPL_H
 #define PULSAR_CPP_MESSAGESIMPL_H
 
-#include <vector>
 #include <pulsar/Message.h>
 
+#include <vector>
+
 using namespace pulsar;
 
 namespace pulsar {
diff --git a/lib/MultiTopicsBrokerConsumerStatsImpl.cc b/lib/MultiTopicsBrokerConsumerStatsImpl.cc
index 5220307..4f96922 100644
--- a/lib/MultiTopicsBrokerConsumerStatsImpl.cc
+++ b/lib/MultiTopicsBrokerConsumerStatsImpl.cc
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/MultiTopicsBrokerConsumerStatsImpl.h>
-#include <boost/date_time/local_time/local_time.hpp>
+#include "MultiTopicsBrokerConsumerStatsImpl.h"
+
 #include <algorithm>
 #include <numeric>
+#include <sstream>
 
 using namespace pulsar;
 
diff --git a/lib/MultiTopicsBrokerConsumerStatsImpl.h b/lib/MultiTopicsBrokerConsumerStatsImpl.h
index a76ecdc..481318e 100644
--- a/lib/MultiTopicsBrokerConsumerStatsImpl.h
+++ b/lib/MultiTopicsBrokerConsumerStatsImpl.h
@@ -19,14 +19,11 @@
 #ifndef PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
 #define PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
 
-#include <string.h>
+#include <functional>
 #include <iostream>
 #include <vector>
-#include <pulsar/defines.h>
-#include <pulsar/Result.h>
-#include <functional>
-#include <boost/date_time/microsec_time_clock.hpp>
-#include <lib/BrokerConsumerStatsImpl.h>
+
+#include "BrokerConsumerStatsImplBase.h"
 
 namespace pulsar {
 class PULSAR_PUBLIC MultiTopicsBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index c54f8e8..a170faf 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -17,14 +17,35 @@
  * under the License.
  */
 #include "MultiTopicsConsumerImpl.h"
-#include "MultiResultCallback.h"
-#include "MessagesImpl.h"
+
 #include <stdexcept>
 
+#include "ClientImpl.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "LookupService.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "MultiResultCallback.h"
+#include "MultiTopicsBrokerConsumerStatsImpl.h"
+#include "TopicName.h"
+#include "UnAckedMessageTrackerDisabled.h"
+#include "UnAckedMessageTrackerEnabled.h"
+
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
+MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
+                                                 int numPartitions, const std::string& subscriptionName,
+                                                 const ConsumerConfiguration& conf,
+                                                 LookupServicePtr lookupServicePtr)
+    : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
+                              lookupServicePtr) {
+    topicsPartitions_[topicName->toString()] = numPartitions;
+}
+
 MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
                                                  const std::string& subscriptionName, TopicNamePtr topicName,
                                                  const ConsumerConfiguration& conf,
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 7c83da9..2b4f83d 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -18,38 +18,46 @@
  */
 #ifndef PULSAR_MULTI_TOPICS_CONSUMER_HEADER
 #define PULSAR_MULTI_TOPICS_CONSUMER_HEADER
-#include "lib/TestUtil.h"
-#include "ConsumerImpl.h"
-#include "ClientImpl.h"
-#include "BlockingQueue.h"
+
+#include <pulsar/Client.h>
+
+#include <memory>
 #include <vector>
-#include <queue>
-#include <mutex>
 
+#include "BlockingQueue.h"
 #include "ConsumerImplBase.h"
-#include "lib/UnAckedMessageTrackerDisabled.h"
-#include <lib/Latch.h>
-#include <lib/MultiTopicsBrokerConsumerStatsImpl.h>
-#include <lib/TopicName.h>
-#include <lib/NamespaceName.h>
-#include <lib/SynchronizedHashMap.h>
+#include "Future.h"
+#include "Latch.h"
+#include "LookupDataResult.h"
+#include "SynchronizedHashMap.h"
+#include "TestUtil.h"
 
 namespace pulsar {
 typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
 
+class ConsumerImpl;
+using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
+class ClientImpl;
+using ClientImplPtr = std::shared_ptr<ClientImpl>;
+class TopicName;
+using TopicNamePtr = std::shared_ptr<TopicName>;
+class MultiTopicsBrokerConsumerStatsImpl;
+using MultiTopicsBrokerConsumerStatsPtr = std::shared_ptr<MultiTopicsBrokerConsumerStatsImpl>;
+class UnAckedMessageTrackerInterface;
+using UnAckedMessageTrackerPtr = std::shared_ptr<UnAckedMessageTrackerInterface>;
+class LookupService;
+using LookupServicePtr = std::shared_ptr<LookupService>;
+
 class MultiTopicsConsumerImpl;
 class MultiTopicsConsumerImpl : public ConsumerImplBase {
    public:
+    MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
+                            const std::string& subscriptionName, const ConsumerConfiguration& conf,
+                            LookupServicePtr lookupServicePtr);
     MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
                             const std::string& subscriptionName, TopicNamePtr topicName,
                             const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
-    MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
-                            const std::string& subscriptionName, const ConsumerConfiguration& conf,
-                            LookupServicePtr lookupServicePtr)
-        : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
-                                  lookupServicePtr) {
-        topicsPartitions_[topicName->toString()] = numPartitions;
-    }
+
     ~MultiTopicsConsumerImpl();
     // overrided methods from ConsumerImplBase
     Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() override;
diff --git a/lib/Murmur3_32Hash.h b/lib/Murmur3_32Hash.h
index 50e6f16..d83635d 100644
--- a/lib/Murmur3_32Hash.h
+++ b/lib/Murmur3_32Hash.h
@@ -25,11 +25,12 @@
 #define MURMUR3_32_HASH_HPP_
 
 #include <pulsar/defines.h>
-#include "Hash.h"
 
 #include <cstdint>
 #include <string>
 
+#include "Hash.h"
+
 namespace pulsar {
 
 class PULSAR_PUBLIC Murmur3_32Hash : public Hash {
diff --git a/lib/NamespaceName.cc b/lib/NamespaceName.cc
index 02bde00..f493db2 100644
--- a/lib/NamespaceName.cc
+++ b/lib/NamespaceName.cc
@@ -17,14 +17,14 @@
  * under the License.
  */
 #include "NamespaceName.h"
-#include "NamedEntity.h"
-#include "LogUtils.h"
 
-#include <boost/algorithm/string.hpp>
-#include <memory>
-#include <vector>
 #include <iostream>
+#include <memory>
 #include <sstream>
+#include <vector>
+
+#include "LogUtils.h"
+#include "NamedEntity.h"
 
 DECLARE_LOG_OBJECT()
 namespace pulsar {
diff --git a/lib/NamespaceName.h b/lib/NamespaceName.h
index 86ffc2f..ce451a2 100644
--- a/lib/NamespaceName.h
+++ b/lib/NamespaceName.h
@@ -20,11 +20,12 @@
 #define _PULSAR_NAMESPACE_NAME_HEADER_
 
 #include <pulsar/defines.h>
-#include "ServiceUnitId.h"
 
 #include <memory>
 #include <string>
 
+#include "ServiceUnitId.h"
+
 namespace pulsar {
 
 class PULSAR_PUBLIC NamespaceName : public ServiceUnitId {
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 8e501dc..3ccf0be 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -19,11 +19,12 @@
 
 #include "NegativeAcksTracker.h"
 
-#include "ConsumerImpl.h"
-
-#include <set>
 #include <functional>
+#include <set>
 
+#include "ClientImpl.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
 #include "LogUtils.h"
 DECLARE_LOG_OBJECT()
 
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index 1476275..c5a945b 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -19,16 +19,24 @@
 
 #pragma once
 
+#include <pulsar/ConsumerConfiguration.h>
 #include <pulsar/MessageId.h>
 
-#include "ExecutorService.h"
-#include "ClientImpl.h"
-
-#include <mutex>
+#include <boost/asio/deadline_timer.hpp>
+#include <chrono>
 #include <map>
+#include <memory>
+#include <mutex>
 
 namespace pulsar {
 
+class ConsumerImpl;
+class ClientImpl;
+using ClientImplPtr = std::shared_ptr<ClientImpl>;
+using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
+class ExecutorService;
+using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
+
 class NegativeAcksTracker {
    public:
     NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer, const ConsumerConfiguration &conf);
diff --git a/lib/ObjectPool.h b/lib/ObjectPool.h
index 87507a7..883e080 100644
--- a/lib/ObjectPool.h
+++ b/lib/ObjectPool.h
@@ -19,9 +19,8 @@
 #ifndef LIB_OBJECTPOOL_H_
 #define LIB_OBJECTPOOL_H_
 
-#include <algorithm>
-#include <mutex>
 #include <memory>
+#include <mutex>
 
 namespace pulsar {
 
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index 365301b..c94bcbe 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -21,10 +21,12 @@
 
 #include <pulsar/Message.h>
 #include <pulsar/Producer.h>
+
 #include <boost/date_time/posix_time/ptime.hpp>
 
+#include "PulsarApi.pb.h"
+#include "SharedBuffer.h"
 #include "TimeUtils.h"
-#include "MessageImpl.h"
 
 namespace pulsar {
 
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 3d383ff..26d5796 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -17,12 +17,18 @@
  * under the License.
  */
 #include "PartitionedProducerImpl.h"
-#include "LogUtils.h"
-#include <lib/TopicName.h>
+
 #include <sstream>
+
+#include "ClientImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "LookupService.h"
+#include "ProducerImpl.h"
 #include "RoundRobinMessageRouter.h"
 #include "SinglePartitionMessageRouter.h"
 #include "TopicMetadataImpl.h"
+#include "TopicName.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -352,10 +358,10 @@ void PartitionedProducerImpl::triggerFlush() {
 
 void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
     if (!flushPromise_ || flushPromise_->isComplete()) {
-        flushPromise_ = std::make_shared<Promise<Result, bool_type>>();
+        flushPromise_ = std::make_shared<Promise<Result, bool>>();
     } else {
         // already in flushing, register a listener callback
-        auto listenerCallback = [callback](Result result, bool_type v) {
+        auto listenerCallback = [callback](Result result, bool v) {
             if (v) {
                 callback(ResultOk);
             } else {
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index cc7a4e0..b9a4b01 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -16,17 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "ProducerImpl.h"
-#include "ClientImpl.h"
-#include <vector>
-
-#include <mutex>
 #include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/TopicMetadata.h>
-#include <lib/TopicName.h>
+
+#include <atomic>
+#include <boost/asio/deadline_timer.hpp>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "LookupDataResult.h"
+#include "ProducerImplBase.h"
 
 namespace pulsar {
 
+class ClientImpl;
+using ClientImplPtr = std::shared_ptr<ClientImpl>;
+using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
+using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
+class ExecutorService;
+using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
+class LookupService;
+using LookupServicePtr = std::shared_ptr<LookupService>;
+class ProducerImpl;
+using ProducerImplPtr = std::shared_ptr<ProducerImpl>;
+class TopicName;
+using TopicNamePtr = std::shared_ptr<TopicName>;
+
 class PartitionedProducerImpl : public ProducerImplBase,
                                 public std::enable_shared_from_this<PartitionedProducerImpl> {
    public:
@@ -107,7 +123,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     std::unique_ptr<TopicMetadata> topicMetadata_;
 
     std::atomic<int> flushedPartitions_;
-    std::shared_ptr<Promise<Result, bool_type>> flushPromise_;
+    std::shared_ptr<Promise<Result, bool>> flushPromise_;
 
     ExecutorServicePtr listenerExecutor_;
     DeadlineTimerPtr partitionsUpdateTimer_;
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc
index 8014078..657f869 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -18,6 +18,11 @@
  */
 #include "PatternMultiTopicsConsumerImpl.h"
 
+#include "ClientImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "LookupService.h"
+
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h
index 448f2e3..28ad23e 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -18,12 +18,14 @@
  */
 #ifndef PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
 #define PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
-#include "ConsumerImpl.h"
-#include "ClientImpl.h"
-#include <lib/TopicName.h>
-#include <lib/NamespaceName.h>
-#include "MultiTopicsConsumerImpl.h"
 #include <memory>
+#include <string>
+#include <vector>
+
+#include "LookupDataResult.h"
+#include "MultiTopicsConsumerImpl.h"
+#include "NamespaceName.h"
+#include "TopicName.h"
 
 #ifdef PULSAR_USE_BOOST_REGEX
 #include <boost/regex.hpp>
@@ -35,7 +37,9 @@
 
 namespace pulsar {
 
-class PatternMultiTopicsConsumerImpl;
+class ClientImpl;
+using ClientImplPtr = std::shared_ptr<ClientImpl>;
+using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
 
 class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
    public:
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index 65bdf23..a196da8 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "lib/PeriodicTask.h"
+#include "PeriodicTask.h"
+
 #include <boost/date_time/posix_time/posix_time.hpp>
 
 namespace pulsar {
diff --git a/lib/PeriodicTask.h b/lib/PeriodicTask.h
index 159c86a..76f9039 100644
--- a/lib/PeriodicTask.h
+++ b/lib/PeriodicTask.h
@@ -19,12 +19,12 @@
 #pragma once
 
 #include <atomic>
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/io_service.hpp>
 #include <cstdint>
 #include <functional>
 #include <memory>
 
-#include <boost/asio.hpp>
-
 namespace pulsar {
 
 /**
diff --git a/lib/Producer.cc b/lib/Producer.cc
index ad60828..fc588ba 100644
--- a/lib/Producer.cc
+++ b/lib/Producer.cc
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/Producer.h>
-#include "SharedBuffer.h"
 #include <pulsar/MessageBuilder.h>
+#include <pulsar/Producer.h>
 
-#include "Utils.h"
 #include "ProducerImpl.h"
+#include "Utils.h"
 
 namespace pulsar {
 
diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc
index 4f64870..9b3fdfb 100644
--- a/lib/ProducerConfiguration.cc
+++ b/lib/ProducerConfiguration.cc
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/ProducerConfigurationImpl.h>
-
 #include <stdexcept>
 
+#include "ProducerConfigurationImpl.h"
+
 namespace pulsar {
 
 const static std::string emptyString;
diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h
index 80c6432..6c2b19d 100644
--- a/lib/ProducerConfigurationImpl.h
+++ b/lib/ProducerConfigurationImpl.h
@@ -20,6 +20,7 @@
 #define LIB_PRODUCERCONFIGURATIONIMPL_H_
 
 #include <pulsar/ProducerConfiguration.h>
+
 #include <memory>
 
 #include "Utils.h"
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index e228c83..1213fce 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -17,17 +17,26 @@
  * under the License.
  */
 #include "ProducerImpl.h"
+
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "BatchMessageContainer.h"
+#include "BatchMessageKeyBasedContainer.h"
+#include "ClientConnection.h"
+#include "ClientImpl.h"
+#include "Commands.h"
+#include "CompressionCodec.h"
+#include "ExecutorService.h"
 #include "LogUtils.h"
+#include "MemoryLimitController.h"
+#include "MessageCrypto.h"
 #include "MessageImpl.h"
-#include "TimeUtils.h"
+#include "OpSendMsg.h"
 #include "PulsarApi.pb.h"
-#include "Commands.h"
-#include "BatchMessageContainerBase.h"
-#include "BatchMessageContainer.h"
-#include "BatchMessageKeyBasedContainer.h"
-#include <boost/date_time/local_time/local_time.hpp>
-#include <lib/TopicName.h>
-#include "MessageAndCallbackBatch.h"
+#include "TimeUtils.h"
+#include "TopicName.h"
+#include "stats/ProducerStatsDisabled.h"
+#include "stats/ProducerStatsImpl.h"
 
 namespace pulsar {
 DECLARE_LOG_OBJECT()
@@ -314,7 +323,7 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen
     msgMetadata.set_publish_time(TimeUtils::currentTimeMillis());
     msgMetadata.set_sequence_id(sequenceId);
     if (conf_.getCompressionType() != CompressionNone) {
-        msgMetadata.set_compression(CompressionCodecProvider::convertType(conf_.getCompressionType()));
+        msgMetadata.set_compression(static_cast<proto::CompressionType>(conf_.getCompressionType()));
         msgMetadata.set_uncompressed_size(uncompressedSize);
     }
     if (!this->getSchemaVersion().empty()) {
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 0559515..51bf1bb 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -19,36 +19,45 @@
 #ifndef LIB_PRODUCERIMPL_H_
 #define LIB_PRODUCERIMPL_H_
 
-#include <mutex>
-#include <boost/date_time/posix_time/ptime.hpp>
+#include <memory>
 
-#include "ClientImpl.h"
-#include "BlockingQueue.h"
+#include "Future.h"
 #include "HandlerBase.h"
-#include "SharedBuffer.h"
-#include "CompressionCodec.h"
-#include "MessageCrypto.h"
-#include "stats/ProducerStatsDisabled.h"
-#include "stats/ProducerStatsImpl.h"
-#include "PulsarApi.pb.h"
+// In MSVC, the value type of an STL container cannot be forward declared
+#if defined(_MSC_VER)
 #include "OpSendMsg.h"
-#include "BatchMessageContainerBase.h"
+#endif
 #include "PendingFailures.h"
-#include "Semaphore.h"
 #include "PeriodicTask.h"
-
-using namespace pulsar;
+#include "ProducerImplBase.h"
+#include "Semaphore.h"
+#include "Utils.h"
 
 namespace pulsar {
-typedef bool bool_type;
 
-typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
+class BatchMessageContainerBase;
+class ClientImpl;
+using ClientImplPtr = std::shared_ptr<ClientImpl>;
+class MessageCrypto;
+using MessageCryptoPtr = std::shared_ptr<MessageCrypto>;
+class ProducerImpl;
+using ProducerImplWeakPtr = std::weak_ptr<ProducerImpl>;
+class ProducerStatsBase;
+using ProducerStatsBasePtr = std::shared_ptr<ProducerStatsBase>;
+struct ResponseData;
+class ProducerImpl;
+using ProducerImplPtr = std::shared_ptr<ProducerImpl>;
 
 class PulsarFriend;
 
 class Producer;
 class MemoryLimitController;
 class TopicName;
+struct OpSendMsg;
+
+namespace proto {
+class MessageMetadata;
+}  // namespace proto
 
 class ProducerImpl : public HandlerBase,
                      public std::enable_shared_from_this<ProducerImpl>,
@@ -160,7 +169,6 @@ class ProducerImpl : public HandlerBase,
     std::string producerStr_;
     uint64_t producerId_;
     int64_t msgSequenceGenerator_;
-    proto::BaseCommand cmd_;
 
     std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_;
     boost::asio::deadline_timer batchTimer_;
diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h
index 15a6e1d..0b1622c 100644
--- a/lib/ProducerImplBase.h
+++ b/lib/ProducerImplBase.h
@@ -21,6 +21,8 @@
 #include <pulsar/Message.h>
 #include <pulsar/Producer.h>
 
+#include "Future.h"
+
 namespace pulsar {
 class ProducerImplBase;
 
diff --git a/lib/ProtoApiEnums.h b/lib/ProtoApiEnums.h
new file mode 100644
index 0000000..1f1a79f
--- /dev/null
+++ b/lib/ProtoApiEnums.h
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// This file contains some constants with the same names as the enum constants in PulsarApi.pb.h to avoid
+// including the huge PulsarApi.pb.h in *.h and *.cc files.
+// Since it's safe to convert an enum constant to int, this file converts an enum from:
+//
+// ```c++
+// // in PulsarApi.pb.h
+// enum MyEnum {
+//     A = 0,
+//     B = 1
+// };
+// ```
+//
+// to
+//
+// ```c++
+// using MyEnum = int;
+// constexpr MyEnum A = 0;
+// constexpr MyEnum B = 1;
+// ```
+#pragma once
+
+namespace pulsar {
+
+using CommandAck_AckType = int;
+constexpr CommandAck_AckType CommandAck_AckType_Individual = 0;
+constexpr CommandAck_AckType CommandAck_AckType_Cumulative = 1;
+
+using CommandSubscribe_SubType = int;  // stand-in for the PulsarApi.pb.h enum of the same name
+constexpr CommandSubscribe_SubType CommandSubscribe_SubType_Exclusive = 0;
+constexpr CommandSubscribe_SubType CommandSubscribe_SubType_Shared = 1;
+constexpr CommandSubscribe_SubType CommandSubscribe_SubType_Failover = 2;
+constexpr CommandSubscribe_SubType CommandSubscribe_SubType_Key_Shared = 3;
+
+using CommandAck_ValidationError = int;
+constexpr CommandAck_ValidationError CommandAck_ValidationError_UncompressedSizeCorruption = 0;
+constexpr CommandAck_ValidationError CommandAck_ValidationError_DecompressionError = 1;
+constexpr CommandAck_ValidationError CommandAck_ValidationError_ChecksumMismatch = 2;
+constexpr CommandAck_ValidationError CommandAck_ValidationError_BatchDeSerializeError = 3;
+constexpr CommandAck_ValidationError CommandAck_ValidationError_DecryptionError = 4;
+
+using CommandSubscribe_InitialPosition = int;
+constexpr CommandSubscribe_InitialPosition CommandSubscribe_InitialPosition_Latest = 0;
+constexpr CommandSubscribe_InitialPosition CommandSubscribe_InitialPosition_Earliest = 1;
+
+using ProducerAccessMode = int;
+constexpr ProducerAccessMode Shared = 0;
+constexpr ProducerAccessMode Exclusive = 1;
+constexpr ProducerAccessMode WaitForExclusive = 2;
+constexpr ProducerAccessMode ExclusiveWithFencing = 3;
+
+using ServerError = int;
+constexpr ServerError UnknownError = 0;
+constexpr ServerError MetadataError = 1;
+constexpr ServerError PersistenceError = 2;
+constexpr ServerError AuthenticationError = 3;
+constexpr ServerError AuthorizationError = 4;
+constexpr ServerError ConsumerBusy = 5;
+constexpr ServerError ServiceNotReady = 6;
+constexpr ServerError ProducerBlockedQuotaExceededError = 7;
+constexpr ServerError ProducerBlockedQuotaExceededException = 8;
+constexpr ServerError ChecksumError = 9;
+constexpr ServerError UnsupportedVersionError = 10;
+constexpr ServerError TopicNotFound = 11;
+constexpr ServerError SubscriptionNotFound = 12;
+constexpr ServerError ConsumerNotFound = 13;
+constexpr ServerError TooManyRequests = 14;
+constexpr ServerError TopicTerminatedError = 15;
+constexpr ServerError ProducerBusy = 16;
+constexpr ServerError InvalidTopicName = 17;
+constexpr ServerError IncompatibleSchema = 18;
+constexpr ServerError ConsumerAssignError = 19;
+constexpr ServerError TransactionCoordinatorNotFound = 20;
+constexpr ServerError InvalidTxnStatus = 21;
+constexpr ServerError NotAllowedError = 22;
+constexpr ServerError TransactionConflict = 23;
+constexpr ServerError TransactionNotFound = 24;
+constexpr ServerError ProducerFenced = 25;
+
+using BaseCommand_Type = int;
+constexpr BaseCommand_Type BaseCommand_Type_CONNECT = 2;
+constexpr BaseCommand_Type BaseCommand_Type_CONNECTED = 3;
+constexpr BaseCommand_Type BaseCommand_Type_SUBSCRIBE = 4;
+constexpr BaseCommand_Type BaseCommand_Type_PRODUCER = 5;
+constexpr BaseCommand_Type BaseCommand_Type_SEND = 6;
+constexpr BaseCommand_Type BaseCommand_Type_SEND_RECEIPT = 7;
+constexpr BaseCommand_Type BaseCommand_Type_SEND_ERROR = 8;
+constexpr BaseCommand_Type BaseCommand_Type_MESSAGE = 9;
+constexpr BaseCommand_Type BaseCommand_Type_ACK = 10;
+constexpr BaseCommand_Type BaseCommand_Type_FLOW = 11;
+constexpr BaseCommand_Type BaseCommand_Type_UNSUBSCRIBE = 12;
+constexpr BaseCommand_Type BaseCommand_Type_SUCCESS = 13;
+constexpr BaseCommand_Type BaseCommand_Type_ERROR = 14;
+constexpr BaseCommand_Type BaseCommand_Type_CLOSE_PRODUCER = 15;
+constexpr BaseCommand_Type BaseCommand_Type_CLOSE_CONSUMER = 16;
+constexpr BaseCommand_Type BaseCommand_Type_PRODUCER_SUCCESS = 17;
+constexpr BaseCommand_Type BaseCommand_Type_PING = 18;
+constexpr BaseCommand_Type BaseCommand_Type_PONG = 19;
+constexpr BaseCommand_Type BaseCommand_Type_REDELIVER_UNACKNOWLEDGED_MESSAGES = 20;
+constexpr BaseCommand_Type BaseCommand_Type_PARTITIONED_METADATA = 21;
+constexpr BaseCommand_Type BaseCommand_Type_PARTITIONED_METADATA_RESPONSE = 22;
+constexpr BaseCommand_Type BaseCommand_Type_LOOKUP = 23;
+constexpr BaseCommand_Type BaseCommand_Type_LOOKUP_RESPONSE = 24;
+constexpr BaseCommand_Type BaseCommand_Type_CONSUMER_STATS = 25;
+constexpr BaseCommand_Type BaseCommand_Type_CONSUMER_STATS_RESPONSE = 26;
+constexpr BaseCommand_Type BaseCommand_Type_REACHED_END_OF_TOPIC = 27;
+constexpr BaseCommand_Type BaseCommand_Type_SEEK = 28;
+constexpr BaseCommand_Type BaseCommand_Type_GET_LAST_MESSAGE_ID = 29;
+constexpr BaseCommand_Type BaseCommand_Type_GET_LAST_MESSAGE_ID_RESPONSE = 30;
+constexpr BaseCommand_Type BaseCommand_Type_ACTIVE_CONSUMER_CHANGE = 31;
+constexpr BaseCommand_Type BaseCommand_Type_GET_TOPICS_OF_NAMESPACE = 32;
+constexpr BaseCommand_Type BaseCommand_Type_GET_TOPICS_OF_NAMESPACE_RESPONSE = 33;
+constexpr BaseCommand_Type BaseCommand_Type_GET_SCHEMA = 34;
+constexpr BaseCommand_Type BaseCommand_Type_GET_SCHEMA_RESPONSE = 35;
+constexpr BaseCommand_Type BaseCommand_Type_AUTH_CHALLENGE = 36;
+constexpr BaseCommand_Type BaseCommand_Type_AUTH_RESPONSE = 37;
+constexpr BaseCommand_Type BaseCommand_Type_ACK_RESPONSE = 38;
+constexpr BaseCommand_Type BaseCommand_Type_GET_OR_CREATE_SCHEMA = 39;
+constexpr BaseCommand_Type BaseCommand_Type_GET_OR_CREATE_SCHEMA_RESPONSE = 40;
+constexpr BaseCommand_Type BaseCommand_Type_NEW_TXN = 50;
+constexpr BaseCommand_Type BaseCommand_Type_NEW_TXN_RESPONSE = 51;
+constexpr BaseCommand_Type BaseCommand_Type_ADD_PARTITION_TO_TXN = 52;
+constexpr BaseCommand_Type BaseCommand_Type_ADD_PARTITION_TO_TXN_RESPONSE = 53;
+constexpr BaseCommand_Type BaseCommand_Type_ADD_SUBSCRIPTION_TO_TXN = 54;
+constexpr BaseCommand_Type BaseCommand_Type_ADD_SUBSCRIPTION_TO_TXN_RESPONSE = 55;
+constexpr BaseCommand_Type BaseCommand_Type_END_TXN = 56;
+constexpr BaseCommand_Type BaseCommand_Type_END_TXN_RESPONSE = 57;
+constexpr BaseCommand_Type BaseCommand_Type_END_TXN_ON_PARTITION = 58;
+constexpr BaseCommand_Type BaseCommand_Type_END_TXN_ON_PARTITION_RESPONSE = 59;
+constexpr BaseCommand_Type BaseCommand_Type_END_TXN_ON_SUBSCRIPTION = 60;
+constexpr BaseCommand_Type BaseCommand_Type_END_TXN_ON_SUBSCRIPTION_RESPONSE = 61;
+constexpr BaseCommand_Type BaseCommand_Type_TC_CLIENT_CONNECT_REQUEST = 62;
+constexpr BaseCommand_Type BaseCommand_Type_TC_CLIENT_CONNECT_RESPONSE = 63;
+constexpr BaseCommand_Type BaseCommand_Type_WATCH_TOPIC_LIST = 64;
+constexpr BaseCommand_Type BaseCommand_Type_WATCH_TOPIC_LIST_SUCCESS = 65;
+constexpr BaseCommand_Type BaseCommand_Type_WATCH_TOPIC_UPDATE = 66;
+constexpr BaseCommand_Type BaseCommand_Type_WATCH_TOPIC_LIST_CLOSE = 67;
+
+}  // namespace pulsar
diff --git a/lib/ProtobufNativeSchema.cc b/lib/ProtobufNativeSchema.cc
index 3b8a404..edae2ec 100644
--- a/lib/ProtobufNativeSchema.cc
+++ b/lib/ProtobufNativeSchema.cc
@@ -18,12 +18,12 @@
  */
 #include "pulsar/ProtobufNativeSchema.h"
 
-#include <stdexcept>
-#include <vector>
+#include <google/protobuf/descriptor.pb.h>
 
 #include <boost/archive/iterators/base64_from_binary.hpp>
 #include <boost/archive/iterators/transform_width.hpp>
-#include <google/protobuf/descriptor.pb.h>
+#include <stdexcept>
+#include <vector>
 
 using google::protobuf::FileDescriptor;
 using google::protobuf::FileDescriptorSet;
diff --git a/lib/Reader.cc b/lib/Reader.cc
index fa48536..261c0fa 100644
--- a/lib/Reader.cc
+++ b/lib/Reader.cc
@@ -20,8 +20,8 @@
 #include <pulsar/Reader.h>
 
 #include "Future.h"
-#include "Utils.h"
 #include "ReaderImpl.h"
+#include "Utils.h"
 
 namespace pulsar {
 
diff --git a/lib/ReaderConfiguration.cc b/lib/ReaderConfiguration.cc
index 0dfdbed..3ba7fed 100644
--- a/lib/ReaderConfiguration.cc
+++ b/lib/ReaderConfiguration.cc
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/ReaderConfigurationImpl.h>
+#include "ReaderConfigurationImpl.h"
 
 namespace pulsar {
 
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index 83fa6a5..d1b25b5 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -17,8 +17,12 @@
  * under the License.
  */
 
-#include "ClientImpl.h"
 #include "ReaderImpl.h"
+
+#include "ClientImpl.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "GetLastMessageIdResponse.h"
 #include "TopicName.h"
 
 namespace pulsar {
diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h
index b0d8a6b..ed16c7d 100644
--- a/lib/ReaderImpl.h
+++ b/lib/ReaderImpl.h
@@ -20,7 +20,15 @@
 #ifndef LIB_READERIMPL_H_
 #define LIB_READERIMPL_H_
 
-#include "ConsumerImpl.h"
+#include <pulsar/Client.h>
+#include <pulsar/ConsumerConfiguration.h>
+#include <pulsar/ReaderConfiguration.h>
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+
+#include "Future.h"
 
 namespace pulsar {
 
@@ -29,6 +37,17 @@ class ReaderImpl;
 typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
 typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;
 
+class ClientImpl;
+using ClientImplPtr = std::shared_ptr<ClientImpl>;
+using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
+class ConsumerImplBase;
+using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;
+class ConsumerImpl;
+using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
+using ConsumerImplWeakPtr = std::weak_ptr<ConsumerImpl>;
+class ExecutorService;
+using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
+
 namespace test {
 
 extern PULSAR_PUBLIC std::mutex readerConfigTestMutex;
@@ -53,7 +72,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
 
     Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture();
 
-    ConsumerImplBaseWeakPtr getConsumer() const noexcept { return consumer_; }
+    ConsumerImplWeakPtr getConsumer() const noexcept { return consumer_; }
 
     void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
 
diff --git a/lib/Result.cc b/lib/Result.cc
index 6682341..3533b1e 100644
--- a/lib/Result.cc
+++ b/lib/Result.cc
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/defines.h>
 #include <pulsar/Result.h>
+#include <pulsar/defines.h>
 
 #include <iostream>
 
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index a8f7bfc..7d704ec 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -20,11 +20,14 @@
 
 #include <algorithm>
 #include <memory>
-#include "lib/Backoff.h"
-#include "lib/ExecutorService.h"
-#include "lib/LookupService.h"
-#include "lib/SynchronizedHashMap.h"
-#include "lib/LogUtils.h"
+
+#include "Backoff.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "LookupDataResult.h"
+#include "LookupService.h"
+#include "SynchronizedHashMap.h"
+#include "TopicName.h"
 
 namespace pulsar {
 
diff --git a/lib/RoundRobinMessageRouter.cc b/lib/RoundRobinMessageRouter.cc
index 51d10e2..e33f8d9 100644
--- a/lib/RoundRobinMessageRouter.cc
+++ b/lib/RoundRobinMessageRouter.cc
@@ -18,11 +18,12 @@
  */
 #include "RoundRobinMessageRouter.h"
 
-#include "TimeUtils.h"
-
 #include <boost/random/mersenne_twister.hpp>
 #include <boost/random/uniform_int_distribution.hpp>
 
+#include "Hash.h"
+#include "TimeUtils.h"
+
 namespace pulsar {
 RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme,
                                                  bool batchingEnabled, uint32_t maxBatchingMessages,
diff --git a/lib/RoundRobinMessageRouter.h b/lib/RoundRobinMessageRouter.h
index be172a0..753573a 100644
--- a/lib/RoundRobinMessageRouter.h
+++ b/lib/RoundRobinMessageRouter.h
@@ -19,15 +19,13 @@
 
 #pragma once
 
-#include <pulsar/defines.h>
-#include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/ProducerConfiguration.h>
 #include <pulsar/TopicMetadata.h>
-#include "Hash.h"
-#include "MessageRouterBase.h"
 
 #include <atomic>
-#include <boost/date_time/local_time/local_time.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "MessageRouterBase.h"
 
 namespace pulsar {
 class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase {
diff --git a/lib/Schema.cc b/lib/Schema.cc
index af452f4..17a301e 100644
--- a/lib/Schema.cc
+++ b/lib/Schema.cc
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/defines.h>
 #include <pulsar/Schema.h>
+#include <pulsar/defines.h>
 
 #include <iostream>
 #include <map>
diff --git a/lib/Semaphore.h b/lib/Semaphore.h
index dcef2ad..14ebb87 100644
--- a/lib/Semaphore.h
+++ b/lib/Semaphore.h
@@ -45,4 +45,4 @@ class Semaphore {
     bool isClosed_ = false;
 };
 
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/lib/ServiceNameResolver.h b/lib/ServiceNameResolver.h
index cf7a583..8457d0e 100644
--- a/lib/ServiceNameResolver.h
+++ b/lib/ServiceNameResolver.h
@@ -19,7 +19,9 @@
 #pragma once
 
 #include <assert.h>
+
 #include <atomic>
+
 #include "ServiceURI.h"
 
 namespace pulsar {
diff --git a/lib/ServiceURI.cc b/lib/ServiceURI.cc
index ec515b2..d95ec4a 100644
--- a/lib/ServiceURI.cc
+++ b/lib/ServiceURI.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include "ServiceURI.h"
+
 #include <stdexcept>
 
 namespace pulsar {
diff --git a/lib/ServiceURI.h b/lib/ServiceURI.h
index 4f459d9..6ee26a7 100644
--- a/lib/ServiceURI.h
+++ b/lib/ServiceURI.h
@@ -21,6 +21,7 @@
 #include <string>
 #include <utility>
 #include <vector>
+
 #include "PulsarScheme.h"
 
 namespace pulsar {
diff --git a/lib/SharedBuffer.h b/lib/SharedBuffer.h
index be889a7..7ee2618 100644
--- a/lib/SharedBuffer.h
+++ b/lib/SharedBuffer.h
@@ -19,9 +19,11 @@
 #ifndef LIB_SHARED_BUFFER_H_
 #define LIB_SHARED_BUFFER_H_
 
-#include <boost/asio.hpp>
+#include <assert.h>
 
 #include <array>
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/detail/socket_ops.hpp>
 #include <memory>
 #include <string>
 #include <utility>
diff --git a/lib/SimpleLogger.h b/lib/SimpleLogger.h
index b750336..d1e44c7 100644
--- a/lib/SimpleLogger.h
+++ b/lib/SimpleLogger.h
@@ -19,11 +19,13 @@
 
 #pragma once
 
+#include <pulsar/Logger.h>
+
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/format.hpp>
 #include <iostream>
 #include <sstream>
 #include <thread>
-#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/format.hpp>
 
 namespace pulsar {
 
diff --git a/lib/SinglePartitionMessageRouter.cc b/lib/SinglePartitionMessageRouter.cc
index 5ebe4c8..1658ba6 100644
--- a/lib/SinglePartitionMessageRouter.cc
+++ b/lib/SinglePartitionMessageRouter.cc
@@ -21,6 +21,8 @@
 #include <chrono>
 #include <random>
 
+#include "Hash.h"
+
 namespace pulsar {
 SinglePartitionMessageRouter::~SinglePartitionMessageRouter() {}
 
diff --git a/lib/SinglePartitionMessageRouter.h b/lib/SinglePartitionMessageRouter.h
index 4407bd4..4e5d24d 100644
--- a/lib/SinglePartitionMessageRouter.h
+++ b/lib/SinglePartitionMessageRouter.h
@@ -19,11 +19,11 @@
 #ifndef PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
 #define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
 
-#include <pulsar/defines.h>
 #include <pulsar/MessageRoutingPolicy.h>
-#include <include/pulsar/ProducerConfiguration.h>
-#include "Hash.h"
+#include <pulsar/ProducerConfiguration.h>
 #include <pulsar/TopicMetadata.h>
+#include <pulsar/defines.h>
+
 #include "MessageRouterBase.h"
 
 namespace pulsar {
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index 9bed7d7..b8a8c91 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -23,6 +23,7 @@
 #include <unordered_map>
 #include <utility>
 #include <vector>
+
 #include "Utils.h"
 
 namespace pulsar {
diff --git a/lib/TimeUtils.h b/lib/TimeUtils.h
index 45157ae..a55773d 100644
--- a/lib/TimeUtils.h
+++ b/lib/TimeUtils.h
@@ -18,12 +18,12 @@
  */
 #pragma once
 
-#include <boost/date_time/local_time/local_time.hpp>
+#include <pulsar/defines.h>
+
 #include <atomic>
+#include <boost/date_time/posix_time/posix_time.hpp>
 #include <chrono>
 
-#include <pulsar/defines.h>
-
 namespace pulsar {
 
 using namespace boost::posix_time;
diff --git a/lib/TopicMetadataImpl.cc b/lib/TopicMetadataImpl.cc
index e29cd97..2f622b2 100644
--- a/lib/TopicMetadataImpl.cc
+++ b/lib/TopicMetadataImpl.cc
@@ -23,4 +23,4 @@ namespace pulsar {
 TopicMetadataImpl::TopicMetadataImpl(const int numPartitions) : numPartitions_(numPartitions) {}
 
 int TopicMetadataImpl::getNumPartitions() const { return numPartitions_; }
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/lib/TopicMetadataImpl.h b/lib/TopicMetadataImpl.h
index 76393c4..aff763d 100644
--- a/lib/TopicMetadataImpl.h
+++ b/lib/TopicMetadataImpl.h
@@ -19,7 +19,6 @@
 #ifndef TOPIC_METADATA_IMPL_HPP_
 #define TOPIC_METADATA_IMPL_HPP_
 
-#include <pulsar/defines.h>
 #include <pulsar/TopicMetadata.h>
 
 namespace pulsar {
diff --git a/lib/TopicName.cc b/lib/TopicName.cc
index 70b7b7e..48c52c4 100644
--- a/lib/TopicName.cc
+++ b/lib/TopicName.cc
@@ -16,26 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "NamedEntity.h"
-#include "LogUtils.h"
-#include "PartitionedProducerImpl.h"
 #include "TopicName.h"
 
 #include <boost/algorithm/string.hpp>
-#include <boost/algorithm/string/find.hpp>
+#include <exception>
+#include <iostream>
 #include <memory>
+#include <sstream>
 #include <string>
 #include <vector>
-#include <iostream>
-#include <sstream>
-#include <algorithm>
-#include <exception>
+
+#include "LogUtils.h"
+#include "NamedEntity.h"
+#include "NamespaceName.h"
 
 DECLARE_LOG_OBJECT()
 namespace pulsar {
 
 const std::string TopicDomain::Persistent = "persistent";
 const std::string TopicDomain::NonPersistent = "non-persistent";
+static const std::string PARTITION_NAME_SUFFIX = "-partition-";
 
 typedef std::unique_lock<std::mutex> Lock;
 // static members
@@ -233,12 +233,12 @@ bool TopicName::isPersistent() const { return this->domain_ == TopicDomain::Pers
 std::string TopicName::getTopicPartitionName(unsigned int partition) const {
     std::stringstream topicPartitionName;
     // make this topic name as well
-    topicPartitionName << toString() << PartitionedProducerImpl::PARTITION_NAME_SUFFIX << partition;
+    topicPartitionName << toString() << PARTITION_NAME_SUFFIX << partition;
     return topicPartitionName.str();
 }
 
 int TopicName::getPartitionIndex(const std::string& topic) {
-    const auto& suffix = PartitionedProducerImpl::PARTITION_NAME_SUFFIX;
+    const auto& suffix = PARTITION_NAME_SUFFIX;
     const size_t pos = topic.rfind(suffix);
     if (pos == std::string::npos) {
         return -1;
diff --git a/lib/TopicName.h b/lib/TopicName.h
index d8620ea..51f701f 100644
--- a/lib/TopicName.h
+++ b/lib/TopicName.h
@@ -19,15 +19,20 @@
 #ifndef _PULSAR_TOPIC_NAME_HEADER_
 #define _PULSAR_TOPIC_NAME_HEADER_
 
+#include <curl/curl.h>
 #include <pulsar/defines.h>
-#include "NamespaceName.h"
-#include "ServiceUnitId.h"
 
-#include <string>
-#include <curl/curl.h>
+#include <memory>
 #include <mutex>
+#include <string>
+
+#include "ServiceUnitId.h"
 
 namespace pulsar {
+
+class NamespaceName;
+using NamespaceNamePtr = std::shared_ptr<NamespaceName>;
+
 class PULSAR_PUBLIC TopicDomain {
    public:
     static const std::string Persistent;
diff --git a/lib/UnAckedMessageTrackerDisabled.h b/lib/UnAckedMessageTrackerDisabled.h
index c25c1a5..bd12b3e 100644
--- a/lib/UnAckedMessageTrackerDisabled.h
+++ b/lib/UnAckedMessageTrackerDisabled.h
@@ -18,7 +18,7 @@
  */
 #ifndef LIB_UNACKEDMESSAGETRACKERDISABLED_H_
 #define LIB_UNACKEDMESSAGETRACKERDISABLED_H_
-#include "lib/UnAckedMessageTrackerInterface.h"
+#include "UnAckedMessageTrackerInterface.h"
 namespace pulsar {
 
 class UnAckedMessageTrackerDisabled : public UnAckedMessageTrackerInterface {
diff --git a/lib/UnAckedMessageTrackerEnabled.cc b/lib/UnAckedMessageTrackerEnabled.cc
index 9d0160f..1bc878b 100644
--- a/lib/UnAckedMessageTrackerEnabled.cc
+++ b/lib/UnAckedMessageTrackerEnabled.cc
@@ -20,6 +20,11 @@
 
 #include <functional>
 
+#include "ClientImpl.h"
+#include "ConsumerImplBase.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+
 DECLARE_LOG_OBJECT();
 
 namespace pulsar {
diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h
index 7ed7b03..13dee21 100644
--- a/lib/UnAckedMessageTrackerEnabled.h
+++ b/lib/UnAckedMessageTrackerEnabled.h
@@ -18,17 +18,27 @@
  */
 #ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_
 #define LIB_UNACKEDMESSAGETRACKERENABLED_H_
-#include "lib/TestUtil.h"
-#include "lib/UnAckedMessageTrackerInterface.h"
-
+#include <boost/asio/deadline_timer.hpp>
+#include <deque>
+#include <map>
 #include <mutex>
+#include <set>
+
+#include "TestUtil.h"
+#include "UnAckedMessageTrackerInterface.h"
 
 namespace pulsar {
+
+class ClientImpl;
+class ConsumerImplBase;
+using ClientImplPtr = std::shared_ptr<ClientImpl>;
+using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
+
 class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
    public:
     ~UnAckedMessageTrackerEnabled();
-    UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&);
-    UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&);
+    UnAckedMessageTrackerEnabled(long timeoutMs, ClientImplPtr, ConsumerImplBase&);
+    UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, ClientImplPtr, ConsumerImplBase&);
     bool add(const MessageId& msgId);
     bool remove(const MessageId& msgId);
     void removeMessagesTill(const MessageId& msgId);
diff --git a/lib/UnAckedMessageTrackerInterface.h b/lib/UnAckedMessageTrackerInterface.h
index 50fa72c..3bcaaa5 100644
--- a/lib/UnAckedMessageTrackerInterface.h
+++ b/lib/UnAckedMessageTrackerInterface.h
@@ -18,18 +18,10 @@
  */
 #ifndef LIB_UNACKEDMESSAGETRACKERINTERFACE_H_
 #define LIB_UNACKEDMESSAGETRACKERINTERFACE_H_
-#include <string>
+#include <pulsar/MessageId.h>
+
 #include <memory>
-#include <set>
-#include <algorithm>
-#include <utility>
-#include "pulsar/MessageId.h"
-#include "lib/ClientImpl.h"
-#include "lib/ConsumerImplBase.h"
-#include <boost/asio.hpp>
-#include <lib/LogUtils.h>
-#include "lib/PulsarApi.pb.h"
-#include <boost/asio/error.hpp>
+#include <string>
 namespace pulsar {
 
 class UnAckedMessageTrackerInterface {
diff --git a/lib/UnboundedBlockingQueue.h b/lib/UnboundedBlockingQueue.h
index 0f7fc2a..01ffc0c 100644
--- a/lib/UnboundedBlockingQueue.h
+++ b/lib/UnboundedBlockingQueue.h
@@ -19,9 +19,9 @@
 #ifndef LIB_UNBOUNDEDBLOCKINGQUEUE_H_
 #define LIB_UNBOUNDEDBLOCKINGQUEUE_H_
 
-#include <mutex>
-#include <condition_variable>
 #include <boost/circular_buffer.hpp>
+#include <condition_variable>
+#include <mutex>
 // For struct QueueNotEmpty
 #include "BlockingQueue.h"
 
diff --git a/lib/Url.cc b/lib/Url.cc
index f31e1fc..b52b723 100644
--- a/lib/Url.cc
+++ b/lib/Url.cc
@@ -19,7 +19,6 @@
 #include "Url.h"
 
 #include <map>
-
 #include <sstream>
 
 #ifdef PULSAR_USE_BOOST_REGEX
diff --git a/lib/Url.h b/lib/Url.h
index f5596c5..5cf76ef 100644
--- a/lib/Url.h
+++ b/lib/Url.h
@@ -19,9 +19,10 @@
 #ifndef LIB_URL_H_
 #define LIB_URL_H_
 
-#include <string>
 #include <pulsar/defines.h>
 
+#include <string>
+
 namespace pulsar {
 
 /**
diff --git a/lib/UtilAllocator.h b/lib/UtilAllocator.h
index acd1414..7096e35 100644
--- a/lib/UtilAllocator.h
+++ b/lib/UtilAllocator.h
@@ -20,6 +20,7 @@
 #define LIB_UTILALLOCATOR_H_
 
 #include <boost/aligned_storage.hpp>
+#include <boost/noncopyable.hpp>
 
 class HandlerAllocator : private boost::noncopyable {
    public:
diff --git a/lib/Utils.h b/lib/Utils.h
index b0f500e..016f09f 100644
--- a/lib/Utils.h
+++ b/lib/Utils.h
@@ -21,10 +21,10 @@
 
 #include <pulsar/Result.h>
 
-#include "Future.h"
-
-#include <map>
 #include <iostream>
+#include <map>
+
+#include "Future.h"
 
 namespace pulsar {
 
diff --git a/lib/auth/AuthAthenz.cc b/lib/auth/AuthAthenz.cc
index 82d1276..2360ab8 100644
--- a/lib/auth/AuthAthenz.cc
+++ b/lib/auth/AuthAthenz.cc
@@ -16,15 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/auth/AuthAthenz.h>
+#include "AuthAthenz.h"
 
 #include <boost/property_tree/json_parser.hpp>
 #include <boost/property_tree/ptree.hpp>
-namespace ptree = boost::property_tree;
-
+#include <functional>
 #include <sstream>
 
-#include <functional>
+#include "athenz/ZTSClient.h"
+#include "lib/LogUtils.h"
+
+namespace ptree = boost::property_tree;
 
 DECLARE_LOG_OBJECT()
 
diff --git a/lib/auth/AuthAthenz.h b/lib/auth/AuthAthenz.h
index e58a4bc..7d2048f 100644
--- a/lib/auth/AuthAthenz.h
+++ b/lib/auth/AuthAthenz.h
@@ -20,11 +20,11 @@
 #define PULSAR_AUTH_ATHENZ_H_
 
 #include <pulsar/Authentication.h>
-#include <lib/auth/athenz/ZTSClient.h>
-#include <string>
 
 namespace pulsar {
 
+class ZTSClient;
+
 const std::string ATHENZ_PLUGIN_NAME = "athenz";
 const std::string ATHENZ_JAVA_PLUGIN_NAME = "org.apache.pulsar.client.impl.auth.AuthenticationAthenz";
 
diff --git a/lib/auth/AuthBasic.cc b/lib/auth/AuthBasic.cc
index ca74803..8d42061 100644
--- a/lib/auth/AuthBasic.cc
+++ b/lib/auth/AuthBasic.cc
@@ -19,15 +19,14 @@
 
 #include "AuthBasic.h"
 
-#include <stdexcept>
 #include <boost/archive/iterators/base64_from_binary.hpp>
 #include <boost/archive/iterators/transform_width.hpp>
 #include <boost/property_tree/json_parser.hpp>
 #include <boost/property_tree/ptree.hpp>
-namespace ptree = boost::property_tree;
-
-#include <sstream>
 #include <functional>
+#include <sstream>
+#include <stdexcept>
+namespace ptree = boost::property_tree;
 
 namespace pulsar {
 
diff --git a/lib/auth/AuthBasic.h b/lib/auth/AuthBasic.h
index 2bd9e11..65c104f 100644
--- a/lib/auth/AuthBasic.h
+++ b/lib/auth/AuthBasic.h
@@ -20,8 +20,6 @@
 #pragma once
 
 #include <pulsar/Authentication.h>
-#include <string>
-#include <boost/function.hpp>
 
 namespace pulsar {
 
diff --git a/lib/auth/AuthOauth2.cc b/lib/auth/AuthOauth2.cc
index 2fce804..66c1b05 100644
--- a/lib/auth/AuthOauth2.cc
+++ b/lib/auth/AuthOauth2.cc
@@ -16,15 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/auth/AuthOauth2.h>
+#include "AuthOauth2.h"
 
 #include <curl/curl.h>
-#include <sstream>
-#include <stdexcept>
+
 #include <boost/property_tree/json_parser.hpp>
 #include <boost/property_tree/ptree.hpp>
+#include <sstream>
+#include <stdexcept>
 
-#include <lib/LogUtils.h>
+#include "lib/LogUtils.h"
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
diff --git a/lib/auth/AuthOauth2.h b/lib/auth/AuthOauth2.h
index c940cf9..565af06 100644
--- a/lib/auth/AuthOauth2.h
+++ b/lib/auth/AuthOauth2.h
@@ -20,9 +20,9 @@
 #pragma once
 
 #include <pulsar/Authentication.h>
+
 #include <chrono>
 #include <mutex>
-#include <string>
 
 namespace pulsar {
 
diff --git a/lib/auth/AuthTls.cc b/lib/auth/AuthTls.cc
index fdf7f21..e0287cd 100644
--- a/lib/auth/AuthTls.cc
+++ b/lib/auth/AuthTls.cc
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <lib/auth/AuthTls.h>
+#include "AuthTls.h"
 
 namespace pulsar {
 AuthDataTls::AuthDataTls(const std::string& certificatePath, const std::string& privateKeyPath) {
diff --git a/lib/auth/AuthTls.h b/lib/auth/AuthTls.h
index 510aea0..8b243e0 100644
--- a/lib/auth/AuthTls.h
+++ b/lib/auth/AuthTls.h
@@ -20,7 +20,6 @@
 #pragma once
 
 #include <pulsar/Authentication.h>
-#include <string>
 
 namespace pulsar {
 
diff --git a/lib/auth/AuthToken.cc b/lib/auth/AuthToken.cc
index e8ebc72..367a26d 100644
--- a/lib/auth/AuthToken.cc
+++ b/lib/auth/AuthToken.cc
@@ -19,11 +19,9 @@
 #include "AuthToken.h"
 
 #include <boost/algorithm/string/predicate.hpp>
-#include <functional>
-#include <stdexcept>
-
-#include <sstream>
 #include <fstream>
+#include <sstream>
+#include <stdexcept>
 
 namespace pulsar {
 
diff --git a/lib/auth/AuthToken.h b/lib/auth/AuthToken.h
index 8473fe3..eed7b30 100644
--- a/lib/auth/AuthToken.h
+++ b/lib/auth/AuthToken.h
@@ -20,8 +20,6 @@
 #pragma once
 
 #include <pulsar/Authentication.h>
-#include <string>
-#include <boost/function.hpp>
 
 namespace pulsar {
 
diff --git a/lib/auth/athenz/ZTSClient.cc b/lib/auth/athenz/ZTSClient.cc
index 919536f..65bfd21 100644
--- a/lib/auth/athenz/ZTSClient.cc
+++ b/lib/auth/athenz/ZTSClient.cc
@@ -17,22 +17,23 @@
  * under the License.
  */
 #include "ZTSClient.h"
+
 #include <sstream>
 
+#include "lib/LogUtils.h"
+
 #ifndef _MSC_VER
 #include <unistd.h>
 #else
 #include <stdio.h>
 #endif
-#include <string.h>
-#include <time.h>
-
-#include <openssl/sha.h>
-#include <openssl/rsa.h>
+#include <curl/curl.h>
 #include <openssl/ec.h>
 #include <openssl/pem.h>
-
-#include <curl/curl.h>
+#include <openssl/rsa.h>
+#include <openssl/sha.h>
+#include <string.h>
+#include <time.h>
 
 #include <boost/property_tree/json_parser.hpp>
 #include <boost/property_tree/ptree.hpp>
@@ -51,7 +52,6 @@ namespace ptree = boost::property_tree;
 
 #include <boost/archive/iterators/base64_from_binary.hpp>
 #include <boost/archive/iterators/transform_width.hpp>
-
 #include <mutex>
 
 #ifdef PULSAR_USE_BOOST_REGEX
diff --git a/lib/auth/athenz/ZTSClient.h b/lib/auth/athenz/ZTSClient.h
index fdc690c..429087e 100644
--- a/lib/auth/athenz/ZTSClient.h
+++ b/lib/auth/athenz/ZTSClient.h
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <string>
-#include <map>
 #include <pulsar/defines.h>
-#include <lib/LogUtils.h>
+
+#include <map>
+#include <string>
 
 namespace pulsar {
 
diff --git a/lib/c/cStringMap.cc b/lib/c/cStringMap.cc
index 221dce4..a24c2f6 100644
--- a/lib/c/cStringMap.cc
+++ b/lib/c/cStringMap.cc
@@ -57,4 +57,4 @@ const char *pulsar_string_map_get_value(pulsar_string_map_t *map, int idx) {
     }
 
     return it->second.c_str();
-}
\ No newline at end of file
+}
diff --git a/lib/c/c_Authentication.cc b/lib/c/c_Authentication.cc
index 8384fac..e4b2fbd 100644
--- a/lib/c/c_Authentication.cc
+++ b/lib/c/c_Authentication.cc
@@ -17,14 +17,13 @@
  * under the License.
  */
 
+#include <pulsar/Authentication.h>
 #include <pulsar/c/authentication.h>
 
-#include <pulsar/Authentication.h>
+#include <cstdlib>
 
 #include "c_structs.h"
 
-#include <cstdlib>
-
 pulsar_authentication_t *pulsar_authentication_create(const char *dynamicLibPath,
                                                       const char *authParamsString) {
     pulsar_authentication_t *authentication = new pulsar_authentication_t;
diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc
index 4fe4c39..552fa08 100644
--- a/lib/c/c_Message.cc
+++ b/lib/c/c_Message.cc
@@ -18,6 +18,7 @@
  */
 
 #include <pulsar/c/message.h>
+
 #include "c_structs.h"
 
 pulsar_message_t *pulsar_message_create() { return new pulsar_message_t; }
diff --git a/lib/c/c_MessageId.cc b/lib/c/c_MessageId.cc
index 537bb70..ca7739e 100644
--- a/lib/c/c_MessageId.cc
+++ b/lib/c/c_MessageId.cc
@@ -18,11 +18,13 @@
  */
 
 #include <pulsar/c/message_id.h>
-#include "c_structs.h"
 
+#include <cstring>
 #include <mutex>
 #include <sstream>
 
+#include "c_structs.h"
+
 std::once_flag initialized;
 
 static pulsar_message_id_t earliest;
diff --git a/lib/c/c_ProducerConfiguration.cc b/lib/c/c_ProducerConfiguration.cc
index fbc5714..868eddf 100644
--- a/lib/c/c_ProducerConfiguration.cc
+++ b/lib/c/c_ProducerConfiguration.cc
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <pulsar/c/message.h>
 #include <pulsar/c/producer_configuration.h>
-#include <include/pulsar/c/message.h>
 
 #include "c_structs.h"
 
diff --git a/lib/c/c_Reader.cc b/lib/c/c_Reader.cc
index a28d9c2..3490b54 100644
--- a/lib/c/c_Reader.cc
+++ b/lib/c/c_Reader.cc
@@ -17,8 +17,8 @@
  * under the License.
  */
 
-#include <pulsar/c/reader.h>
 #include <pulsar/Reader.h>
+#include <pulsar/c/reader.h>
 
 #include "c_structs.h"
 
diff --git a/lib/c/c_ReaderConfiguration.cc b/lib/c/c_ReaderConfiguration.cc
index 9d3a1a0..fe05cf2 100644
--- a/lib/c/c_ReaderConfiguration.cc
+++ b/lib/c/c_ReaderConfiguration.cc
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-#include <pulsar/c/reader.h>
+#include <pulsar/Reader.h>
+#include <pulsar/ReaderConfiguration.h>
 #include <pulsar/c/message.h>
+#include <pulsar/c/reader.h>
 #include <pulsar/c/reader_configuration.h>
-#include <pulsar/ReaderConfiguration.h>
-#include <pulsar/Reader.h>
 
 #include "c_structs.h"
 
@@ -85,4 +85,4 @@ void pulsar_reader_configuration_set_read_compacted(pulsar_reader_configuration_
 
 int pulsar_reader_configuration_is_read_compacted(pulsar_reader_configuration_t *configuration) {
     return configuration->conf.isReadCompacted();
-}
\ No newline at end of file
+}
diff --git a/lib/c/c_Result.cc b/lib/c/c_Result.cc
index 157a91f..e0624b4 100644
--- a/lib/c/c_Result.cc
+++ b/lib/c/c_Result.cc
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-#include <pulsar/c/result.h>
 #include <pulsar/Result.h>
+#include <pulsar/c/result.h>
 
 const char *pulsar_result_str(pulsar_result result) { return pulsar::strResult((pulsar::Result)result); }
diff --git a/lib/c/c_structs.h b/lib/c/c_structs.h
index eb8889a..0ad134f 100644
--- a/lib/c/c_structs.h
+++ b/lib/c/c_structs.h
@@ -18,11 +18,11 @@
  */
 #pragma once
 
-#include <pulsar/c/result.h>
 #include <pulsar/Client.h>
+#include <pulsar/c/result.h>
 
-#include <memory>
 #include <functional>
+#include <memory>
 
 struct _pulsar_client {
     std::unique_ptr<pulsar::Client> client;
diff --git a/lib/checksum/ChecksumProvider.h b/lib/checksum/ChecksumProvider.h
index 378b321..a1e5ef7 100644
--- a/lib/checksum/ChecksumProvider.h
+++ b/lib/checksum/ChecksumProvider.h
@@ -19,8 +19,8 @@
 #ifndef _CHECKSUM_PROVIDER_H_
 #define _CHECKSUM_PROVIDER_H_
 
-#include <stdint.h>
 #include <pulsar/defines.h>
+#include <stdint.h>
 
 namespace pulsar {
 
diff --git a/lib/checksum/crc32c_arm.cc b/lib/checksum/crc32c_arm.cc
index d937a16..266ac04 100644
--- a/lib/checksum/crc32c_arm.cc
+++ b/lib/checksum/crc32c_arm.cc
@@ -23,6 +23,7 @@
 //  (found in the LICENSE.Apache file in the root directory).
 
 #include "crc32c_arm.h"
+
 #include "lib/checksum/crc32c_sw.h"
 
 #if defined(HAVE_ARM64_CRC)
diff --git a/lib/checksum/crc32c_sse42.cc b/lib/checksum/crc32c_sse42.cc
index c5dba04..8c52a27 100644
--- a/lib/checksum/crc32c_sse42.cc
+++ b/lib/checksum/crc32c_sse42.cc
@@ -28,8 +28,9 @@
 
 #include <assert.h>
 #include <stdlib.h>
-#include "lib/checksum/crc32c_sw.h"
+
 #include "gf2.hpp"
+#include "lib/checksum/crc32c_sw.h"
 
 #if BOOST_ARCH_X86_64 && !defined(__arm64__)
 #define PULSAR_X86_64
diff --git a/lib/checksum/crc32c_sw.cc b/lib/checksum/crc32c_sw.cc
index d03802c..4186964 100644
--- a/lib/checksum/crc32c_sw.cc
+++ b/lib/checksum/crc32c_sw.cc
@@ -34,6 +34,7 @@
  */
 
 #include "crc32c_sw.h"
+
 #include <mutex>
 
 namespace pulsar {
diff --git a/lib/lz4/lz4.h b/lib/lz4/lz4.h
index e5fb5a4..a428b8b 100644
--- a/lib/lz4/lz4.h
+++ b/lib/lz4/lz4.h
@@ -402,4 +402,4 @@ int LZ4_decompress_safe_withPrefix64k(const char *src, char *dst, int compressed
 LZ4_DEPRECATED("use LZ4_decompress_fast_usingDict() instead")
 
 int LZ4_decompress_fast_withPrefix64k(const char *src, char *dst, int originalSize);
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/lib/stats/ConsumerStatsBase.h b/lib/stats/ConsumerStatsBase.h
index de7e07b..13ca154 100644
--- a/lib/stats/ConsumerStatsBase.h
+++ b/lib/stats/ConsumerStatsBase.h
@@ -20,15 +20,15 @@
 #ifndef PULSAR_CONSUMER_STATS_BASE_HEADER
 #define PULSAR_CONSUMER_STATS_BASE_HEADER
 #include <pulsar/Message.h>
-#include <lib/PulsarApi.pb.h>
 #include <pulsar/Result.h>
-#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "lib/ProtoApiEnums.h"
 
 namespace pulsar {
 class ConsumerStatsBase {
    public:
     virtual void receivedMessage(Message&, Result) = 0;
-    virtual void messageAcknowledged(Result, proto::CommandAck_AckType) = 0;
+    virtual void messageAcknowledged(Result, CommandAck_AckType) = 0;
     virtual ~ConsumerStatsBase() {}
 };
 
diff --git a/lib/stats/ConsumerStatsDisabled.h b/lib/stats/ConsumerStatsDisabled.h
index e2233d5..f32d026 100644
--- a/lib/stats/ConsumerStatsDisabled.h
+++ b/lib/stats/ConsumerStatsDisabled.h
@@ -20,14 +20,14 @@
 #ifndef PULSAR_CONSUMER_STATS_DISABLED_H_
 #define PULSAR_CONSUMER_STATS_DISABLED_H_
 
-#include <lib/stats/ConsumerStatsBase.h>
+#include "ConsumerStatsBase.h"
 
 namespace pulsar {
 
 class ConsumerStatsDisabled : public ConsumerStatsBase {
    public:
     virtual void receivedMessage(Message&, Result) {}
-    virtual void messageAcknowledged(Result, proto::CommandAck_AckType) {}
+    virtual void messageAcknowledged(Result, CommandAck_AckType) {}
 };
 
 } /* namespace pulsar */
diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc
index 38534a6..833dcd1 100644
--- a/lib/stats/ConsumerStatsImpl.cc
+++ b/lib/stats/ConsumerStatsImpl.cc
@@ -17,14 +17,19 @@
  * under the License.
  */
 
-#include <lib/stats/ConsumerStatsImpl.h>
-#include <lib/LogUtils.h>
+#include "ConsumerStatsImpl.h"
 
 #include <functional>
 
+#include "lib/ExecutorService.h"
+#include "lib/LogUtils.h"
+#include "lib/Utils.h"
+
 namespace pulsar {
 DECLARE_LOG_OBJECT();
 
+using Lock = std::unique_lock<std::mutex>;
+
 ConsumerStatsImpl::ConsumerStatsImpl(std::string consumerStr, ExecutorServicePtr executor,
                                      unsigned int statsIntervalInSeconds)
     : consumerStr_(consumerStr),
@@ -80,16 +85,16 @@ void ConsumerStatsImpl::receivedMessage(Message& msg, Result res) {
     totalReceivedMsgMap_[res] += 1;
 }
 
-void ConsumerStatsImpl::messageAcknowledged(Result res, proto::CommandAck_AckType ackType) {
+void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackType) {
     Lock lock(mutex_);
     ackedMsgMap_[std::make_pair(res, ackType)] += 1;
     totalAckedMsgMap_[std::make_pair(res, ackType)] += 1;
 }
 
 std::ostream& operator<<(std::ostream& os,
-                         const std::map<std::pair<Result, proto::CommandAck_AckType>, unsigned long>& m) {
+                         const std::map<std::pair<Result, CommandAck_AckType>, unsigned long>& m) {
     os << "{";
-    for (std::map<std::pair<Result, proto::CommandAck_AckType>, unsigned long>::const_iterator it = m.begin();
+    for (std::map<std::pair<Result, CommandAck_AckType>, unsigned long>::const_iterator it = m.begin();
          it != m.end(); it++) {
         os << "[Key: {"
            << "Result: " << strResult((it->first).first) << ", ackType: " << (it->first).second
diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h
index 5607cce..a301e12 100644
--- a/lib/stats/ConsumerStatsImpl.h
+++ b/lib/stats/ConsumerStatsImpl.h
@@ -20,23 +20,30 @@
 #ifndef PULSAR_CONSUMER_STATS_IMPL_H_
 #define PULSAR_CONSUMER_STATS_IMPL_H_
 
-#include <lib/stats/ConsumerStatsBase.h>
-#include <lib/ExecutorService.h>
-#include <lib/Utils.h>
+#include <boost/asio/deadline_timer.hpp>
+#include <map>
+#include <mutex>
 #include <utility>
+
+#include "ConsumerStatsBase.h"
+#include "lib/ExecutorService.h"
 namespace pulsar {
 
+using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
+class ExecutorService;
+using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
+
 class ConsumerStatsImpl : public ConsumerStatsBase {
    private:
     std::string consumerStr_;
 
     unsigned long numBytesRecieved_ = 0;
     std::map<Result, unsigned long> receivedMsgMap_;
-    std::map<std::pair<Result, proto::CommandAck_AckType>, unsigned long> ackedMsgMap_;
+    std::map<std::pair<Result, CommandAck_AckType>, unsigned long> ackedMsgMap_;
 
     unsigned long totalNumBytesRecieved_ = 0;
     std::map<Result, unsigned long> totalReceivedMsgMap_;
-    std::map<std::pair<Result, proto::CommandAck_AckType>, unsigned long> totalAckedMsgMap_;
+    std::map<std::pair<Result, CommandAck_AckType>, unsigned long> totalAckedMsgMap_;
 
     ExecutorServicePtr executor_;
     DeadlineTimerPtr timer_;
@@ -52,11 +59,10 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
     ConsumerStatsImpl(const ConsumerStatsImpl& stats);
     void flushAndReset(const boost::system::error_code&);
     virtual void receivedMessage(Message&, Result);
-    virtual void messageAcknowledged(Result, proto::CommandAck_AckType);
+    virtual void messageAcknowledged(Result, CommandAck_AckType);
     virtual ~ConsumerStatsImpl();
 
-    const inline std::map<std::pair<Result, proto::CommandAck_AckType>, unsigned long>& getAckedMsgMap()
-        const {
+    const inline std::map<std::pair<Result, CommandAck_AckType>, unsigned long>& getAckedMsgMap() const {
         return ackedMsgMap_;
     }
 
@@ -64,8 +70,7 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
 
     const inline std::map<Result, unsigned long>& getReceivedMsgMap() const { return receivedMsgMap_; }
 
-    inline const std::map<std::pair<Result, proto::CommandAck_AckType>, unsigned long>& getTotalAckedMsgMap()
-        const {
+    inline const std::map<std::pair<Result, CommandAck_AckType>, unsigned long>& getTotalAckedMsgMap() const {
         return totalAckedMsgMap_;
     }
 
diff --git a/lib/stats/ProducerStatsBase.h b/lib/stats/ProducerStatsBase.h
index 0ae16d1..aafc877 100644
--- a/lib/stats/ProducerStatsBase.h
+++ b/lib/stats/ProducerStatsBase.h
@@ -21,6 +21,7 @@
 #define PULSAR_PRODUCER_STATS_BASE_HEADER
 #include <pulsar/Message.h>
 #include <pulsar/Result.h>
+
 #include <boost/date_time/posix_time/posix_time.hpp>
 
 namespace pulsar {
diff --git a/lib/stats/ProducerStatsDisabled.h b/lib/stats/ProducerStatsDisabled.h
index 6568c07..df1df0f 100644
--- a/lib/stats/ProducerStatsDisabled.h
+++ b/lib/stats/ProducerStatsDisabled.h
@@ -19,7 +19,7 @@
 
 #ifndef PULSAR_PRODUCER_STATS_DISABLED_HEADER
 #define PULSAR_PRODUCER_STATS_DISABLED_HEADER
-#include <lib/stats/ProducerStatsBase.h>
+#include "ProducerStatsBase.h"
 
 namespace pulsar {
 class ProducerStatsDisabled : public ProducerStatsBase {
diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc
index e6f0221..f30aebb 100644
--- a/lib/stats/ProducerStatsImpl.cc
+++ b/lib/stats/ProducerStatsImpl.cc
@@ -17,12 +17,14 @@
  * under the License.
  */
 
-#include <lib/stats/ProducerStatsImpl.h>
-
-#include <lib/LogUtils.h>
+#include "ProducerStatsImpl.h"
 
 #include <array>
 
+#include "lib/ExecutorService.h"
+#include "lib/LogUtils.h"
+#include "lib/Utils.h"
+
 namespace pulsar {
 DECLARE_LOG_OBJECT();
 
diff --git a/lib/stats/ProducerStatsImpl.h b/lib/stats/ProducerStatsImpl.h
index 27ffacc..a826ef0 100644
--- a/lib/stats/ProducerStatsImpl.h
+++ b/lib/stats/ProducerStatsImpl.h
@@ -20,29 +20,31 @@
 #ifndef PULSAR_PRODUCER_STATS_IMPL_HEADER
 #define PULSAR_PRODUCER_STATS_IMPL_HEADER
 
-#include <pulsar/Message.h>
 #include <map>
-#include <lib/ExecutorService.h>
 
 #if BOOST_VERSION >= 106400
 #include <boost/serialization/array_wrapper.hpp>
 #endif
-#include <boost/accumulators/framework/features.hpp>
-
 #include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics.hpp>
 #include <boost/accumulators/framework/accumulator_set.hpp>
+#include <boost/accumulators/framework/features.hpp>
+#include <boost/accumulators/statistics.hpp>
 #include <boost/accumulators/statistics/extended_p_square.hpp>
-
+#include <boost/asio/deadline_timer.hpp>
 #include <boost/date_time/local_time/local_time.hpp>
+#include <iostream>
 #include <memory>
 #include <mutex>
-#include <iostream>
 #include <vector>
-#include <lib/Utils.h>
-#include <lib/stats/ProducerStatsBase.h>
+
+#include "ProducerStatsBase.h"
 
 namespace pulsar {
+
+class ExecutorService;
+using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
+using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
+
 typedef boost::accumulators::accumulator_set<
     double,
     boost::accumulators::stats<boost::accumulators::tag::mean, boost::accumulators::tag::extended_p_square> >
diff --git a/perf/PerfConsumer.cc b/perf/PerfConsumer.cc
index 6717fde..c4f1f00 100644
--- a/perf/PerfConsumer.cc
+++ b/perf/PerfConsumer.cc
@@ -20,27 +20,26 @@
 DECLARE_LOG_OBJECT()
 
 #include <chrono>
-#include <thread>
-#include <iostream>
 #include <fstream>
-#include <mutex>
 #include <functional>
+#include <iostream>
+#include <mutex>
+#include <thread>
 
 using namespace std::chrono;
 
-#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/program_options.hpp>
 #include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics/stats.hpp>
 #include <boost/accumulators/statistics/mean.hpp>
 #include <boost/accumulators/statistics/p_square_quantile.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/program_options.hpp>
 namespace po = boost::program_options;
 using namespace boost::accumulators;
 
 #include <lib/Latch.h>
-
-#include <pulsar/Client.h>
 #include <pulsar/Authentication.h>
+#include <pulsar/Client.h>
 using namespace pulsar;
 
 static int64_t currentTimeMillis() {
diff --git a/perf/PerfProducer.cc b/perf/PerfProducer.cc
index f7d3361..04d5cbf 100644
--- a/perf/PerfProducer.cc
+++ b/perf/PerfProducer.cc
@@ -19,26 +19,27 @@
 #include <lib/LogUtils.h>
 DECLARE_LOG_OBJECT()
 
-#include <mutex>
-
 #include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics/stats.hpp>
 #include <boost/accumulators/statistics/mean.hpp>
 #include <boost/accumulators/statistics/p_square_quantile.hpp>
-#include <boost/program_options/variables_map.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
 #include <boost/program_options.hpp>
-#include <thread>
+#include <boost/program_options/variables_map.hpp>
 #include <functional>
+#include <mutex>
+#include <thread>
 namespace po = boost::program_options;
 
+#include <pulsar/Authentication.h>
+#include <pulsar/Client.h>
+#include <pulsar/MessageBuilder.h>
+
 #include <atomic>
-#include <iostream>
 #include <fstream>
+#include <iostream>
 #include <vector>
-#include <pulsar/Client.h>
+
 #include "RateLimiter.h"
-#include <pulsar/MessageBuilder.h>
-#include <pulsar/Authentication.h>
 typedef std::shared_ptr<pulsar::RateLimiter> RateLimiterPtr;
 
 struct Arguments {
diff --git a/perf/RateLimiter.h b/perf/RateLimiter.h
index eea76ec..e65969e 100644
--- a/perf/RateLimiter.h
+++ b/perf/RateLimiter.h
@@ -20,8 +20,8 @@
 #define PERF_RATELIMITER_H_
 
 #include <chrono>
-#include <thread>
 #include <mutex>
+#include <thread>
 
 namespace pulsar {
 
diff --git a/tests/AuthBasicTest.cc b/tests/AuthBasicTest.cc
index 29d6624..0b8b7f0 100644
--- a/tests/AuthBasicTest.cc
+++ b/tests/AuthBasicTest.cc
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-#include <pulsar/Authentication.h>
-
 #include <gtest/gtest.h>
+#include <pulsar/Authentication.h>
 #include <pulsar/Client.h>
 
 #include <string>
diff --git a/tests/AuthPluginTest.cc b/tests/AuthPluginTest.cc
index 8ba9563..e2d3351 100644
--- a/tests/AuthPluginTest.cc
+++ b/tests/AuthPluginTest.cc
@@ -16,18 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "pulsar/Authentication.h"
 #include <gtest/gtest.h>
+#include <pulsar/Authentication.h>
 #include <pulsar/Client.h>
-#include <boost/asio.hpp>
+
 #include <boost/algorithm/string.hpp>
+#include <boost/asio.hpp>
 #include <thread>
-#include <lib/LogUtils.h>
-#include <lib/auth/AuthOauth2.h>
 
-#include <lib/Latch.h>
 #include "lib/Future.h"
+#include "lib/Latch.h"
+#include "lib/LogUtils.h"
 #include "lib/Utils.h"
+#include "lib/auth/AuthOauth2.h"
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc
index fb14d4c..8bdd268 100644
--- a/tests/AuthTokenTest.cc
+++ b/tests/AuthTokenTest.cc
@@ -17,19 +17,18 @@
  * under the License.
  */
 
-#include <pulsar/Authentication.h>
-
 #include <gtest/gtest.h>
+#include <pulsar/Authentication.h>
 #include <pulsar/Client.h>
-#include <boost/asio.hpp>
-#include <boost/algorithm/string.hpp>
-#include <lib/LogUtils.h>
 
-#include <string>
+#include <boost/algorithm/string.hpp>
+#include <boost/asio.hpp>
 #include <fstream>
 #include <streambuf>
+#include <string>
 
 #include "lib/Future.h"
+#include "lib/LogUtils.h"
 #include "lib/Utils.h"
 DECLARE_LOG_OBJECT()
 
diff --git a/tests/BackoffTest.cc b/tests/BackoffTest.cc
index 8ecf566..d066b94 100644
--- a/tests/BackoffTest.cc
+++ b/tests/BackoffTest.cc
@@ -17,9 +17,13 @@
  * under the License.
  */
 #include <gtest/gtest.h>
+
 #include <thread>
-#include "Backoff.h"
+
 #include "PulsarFriend.h"
+#include "lib/Backoff.h"
+#include "lib/ClientConnection.h"
+#include "lib/stats/ProducerStatsImpl.h"
 
 using namespace pulsar;
 using boost::posix_time::milliseconds;
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index d3e424e..61cf28a 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -16,41 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <set>
-#include <mutex>
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <algorithm>
 #include <chrono>
-#include <thread>
-#include <vector>
 #include <cstring>
+#include <functional>
+#include <mutex>
+#include <set>
 #include <sstream>
 #include <stdexcept>
-#include <algorithm>
-#include <functional>
-
-#include <gtest/gtest.h>
-#include <pulsar/Client.h>
-#include <pulsar/Consumer.h>
-#include <pulsar/MessageBuilder.h>
-#include <pulsar/CryptoKeyReader.h>
-
-#include <lib/Latch.h>
-#include <lib/Utils.h>
-#include <lib/Future.h>
-#include <lib/Commands.h>
-#include <lib/LogUtils.h>
-#include <lib/TimeUtils.h>
-#include <lib/TopicName.h>
-#include <lib/ClientImpl.h>
-#include <lib/ConsumerImpl.h>
-#include <lib/PulsarApi.pb.h>
-#include <lib/MultiTopicsConsumerImpl.h>
-#include <lib/AckGroupingTrackerEnabled.h>
-#include <lib/AckGroupingTrackerDisabled.h>
-#include <lib/PatternMultiTopicsConsumerImpl.h>
+#include <thread>
+#include <vector>
 
+#include "CustomRoutingPolicy.h"
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
-#include "CustomRoutingPolicy.h"
+#include "lib/AckGroupingTrackerDisabled.h"
+#include "lib/AckGroupingTrackerEnabled.h"
+#include "lib/ClientConnection.h"
+#include "lib/ClientImpl.h"
+#include "lib/Commands.h"
+#include "lib/ConsumerImpl.h"
+#include "lib/Future.h"
+#include "lib/Latch.h"
+#include "lib/LogUtils.h"
+#include "lib/TimeUtils.h"
+#include "lib/TopicName.h"
+#include "lib/UnAckedMessageTrackerDisabled.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+#include "lib/stats/ProducerStatsImpl.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -3512,7 +3509,7 @@ class AckGroupingTrackerMock : public AckGroupingTracker {
     explicit AckGroupingTrackerMock(bool mockAck) : mockAck_(mockAck) {}
 
     bool callDoImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId, const MessageId &msgId,
-                            proto::CommandAck_AckType ackType) {
+                            CommandAck_AckType ackType) {
         if (!this->mockAck_) {
             // Not mocking ACK, expose this method.
             return this->doImmediateAck(connWeakPtr, consumerId, msgId, ackType);
@@ -3586,7 +3583,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerSingleAckBehavior) {
         auto connPtr = connWeakPtr.lock();
         ASSERT_NE(connPtr, nullptr);
         ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr, consumerImpl.getConsumerId(), recvMsgId[msgIdx],
-                                               proto::CommandAck::Individual));
+                                               CommandAck_AckType_Individual));
     }
     Message msg;
     ASSERT_EQ(ResultTimeout, consumer.receive(msg, 1000));
diff --git a/tests/BatchMessageTest.cc b/tests/BatchMessageTest.cc
index 62fd5ff..273146c 100644
--- a/tests/BatchMessageTest.cc
+++ b/tests/BatchMessageTest.cc
@@ -16,28 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/MessageBatch.h>
+
 #include <atomic>
 #include <ctime>
 #include <functional>
-#include <gtest/gtest.h>
 #include <sstream>
 #include <thread>
-#include <unistd.h>
-
-#include <lib/Commands.h>
-#include <lib/Future.h>
-#include <lib/Latch.h>
-#include <lib/LogUtils.h>
-#include <lib/TopicName.h>
-#include <lib/Utils.h>
-#include <pulsar/Client.h>
-#include <pulsar/MessageBatch.h>
-#include <pulsar/MessageBuilder.h>
 
 #include "ConsumerTest.h"
 #include "CustomRoutingPolicy.h"
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
+#include "lib/Commands.h"
+#include "lib/Future.h"
+#include "lib/Latch.h"
+#include "lib/LogUtils.h"
+#include "lib/ProtoApiEnums.h"
+#include "lib/Utils.h"
+#include "lib/stats/ProducerStatsImpl.h"
 
 DECLARE_LOG_OBJECT();
 
@@ -330,8 +329,8 @@ TEST(BatchMessageTest, testSmallReceiverQueueSize) {
     }
 
     ConsumerStatsImplPtr consumerStatsImplPtr = PulsarFriend::getConsumerStatsPtr(consumer);
-    unsigned long t = consumerStatsImplPtr->getAckedMsgMap().at(
-        std::make_pair<Result, proto::CommandAck_AckType>(ResultOk, proto::CommandAck_AckType_Individual));
+    unsigned long t =
+        consumerStatsImplPtr->getAckedMsgMap().at(std::make_pair(ResultOk, CommandAck_AckType_Individual));
     ASSERT_EQ(t, numOfMessages);
     ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), numOfMessages);
     ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalAckedMsgMap()), numOfMessages);
@@ -581,8 +580,8 @@ TEST(BatchMessageTest, testCumulativeAck) {
     ASSERT_EQ(consumerStatsImplPtr->getReceivedMsgMap().at(ResultTimeout), 1);
     ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), 1);
     ASSERT_EQ(producerStatsImplPtr->getNumBytesSent(), consumerStatsImplPtr->getNumBytesRecieved());
-    unsigned long t = consumerStatsImplPtr->getAckedMsgMap().at(
-        std::make_pair<Result, proto::CommandAck_AckType>(ResultOk, proto::CommandAck_AckType_Cumulative));
+    unsigned long t =
+        consumerStatsImplPtr->getAckedMsgMap().at(std::make_pair(ResultOk, CommandAck_AckType_Cumulative));
     ASSERT_EQ(t, 1);
 
     // Number of messages produced
@@ -612,8 +611,7 @@ TEST(BatchMessageTest, testCumulativeAck) {
     }
 
     ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), 1);
-    t = consumerStatsImplPtr->getAckedMsgMap().at(
-        std::make_pair<Result, proto::CommandAck_AckType>(ResultOk, proto::CommandAck_AckType_Cumulative));
+    t = consumerStatsImplPtr->getAckedMsgMap().at(std::make_pair(ResultOk, CommandAck_AckType_Cumulative));
     ASSERT_EQ(t, 1);
 
     // Number of messages consumed
diff --git a/tests/BlockingQueueTest.cc b/tests/BlockingQueueTest.cc
index 94b0a1b..0b86b5b 100644
--- a/tests/BlockingQueueTest.cc
+++ b/tests/BlockingQueueTest.cc
@@ -17,13 +17,14 @@
  * under the License.
  */
 #include <gtest/gtest.h>
-#include <lib/BlockingQueue.h>
-#include <lib/Latch.h>
 
 #include <future>
 #include <iostream>
 #include <thread>
 
+#include "lib/BlockingQueue.h"
+#include "lib/Latch.h"
+
 class ProducerWorker {
    private:
     std::thread producerThread_;
diff --git a/tests/ClientDeduplicationTest.cc b/tests/ClientDeduplicationTest.cc
index c3373d5..90991b9 100644
--- a/tests/ClientDeduplicationTest.cc
+++ b/tests/ClientDeduplicationTest.cc
@@ -16,15 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/Client.h>
-
 #include <gtest/gtest.h>
-
-#include "HttpHelper.h"
+#include <pulsar/Client.h>
 
 #include <string>
 #include <thread>
 
+#include "HttpHelper.h"
+
 using namespace pulsar;
 
 static std::string serviceUrl = "pulsar://localhost:6650";
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index aa48bdc..bdb0555 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -17,14 +17,16 @@
  * under the License.
  */
 #include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <future>
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
-
-#include <future>
-#include <pulsar/Client.h>
-#include "../lib/checksum/ChecksumProvider.h"
+#include "lib/ClientConnection.h"
 #include "lib/LogUtils.h"
+#include "lib/checksum/ChecksumProvider.h"
+#include "lib/stats/ProducerStatsImpl.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/tests/CompressionCodecSnappyTest.cc b/tests/CompressionCodecSnappyTest.cc
index 27d668f..c05c402 100644
--- a/tests/CompressionCodecSnappyTest.cc
+++ b/tests/CompressionCodecSnappyTest.cc
@@ -18,7 +18,7 @@
  */
 #include <gtest/gtest.h>
 
-#include "../lib/CompressionCodecSnappy.h"
+#include "lib/CompressionCodecSnappy.h"
 
 using namespace pulsar;
 
diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc
index 20cd8f4..fde8736 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/Client.h>
 #include <gtest/gtest.h>
 #include <lib/LogUtils.h>
+#include <pulsar/Client.h>
+
 #include "NoOpsCryptoKeyReader.h"
 
 DECLARE_LOG_OBJECT()
diff --git a/tests/ConsumerStatsTest.cc b/tests/ConsumerStatsTest.cc
index c398a53..3c1959a 100644
--- a/tests/ConsumerStatsTest.cc
+++ b/tests/ConsumerStatsTest.cc
@@ -18,17 +18,18 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
-#include <lib/LogUtils.h>
-#include <lib/Commands.h>
-#include "lib/Future.h"
-#include "lib/Utils.h"
-#include "PulsarFriend.h"
-#include "ConsumerTest.h"
-#include "HttpHelper.h"
-#include <lib/Latch.h>
 
 #include <functional>
 #include <thread>
+
+#include "ConsumerTest.h"
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/Future.h"
+#include "lib/Latch.h"
+#include "lib/LogUtils.h"
+#include "lib/MultiTopicsBrokerConsumerStatsImpl.h"
+#include "lib/Utils.h"
 DECLARE_LOG_OBJECT();
 
 using namespace pulsar;
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index c8a07e6..e35a1f0 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -16,22 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
 #include <chrono>
-#include <thread>
-#include <time.h>
-#include <set>
+#include <ctime>
 #include <map>
+#include <set>
+#include <thread>
 #include <vector>
 
-#include "gtest/gtest.h"
-
-#include "pulsar/Client.h"
+#include "HttpHelper.h"
 #include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
 #include "lib/Future.h"
-#include "lib/Utils.h"
 #include "lib/LogUtils.h"
 #include "lib/MultiTopicsConsumerImpl.h"
-#include "HttpHelper.h"
+#include "lib/TimeUtils.h"
+#include "lib/UnAckedMessageTrackerDisabled.h"
+#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "lib/Utils.h"
+#include "lib/stats/ProducerStatsImpl.h"
 
 static const std::string lookupUrl = "pulsar://localhost:6650";
 static const std::string adminUrl = "http://localhost:8080/";
diff --git a/tests/ConsumerTest.h b/tests/ConsumerTest.h
index 8c7d3f7..ca84aa7 100644
--- a/tests/ConsumerTest.h
+++ b/tests/ConsumerTest.h
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "lib/ConsumerImpl.h"
 #include <string>
 
+#include "lib/ConsumerImpl.h"
+
 using std::string;
 
 namespace pulsar {
diff --git a/tests/CustomLoggerTest.cc b/tests/CustomLoggerTest.cc
index bd80c31..dde4056 100644
--- a/tests/CustomLoggerTest.cc
+++ b/tests/CustomLoggerTest.cc
@@ -16,13 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <gtest/gtest.h>
 #include <pulsar/Client.h>
 #include <pulsar/ConsoleLoggerFactory.h>
-#include <LogUtils.h>
-#include <gtest/gtest.h>
+
 #include <atomic>
 #include <thread>
 
+#include "lib/LogUtils.h"
+
 using namespace pulsar;
 
 static std::vector<std::string> logLines;
diff --git a/tests/CustomRoutingPolicy.h b/tests/CustomRoutingPolicy.h
index ed10c5b..0cbf8b4 100644
--- a/tests/CustomRoutingPolicy.h
+++ b/tests/CustomRoutingPolicy.h
@@ -19,10 +19,11 @@
 #ifndef CUSTOM_ROUTER_POLICY_HEADER_
 #define CUSTOM_ROUTER_POLICY_HEADER_
 
-#include <cstdlib>  // rand()
-#include <boost/algorithm/string.hpp>
 #include <pulsar/DeprecatedException.h>
 
+#include <boost/algorithm/string.hpp>
+#include <cstdlib>  // rand()
+
 namespace pulsar {
 class CustomRoutingPolicy : public MessageRoutingPolicy {
     /** @deprecated */
diff --git a/tests/HashTest.cc b/tests/HashTest.cc
index bd6de09..76a49fd 100644
--- a/tests/HashTest.cc
+++ b/tests/HashTest.cc
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/Client.h>
-#include <gtest/gtest.h>
 #include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
 #include <boost/functional/hash.hpp>
 
-#include "../lib/BoostHash.h"
-#include "../lib/JavaStringHash.h"
-#include "../lib/Murmur3_32Hash.h"
+#include "lib/BoostHash.h"
+#include "lib/JavaStringHash.h"
+#include "lib/Murmur3_32Hash.h"
 
 using ::testing::AtLeast;
 using ::testing::Return;
diff --git a/tests/KeyBasedBatchingTest.cc b/tests/KeyBasedBatchingTest.cc
index fcb558a..e596266 100644
--- a/tests/KeyBasedBatchingTest.cc
+++ b/tests/KeyBasedBatchingTest.cc
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
 #include <time.h>
+
 #include <atomic>
 #include <map>
 #include <utility>
 
-#include <gtest/gtest.h>
-#include <pulsar/Client.h>
-#include <lib/Latch.h>
+#include "lib/Latch.h"
 
 using namespace pulsar;
 
diff --git a/tests/KeySharedConsumerTest.cc b/tests/KeySharedConsumerTest.cc
index 4660334..74da5b3 100644
--- a/tests/KeySharedConsumerTest.cc
+++ b/tests/KeySharedConsumerTest.cc
@@ -16,17 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
 #include <cmath>
 #include <ctime>
-#include <vector>
 #include <map>
-
-#include <gtest/gtest.h>
-#include <pulsar/Client.h>
-#include "lib/LogUtils.h"
+#include <vector>
 
 #include "HttpHelper.h"
 #include "LogHelper.h"
+#include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/tests/KeySharedPolicyTest.cc b/tests/KeySharedPolicyTest.cc
index 49fef3f..ff43102 100644
--- a/tests/KeySharedPolicyTest.cc
+++ b/tests/KeySharedPolicyTest.cc
@@ -16,18 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <cmath>
-#include <ctime>
-#include <vector>
-#include <map>
-
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 #include <pulsar/KeySharedPolicy.h>
-#include "lib/LogUtils.h"
+
+#include <cmath>
+#include <ctime>
+#include <map>
+#include <vector>
 
 #include "HttpHelper.h"
 #include "LogHelper.h"
+#include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/tests/LatchTest.cc b/tests/LatchTest.cc
index c69141e..041d2e3 100644
--- a/tests/LatchTest.cc
+++ b/tests/LatchTest.cc
@@ -17,9 +17,11 @@
  * under the License.
  */
 #include <gtest/gtest.h>
-#include <lib/Latch.h>
+
 #include <thread>
-#include "LogUtils.h"
+
+#include "lib/Latch.h"
+#include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/tests/LoggerTest.cc b/tests/LoggerTest.cc
index d26ccc6..2032744 100644
--- a/tests/LoggerTest.cc
+++ b/tests/LoggerTest.cc
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "LogUtils.h"
 #include <gtest/gtest.h>
 
+#include "lib/LogUtils.h"
+
 DECLARE_LOG_OBJECT()
 
 TEST(LoggerTest, testLogger) {
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 77c1e1a..5ac4122 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -17,21 +17,23 @@
  * under the License.
  */
 #include <BinaryProtoLookupService.h>
-#include <HTTPLookupService.h>
-#include <pulsar/Client.h>
-
-#include <gtest/gtest.h>
 #include <Future.h>
+#include <HTTPLookupService.h>
 #include <Utils.h>
-#include "ConnectionPool.h"
-#include "HttpHelper.h"
+#include <gtest/gtest.h>
 #include <pulsar/Authentication.h>
-#include <boost/exception/all.hpp>
-#include "LogUtils.h"
-#include "RetryableLookupService.h"
-#include "PulsarFriend.h"
+#include <pulsar/Client.h>
 
 #include <algorithm>
+#include <boost/exception/all.hpp>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/ConnectionPool.h"
+#include "lib/LogUtils.h"
+#include "lib/RetryableLookupService.h"
+#include "lib/TimeUtils.h"
 
 using namespace pulsar;
 
diff --git a/tests/MapCacheTest.cc b/tests/MapCacheTest.cc
index 12a89ee..2140937 100644
--- a/tests/MapCacheTest.cc
+++ b/tests/MapCacheTest.cc
@@ -17,7 +17,8 @@
  * under the License.
  */
 #include <gtest/gtest.h>
-#include <lib/MapCache.h>
+
+#include "lib/MapCache.h"
 
 using namespace pulsar;
 
diff --git a/tests/MemoryLimitControllerTest.cc b/tests/MemoryLimitControllerTest.cc
index eb63760..a462aef 100644
--- a/tests/MemoryLimitControllerTest.cc
+++ b/tests/MemoryLimitControllerTest.cc
@@ -18,10 +18,11 @@
  */
 
 #include <gtest/gtest.h>
+
 #include <thread>
 
-#include "../lib/MemoryLimitController.h"
-#include "../lib/Latch.h"
+#include "lib/Latch.h"
+#include "lib/MemoryLimitController.h"
 
 using namespace pulsar;
 
@@ -127,4 +128,4 @@ TEST(MemoryLimitControllerTest, testStepRelease) {
     t1.join();
     t2.join();
     t3.join();
-}
\ No newline at end of file
+}
diff --git a/tests/MemoryLimitTest.cc b/tests/MemoryLimitTest.cc
index cb0b47a..39c0f6e 100644
--- a/tests/MemoryLimitTest.cc
+++ b/tests/MemoryLimitTest.cc
@@ -18,15 +18,15 @@
  */
 
 #include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
 #include <array>
 #include <thread>
 
-#include "../lib/MemoryLimitController.h"
-#include "../lib/Latch.h"
-#include "../lib/Future.h"
-#include "../lib/Utils.h"
-
-#include <pulsar/Client.h>
+#include "lib/Future.h"
+#include "lib/Latch.h"
+#include "lib/MemoryLimitController.h"
+#include "lib/Utils.h"
 
 using namespace pulsar;
 
@@ -160,4 +160,4 @@ TEST(MemoryLimitTest, testNoProducerQueueSize) {
         Result res = p.getFuture().get(id);
         ASSERT_EQ(res, ResultOk);
     }
-}
\ No newline at end of file
+}
diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc
index ae0114c..61a9714 100644
--- a/tests/MessageChunkingTest.cc
+++ b/tests/MessageChunkingTest.cc
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
 #include <ctime>
 #include <random>
 
-#include <pulsar/Client.h>
-#include <gtest/gtest.h>
-#include "lib/LogUtils.h"
 #include "PulsarFriend.h"
+#include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
diff --git a/tests/MessageIdTest.cc b/tests/MessageIdTest.cc
index 55fa181..55257d9 100644
--- a/tests/MessageIdTest.cc
+++ b/tests/MessageIdTest.cc
@@ -16,14 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/MessageId.h>
-#include "lib/MessageIdUtil.h"
-#include "PulsarFriend.h"
-
 #include <gtest/gtest.h>
+#include <pulsar/MessageId.h>
 
 #include <string>
 
+#include "PulsarFriend.h"
+#include "lib/MessageIdUtil.h"
+
 using namespace pulsar;
 
 TEST(MessageIdTest, testSerialization) {
diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc
index fcc22e9..7e26431 100644
--- a/tests/MessageTest.cc
+++ b/tests/MessageTest.cc
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/MessageBuilder.h>
-#include <pulsar/Client.h>
 #include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/MessageBuilder.h>
+
 #include <string>
-#include <lib/LogUtils.h>
+
+#include "lib/LogUtils.h"
 
 using namespace pulsar;
 TEST(MessageTest, testMessageContents) {
diff --git a/tests/MessagesImplTest.cc b/tests/MessagesImplTest.cc
index e963501..a0fdc7a 100644
--- a/tests/MessagesImplTest.cc
+++ b/tests/MessagesImplTest.cc
@@ -17,8 +17,9 @@
  * under the License.
  */
 #include <gtest/gtest.h>
-#include <MessagesImpl.h>
-#include "pulsar/MessageBuilder.h"
+#include <pulsar/MessageBuilder.h>
+
+#include "lib/MessagesImpl.h"
 
 using namespace pulsar;
 
diff --git a/tests/NamespaceNameTest.cc b/tests/NamespaceNameTest.cc
index 51975c8..132506e 100644
--- a/tests/NamespaceNameTest.cc
+++ b/tests/NamespaceNameTest.cc
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <NamespaceName.h>
-
 #include <gtest/gtest.h>
+
+#include "lib/NamespaceName.h"
 using namespace pulsar;
 
 TEST(NamespaceNameTest, testNamespaceName) {
diff --git a/tests/PartitionsUpdateTest.cc b/tests/PartitionsUpdateTest.cc
index 845e447..010e5cb 100644
--- a/tests/PartitionsUpdateTest.cc
+++ b/tests/PartitionsUpdateTest.cc
@@ -19,13 +19,13 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
-#include <set>
 #include <chrono>
-#include <thread>
 #include <memory>
+#include <set>
+#include <thread>
 
-#include "HttpHelper.h"
 #include "CustomRoutingPolicy.h"
+#include "HttpHelper.h"
 
 using namespace pulsar;
 
diff --git a/tests/PeriodicTaskTest.cc b/tests/PeriodicTaskTest.cc
index 2c1da70..7b04817 100644
--- a/tests/PeriodicTaskTest.cc
+++ b/tests/PeriodicTaskTest.cc
@@ -17,9 +17,11 @@
  * under the License.
  */
 #include <gtest/gtest.h>
+
 #include <atomic>
 #include <chrono>
 #include <thread>
+
 #include "lib/ExecutorService.h"
 #include "lib/LogUtils.h"
 #include "lib/PeriodicTask.h"
diff --git a/tests/ProducerConfigurationTest.cc b/tests/ProducerConfigurationTest.cc
index 5c54129..df5867c 100644
--- a/tests/ProducerConfigurationTest.cc
+++ b/tests/ProducerConfigurationTest.cc
@@ -18,6 +18,7 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/ProducerConfiguration.h>
+
 #include "NoOpsCryptoKeyReader.h"
 
 using namespace pulsar;
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 36b23ee..ee07cbb 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -16,17 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/Client.h>
 #include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
 #include <thread>
 
 #include "HttpHelper.h"
-
 #include "lib/Future.h"
-#include "lib/Utils.h"
 #include "lib/Latch.h"
 #include "lib/LogUtils.h"
 #include "lib/ProducerImpl.h"
+#include "lib/Utils.h"
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
diff --git a/tests/PromiseTest.cc b/tests/PromiseTest.cc
index 73c6f8c..25b6b72 100644
--- a/tests/PromiseTest.cc
+++ b/tests/PromiseTest.cc
@@ -17,12 +17,14 @@
  * under the License.
  */
 #include <gtest/gtest.h>
-#include <lib/Future.h>
+
 #include <chrono>
 #include <string>
 #include <thread>
 #include <vector>
 
+#include "lib/Future.h"
+
 using namespace pulsar;
 
 TEST(PromiseTest, testSetValue) {
diff --git a/tests/ProtobufNativeSchemaTest.cc b/tests/ProtobufNativeSchemaTest.cc
index f9557bd..37bd780 100644
--- a/tests/ProtobufNativeSchemaTest.cc
+++ b/tests/ProtobufNativeSchemaTest.cc
@@ -19,9 +19,11 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 #include <pulsar/ProtobufNativeSchema.h>
+
 #include <stdexcept>
+
 #include "PaddingDemo.pb.h"
-#include "Test.pb.h"  // generated from "pulsar-client/src/test/proto/Test.proto"
+#include "Test.pb.h"
 
 using namespace pulsar;
 
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index df8e3dc..18f2bb6 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -19,13 +19,17 @@
 
 #include <string>
 
+#include "lib/ClientConnection.h"
 #include "lib/ClientImpl.h"
-#include "lib/ProducerImpl.h"
-#include "lib/PartitionedProducerImpl.h"
 #include "lib/ConsumerImpl.h"
 #include "lib/MultiTopicsConsumerImpl.h"
+#include "lib/NamespaceName.h"
+#include "lib/PartitionedProducerImpl.h"
+#include "lib/ProducerImpl.h"
 #include "lib/ReaderImpl.h"
 #include "lib/RetryableLookupService.h"
+#include "lib/stats/ConsumerStatsImpl.h"
+#include "lib/stats/ProducerStatsImpl.h"
 
 using std::string;
 
diff --git a/tests/ReaderConfigurationTest.cc b/tests/ReaderConfigurationTest.cc
index 8dc60f4..5783ac9 100644
--- a/tests/ReaderConfigurationTest.cc
+++ b/tests/ReaderConfigurationTest.cc
@@ -22,8 +22,9 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
-#include <lib/ReaderImpl.h>
+
 #include "NoOpsCryptoKeyReader.h"
+#include "lib/ReaderImpl.h"
 
 using namespace pulsar;
 
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 799702f..b88da99 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -16,19 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <gtest/gtest.h>
 #include <pulsar/Client.h>
 #include <pulsar/Reader.h>
-#include "HttpHelper.h"
-#include "PulsarFriend.h"
-
-#include <gtest/gtest.h>
-
 #include <time.h>
+
 #include <string>
 
-#include <lib/Latch.h>
-#include <lib/LogUtils.h>
-#include <lib/ReaderImpl.h>
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/ClientConnection.h"
+#include "lib/Latch.h"
+#include "lib/LogUtils.h"
+#include "lib/ReaderImpl.h"
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
diff --git a/tests/RoundRobinMessageRouterTest.cc b/tests/RoundRobinMessageRouterTest.cc
index ce5ad17..56a7605 100644
--- a/tests/RoundRobinMessageRouterTest.cc
+++ b/tests/RoundRobinMessageRouterTest.cc
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <gtest/gtest.h>
 #include <pulsar/Client.h>
 #include <pulsar/ProducerConfiguration.h>
-#include <gtest/gtest.h>
+
 #include <thread>
 
-#include "../lib/RoundRobinMessageRouter.h"
-#include "../lib/TopicMetadataImpl.h"
+#include "lib/RoundRobinMessageRouter.h"
+#include "lib/TopicMetadataImpl.h"
 
 using namespace pulsar;
 
diff --git a/tests/SemaphoreTest.cc b/tests/SemaphoreTest.cc
index 0cdec79..de3da4f 100644
--- a/tests/SemaphoreTest.cc
+++ b/tests/SemaphoreTest.cc
@@ -18,10 +18,11 @@
  */
 
 #include <gtest/gtest.h>
+
 #include <thread>
 
-#include "../lib/Semaphore.h"
-#include "../lib/Latch.h"
+#include "lib/Latch.h"
+#include "lib/Semaphore.h"
 
 using namespace pulsar;
 
diff --git a/tests/ServiceURITest.cc b/tests/ServiceURITest.cc
index 9d4c88f..4e01ab2 100644
--- a/tests/ServiceURITest.cc
+++ b/tests/ServiceURITest.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include <gtest/gtest.h>
+
 #include "lib/ServiceURI.h"
 
 using namespace pulsar;
diff --git a/tests/ShutdownTest.cc b/tests/ShutdownTest.cc
index 3951347..1dca019 100644
--- a/tests/ShutdownTest.cc
+++ b/tests/ShutdownTest.cc
@@ -16,14 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <atomic>
-#include <ctime>
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
-#include "lib/ClientImpl.h"
+
+#include <atomic>
+#include <ctime>
+
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
 #include "WaitUtils.h"
+#include "lib/ClientImpl.h"
 
 using namespace pulsar;
 
diff --git a/tests/SinglePartitionMessageRouterTest.cc b/tests/SinglePartitionMessageRouterTest.cc
index e82d080..788f19b 100644
--- a/tests/SinglePartitionMessageRouterTest.cc
+++ b/tests/SinglePartitionMessageRouterTest.cc
@@ -16,17 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
 #include <pulsar/Client.h>
 #include <pulsar/ProducerConfiguration.h>
+
 #include <boost/functional/hash.hpp>
-#include <gtest/gtest.h>
-#include <gmock/gmock.h>
 
+#include "lib/SinglePartitionMessageRouter.h"
+#include "lib/TopicMetadataImpl.h"
 #include "tests/mocks/GMockMessage.h"
 
-#include "../lib/SinglePartitionMessageRouter.h"
-#include "../lib/TopicMetadataImpl.h"
-
 using ::testing::AtLeast;
 using ::testing::Return;
 using ::testing::ReturnRef;
@@ -70,4 +70,4 @@ TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithPartitionKey) {
 
     ASSERT_EQ(expectedParrtition1, router.getPartition(message1, TopicMetadataImpl(numPartitons)));
     ASSERT_EQ(expectedParrtition2, router.getPartition(message2, TopicMetadataImpl(numPartitons)));
-}
\ No newline at end of file
+}
diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc
index 8d74a24..87cbe1e 100644
--- a/tests/SynchronizedHashMapTest.cc
+++ b/tests/SynchronizedHashMapTest.cc
@@ -17,11 +17,13 @@
  * under the License.
  */
 #include <gtest/gtest.h>
+
 #include <algorithm>
 #include <atomic>
 #include <chrono>
 #include <thread>
 #include <vector>
+
 #include "lib/Latch.h"
 #include "lib/SynchronizedHashMap.h"
 
diff --git a/tests/TopicMetadataImplTest.cc b/tests/TopicMetadataImplTest.cc
index 091dd26..bdc6878 100644
--- a/tests/TopicMetadataImplTest.cc
+++ b/tests/TopicMetadataImplTest.cc
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/Client.h>
 #include <gtest/gtest.h>
+#include <pulsar/Client.h>
 
-#include "../lib/TopicMetadataImpl.h"
+#include "lib/TopicMetadataImpl.h"
 
 using namespace pulsar;
 
diff --git a/tests/TopicNameTest.cc b/tests/TopicNameTest.cc
index 3db2a6c..44b3dc2 100644
--- a/tests/TopicNameTest.cc
+++ b/tests/TopicNameTest.cc
@@ -17,9 +17,11 @@
  * under the License.
  */
 #include <gtest/gtest.h>
-#include <lib/TopicName.h>
+
 #include <map>
 
+#include "lib/TopicName.h"
+
 using namespace pulsar;
 
 TEST(TopicNameTest, testLookup) {
diff --git a/tests/UnboundedBlockingQueueTest.cc b/tests/UnboundedBlockingQueueTest.cc
index 819c22e..bccb68d 100644
--- a/tests/UnboundedBlockingQueueTest.cc
+++ b/tests/UnboundedBlockingQueueTest.cc
@@ -17,12 +17,13 @@
  * under the License.
  */
 #include <gtest/gtest.h>
-#include <lib/UnboundedBlockingQueue.h>
-#include <lib/Latch.h>
 
 #include <future>
 #include <thread>
 
+#include "lib/Latch.h"
+#include "lib/UnboundedBlockingQueue.h"
+
 class UnboundedProducerWorker {
    private:
     std::thread producerThread_;
diff --git a/tests/UrlTest.cc b/tests/UrlTest.cc
index 6cd2d1c..c470f5b 100644
--- a/tests/UrlTest.cc
+++ b/tests/UrlTest.cc
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "Url.h"
 #include <gtest/gtest.h>
 
+#include "lib/Url.h"
+
 using namespace pulsar;
 
 TEST(UrlTest, testUrl) {
diff --git a/tests/VersionTest.cc b/tests/VersionTest.cc
index 57e1e78..0fa790d 100644
--- a/tests/VersionTest.cc
+++ b/tests/VersionTest.cc
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/Version.h>
 #include <gtest/gtest.h>
+#include <pulsar/Version.h>
 
 TEST(VersionTest, testMacro) {
 #ifdef PULSAR_VERSION
diff --git a/tests/ZLibCompressionTest.cc b/tests/ZLibCompressionTest.cc
index c510db1..249d7ee 100644
--- a/tests/ZLibCompressionTest.cc
+++ b/tests/ZLibCompressionTest.cc
@@ -17,7 +17,8 @@
  * under the License.
  */
 #include <gtest/gtest.h>
-#include <lib/CompressionCodecZLib.h>
+
+#include "lib/CompressionCodecZLib.h"
 
 using namespace pulsar;
 
diff --git a/tests/ZTSClientTest.cc b/tests/ZTSClientTest.cc
index b338e79..fe6de0b 100644
--- a/tests/ZTSClientTest.cc
+++ b/tests/ZTSClientTest.cc
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "lib/auth/athenz/ZTSClient.h"
 #include <gtest/gtest.h>
 
+#include "lib/auth/athenz/ZTSClient.h"
+
 using namespace pulsar;
 
 namespace pulsar {
diff --git a/tests/ZeroQueueSizeTest.cc b/tests/ZeroQueueSizeTest.cc
index 59780fe..644f42c 100644
--- a/tests/ZeroQueueSizeTest.cc
+++ b/tests/ZeroQueueSizeTest.cc
@@ -18,8 +18,7 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
-#include <lib/Latch.h>
-#include "ConsumerTest.h"
+
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
@@ -27,6 +26,10 @@
 #include <memory>
 #include <mutex>
 
+#include "ConsumerTest.h"
+#include "lib/Latch.h"
+#include "lib/LogUtils.h"
+
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
diff --git a/tests/c/c_BasicEndToEndTest.cc b/tests/c/c_BasicEndToEndTest.cc
index ae01bef..04aa1dc 100644
--- a/tests/c/c_BasicEndToEndTest.cc
+++ b/tests/c/c_BasicEndToEndTest.cc
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-#include <future>
+#include <gtest/gtest.h>
+#include <pulsar/c/client.h>
 #include <stdlib.h>
 #include <string.h>
 
-#include <gtest/gtest.h>
-#include <pulsar/c/client.h>
+#include <future>
 
 struct send_ctx {
     pulsar_result result;
diff --git a/tests/c/c_ConsumerConfigurationTest.cc b/tests/c/c_ConsumerConfigurationTest.cc
index c6afeef..e877867 100644
--- a/tests/c/c_ConsumerConfigurationTest.cc
+++ b/tests/c/c_ConsumerConfigurationTest.cc
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/c/consumer_configuration.h>
 #include <gtest/gtest.h>
+#include <pulsar/c/consumer_configuration.h>
 
 TEST(C_ConsumerConfigurationTest, testCApiConfig) {
     pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
diff --git a/tests/c/c_ProducerConfigurationTest.cc b/tests/c/c_ProducerConfigurationTest.cc
index 507b3a9..1614227 100644
--- a/tests/c/c_ProducerConfigurationTest.cc
+++ b/tests/c/c_ProducerConfigurationTest.cc
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/c/producer_configuration.h>
 #include <gtest/gtest.h>
+#include <pulsar/c/producer_configuration.h>
 
 TEST(C_ProducerConfigurationTest, testCApiConfig) {
     pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
diff --git a/tests/main.cc b/tests/main.cc
index 0659925..6104501 100644
--- a/tests/main.cc
+++ b/tests/main.cc
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <LogUtils.h>
 #include <gmock/gmock.h>
 
 int main(int argc, char **argv) {
diff --git a/wireshark/pulsarDissector.cc b/wireshark/pulsarDissector.cc
index 9ff311e..38bfa4e 100644
--- a/wireshark/pulsarDissector.cc
+++ b/wireshark/pulsarDissector.cc
@@ -16,15 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <glib.h>
 #include <config.h>
+#include <epan/column-utils.h>
+#include <epan/dissectors/packet-tcp.h>
 #include <epan/expert.h>
 #include <epan/packet.h>
 #include <epan/prefs.h>
 #include <epan/proto.h>
-#include <epan/column-utils.h>
-#include <epan/dissectors/packet-tcp.h>
 #include <epan/value_string.h>
+#include <glib.h>
 #include <wsutil/nstime.h>
 
 #include "PulsarApi.pb.h"