You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/12 16:34:13 UTC

[pulsar-client-node] 03/07: change 4 to 2 spaces and reformat

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit 196a69c30e238e1438f4964b6643bfc4b56ed903
Author: yfuruta <yf...@yahoo-corp.jp>
AuthorDate: Tue Mar 12 10:48:30 2019 +0900

    change 4 to 2 spaces and reformat
---
 .clang-format         |   2 +-
 src/Client.cc         | 194 +++++++++++++++++++-------------------
 src/Client.h          |  20 ++--
 src/Consumer.cc       | 243 ++++++++++++++++++++++++------------------------
 src/Consumer.h        |  26 +++---
 src/ConsumerConfig.cc |  89 +++++++++---------
 src/ConsumerConfig.h  |  24 ++---
 src/Message.cc        | 252 +++++++++++++++++++++++++-------------------------
 src/Message.h         |  60 ++++++------
 src/MessageId.cc      |  64 ++++++-------
 src/MessageId.h       |  28 +++---
 src/Producer.cc       | 194 +++++++++++++++++++-------------------
 src/Producer.h        |  22 ++---
 src/ProducerConfig.cc | 181 ++++++++++++++++++------------------
 src/ProducerConfig.h  |  16 ++--
 src/addon.cc          |  10 +-
 16 files changed, 707 insertions(+), 718 deletions(-)

diff --git a/.clang-format b/.clang-format
index cb40b50..f2d174b 100644
--- a/.clang-format
+++ b/.clang-format
@@ -17,7 +17,7 @@
 
 
 BasedOnStyle: Google
-IndentWidth: 4
+IndentWidth: 2
 ColumnLimit: 110
 SortIncludes: false
 BreakBeforeBraces: Custom
diff --git a/src/Client.cc b/src/Client.cc
index 97fc573..16fb13d 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -38,133 +38,133 @@ static const std::string CFG_STATS_INTERVAL = "statsIntervalInSeconds";
 Napi::FunctionReference Client::constructor;
 
 Napi::Object Client::Init(Napi::Env env, Napi::Object exports) {
-    Napi::HandleScope scope(env);
+  Napi::HandleScope scope(env);
 
-    Napi::Function func = DefineClass(
-        env, "Client",
-        {InstanceMethod("createProducer", &Client::CreateProducer),
-         InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("close", &Client::Close)});
+  Napi::Function func =
+      DefineClass(env, "Client",
+                  {InstanceMethod("createProducer", &Client::CreateProducer),
+                   InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("close", &Client::Close)});
 
-    constructor = Napi::Persistent(func);
-    constructor.SuppressDestruct();
+  constructor = Napi::Persistent(func);
+  constructor.SuppressDestruct();
 
-    exports.Set("Client", func);
-    return exports;
+  exports.Set("Client", func);
+  return exports;
 }
 
 Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info) {
-    Napi::Env env = info.Env();
-    Napi::HandleScope scope(env);
-    Napi::Object clientConfig = info[0].As<Napi::Object>();
-
-    if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString()) {
-        if (clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
-            Napi::Error::New(env, "Service URL is required and must be specified as a string")
-                .ThrowAsJavaScriptException();
-            return;
-        }
+  Napi::Env env = info.Env();
+  Napi::HandleScope scope(env);
+  Napi::Object clientConfig = info[0].As<Napi::Object>();
+
+  if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString()) {
+    if (clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
+      Napi::Error::New(env, "Service URL is required and must be specified as a string")
+          .ThrowAsJavaScriptException();
+      return;
     }
-    Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
+  }
+  Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
 
-    pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create();
+  pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create();
 
-    if (clientConfig.Has(CFG_OP_TIMEOUT) && clientConfig.Get(CFG_OP_TIMEOUT).IsNumber()) {
-        int32_t operationTimeoutSeconds = clientConfig.Get(CFG_OP_TIMEOUT).ToNumber().Int32Value();
-        if (operationTimeoutSeconds > 0) {
-            pulsar_client_configuration_set_operation_timeout_seconds(cClientConfig, operationTimeoutSeconds);
-        }
+  if (clientConfig.Has(CFG_OP_TIMEOUT) && clientConfig.Get(CFG_OP_TIMEOUT).IsNumber()) {
+    int32_t operationTimeoutSeconds = clientConfig.Get(CFG_OP_TIMEOUT).ToNumber().Int32Value();
+    if (operationTimeoutSeconds > 0) {
+      pulsar_client_configuration_set_operation_timeout_seconds(cClientConfig, operationTimeoutSeconds);
     }
+  }
 
-    if (clientConfig.Has(CFG_IO_THREADS) && clientConfig.Get(CFG_IO_THREADS).IsNumber()) {
-        int32_t ioThreads = clientConfig.Get(CFG_IO_THREADS).ToNumber().Int32Value();
-        if (ioThreads > 0) {
-            pulsar_client_configuration_set_io_threads(cClientConfig, ioThreads);
-        }
+  if (clientConfig.Has(CFG_IO_THREADS) && clientConfig.Get(CFG_IO_THREADS).IsNumber()) {
+    int32_t ioThreads = clientConfig.Get(CFG_IO_THREADS).ToNumber().Int32Value();
+    if (ioThreads > 0) {
+      pulsar_client_configuration_set_io_threads(cClientConfig, ioThreads);
     }
+  }
 
-    if (clientConfig.Has(CFG_LISTENER_THREADS) && clientConfig.Get(CFG_LISTENER_THREADS).IsNumber()) {
-        int32_t messageListenerThreads = clientConfig.Get(CFG_LISTENER_THREADS).ToNumber().Int32Value();
-        if (messageListenerThreads > 0) {
-            pulsar_client_configuration_set_message_listener_threads(cClientConfig, messageListenerThreads);
-        }
+  if (clientConfig.Has(CFG_LISTENER_THREADS) && clientConfig.Get(CFG_LISTENER_THREADS).IsNumber()) {
+    int32_t messageListenerThreads = clientConfig.Get(CFG_LISTENER_THREADS).ToNumber().Int32Value();
+    if (messageListenerThreads > 0) {
+      pulsar_client_configuration_set_message_listener_threads(cClientConfig, messageListenerThreads);
     }
+  }
 
-    if (clientConfig.Has(CFG_CONCURRENT_LOOKUP) && clientConfig.Get(CFG_CONCURRENT_LOOKUP).IsNumber()) {
-        int32_t concurrentLookupRequest = clientConfig.Get(CFG_CONCURRENT_LOOKUP).ToNumber().Int32Value();
-        if (concurrentLookupRequest > 0) {
-            pulsar_client_configuration_set_concurrent_lookup_request(cClientConfig, concurrentLookupRequest);
-        }
+  if (clientConfig.Has(CFG_CONCURRENT_LOOKUP) && clientConfig.Get(CFG_CONCURRENT_LOOKUP).IsNumber()) {
+    int32_t concurrentLookupRequest = clientConfig.Get(CFG_CONCURRENT_LOOKUP).ToNumber().Int32Value();
+    if (concurrentLookupRequest > 0) {
+      pulsar_client_configuration_set_concurrent_lookup_request(cClientConfig, concurrentLookupRequest);
     }
-
-    if (clientConfig.Has(CFG_USE_TLS) && clientConfig.Get(CFG_USE_TLS).IsBoolean()) {
-        Napi::Boolean useTls = clientConfig.Get(CFG_USE_TLS).ToBoolean();
-        pulsar_client_configuration_set_use_tls(cClientConfig, useTls.Value());
-    }
-
-    if (clientConfig.Has(CFG_TLS_TRUST_CERT) && clientConfig.Get(CFG_TLS_TRUST_CERT).IsString()) {
-        Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString();
-        pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig,
-                                                                  tlsTrustCertsFilePath.Utf8Value().c_str());
-    }
-
-    if (clientConfig.Has(CFG_TLS_VALIDATE_HOSTNAME) &&
-        clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).IsBoolean()) {
-        Napi::Boolean tlsValidateHostname = clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).ToBoolean();
-        pulsar_client_configuration_set_validate_hostname(cClientConfig, tlsValidateHostname.Value());
-    }
-
-    if (clientConfig.Has(CFG_TLS_ALLOW_INSECURE) && clientConfig.Get(CFG_TLS_ALLOW_INSECURE).IsBoolean()) {
-        Napi::Boolean tlsAllowInsecureConnection = clientConfig.Get(CFG_TLS_ALLOW_INSECURE).ToBoolean();
-        pulsar_client_configuration_set_tls_allow_insecure_connection(cClientConfig,
-                                                                      tlsAllowInsecureConnection.Value());
-    }
-
-    if (clientConfig.Has(CFG_STATS_INTERVAL) && clientConfig.Get(CFG_STATS_INTERVAL).IsNumber()) {
-        uint32_t statsIntervalInSeconds = clientConfig.Get(CFG_STATS_INTERVAL).ToNumber().Uint32Value();
-        if (statsIntervalInSeconds > 0) {
-            pulsar_client_configuration_set_stats_interval_in_seconds(cClientConfig, statsIntervalInSeconds);
-        }
+  }
+
+  if (clientConfig.Has(CFG_USE_TLS) && clientConfig.Get(CFG_USE_TLS).IsBoolean()) {
+    Napi::Boolean useTls = clientConfig.Get(CFG_USE_TLS).ToBoolean();
+    pulsar_client_configuration_set_use_tls(cClientConfig, useTls.Value());
+  }
+
+  if (clientConfig.Has(CFG_TLS_TRUST_CERT) && clientConfig.Get(CFG_TLS_TRUST_CERT).IsString()) {
+    Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString();
+    pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig,
+                                                              tlsTrustCertsFilePath.Utf8Value().c_str());
+  }
+
+  if (clientConfig.Has(CFG_TLS_VALIDATE_HOSTNAME) &&
+      clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).IsBoolean()) {
+    Napi::Boolean tlsValidateHostname = clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).ToBoolean();
+    pulsar_client_configuration_set_validate_hostname(cClientConfig, tlsValidateHostname.Value());
+  }
+
+  if (clientConfig.Has(CFG_TLS_ALLOW_INSECURE) && clientConfig.Get(CFG_TLS_ALLOW_INSECURE).IsBoolean()) {
+    Napi::Boolean tlsAllowInsecureConnection = clientConfig.Get(CFG_TLS_ALLOW_INSECURE).ToBoolean();
+    pulsar_client_configuration_set_tls_allow_insecure_connection(cClientConfig,
+                                                                  tlsAllowInsecureConnection.Value());
+  }
+
+  if (clientConfig.Has(CFG_STATS_INTERVAL) && clientConfig.Get(CFG_STATS_INTERVAL).IsNumber()) {
+    uint32_t statsIntervalInSeconds = clientConfig.Get(CFG_STATS_INTERVAL).ToNumber().Uint32Value();
+    if (statsIntervalInSeconds > 0) {
+      pulsar_client_configuration_set_stats_interval_in_seconds(cClientConfig, statsIntervalInSeconds);
     }
+  }
 
-    this->cClient = pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig);
-    pulsar_client_configuration_free(cClientConfig);
+  this->cClient = pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig);
+  pulsar_client_configuration_free(cClientConfig);
 }
 
 Client::~Client() { pulsar_client_free(this->cClient); }
 
 Napi::Value Client::CreateProducer(const Napi::CallbackInfo &info) {
-    return Producer::NewInstance(info, this->cClient);
+  return Producer::NewInstance(info, this->cClient);
 }
 
 Napi::Value Client::Subscribe(const Napi::CallbackInfo &info) {
-    return Consumer::NewInstance(info, this->cClient);
+  return Consumer::NewInstance(info, this->cClient);
 }
 
 class ClientCloseWorker : public Napi::AsyncWorker {
-   public:
-    ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient)
-        : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
-          deferred(deferred),
-          cClient(cClient) {}
-    ~ClientCloseWorker() {}
-    void Execute() {
-        pulsar_result result = pulsar_client_close(this->cClient);
-        if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
-    }
-    void OnOK() { this->deferred.Resolve(Env().Null()); }
-    void OnError(const Napi::Error &e) {
-        this->deferred.Reject(
-            Napi::Error::New(Env(), std::string("Failed to close client: ") + e.Message()).Value());
-    }
-
-   private:
-    Napi::Promise::Deferred deferred;
-    pulsar_client_t *cClient;
+ public:
+  ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cClient(cClient) {}
+  ~ClientCloseWorker() {}
+  void Execute() {
+    pulsar_result result = pulsar_client_close(this->cClient);
+    if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+  }
+  void OnOK() { this->deferred.Resolve(Env().Null()); }
+  void OnError(const Napi::Error &e) {
+    this->deferred.Reject(
+        Napi::Error::New(Env(), std::string("Failed to close client: ") + e.Message()).Value());
+  }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_client_t *cClient;
 };
 
 Napi::Value Client::Close(const Napi::CallbackInfo &info) {
-    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-    ClientCloseWorker *wk = new ClientCloseWorker(deferred, this->cClient);
-    wk->Queue();
-    return deferred.Promise();
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  ClientCloseWorker *wk = new ClientCloseWorker(deferred, this->cClient);
+  wk->Queue();
+  return deferred.Promise();
 }
diff --git a/src/Client.h b/src/Client.h
index bc92125..63c66f2 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -24,18 +24,18 @@
 #include <pulsar/c/client.h>
 
 class Client : public Napi::ObjectWrap<Client> {
-   public:
-    static Napi::Object Init(Napi::Env env, Napi::Object exports);
-    Client(const Napi::CallbackInfo &info);
-    ~Client();
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
+  Client(const Napi::CallbackInfo &info);
+  ~Client();
 
-   private:
-    static Napi::FunctionReference constructor;
-    pulsar_client_t *cClient;
+ private:
+  static Napi::FunctionReference constructor;
+  pulsar_client_t *cClient;
 
-    Napi::Value CreateProducer(const Napi::CallbackInfo &info);
-    Napi::Value Subscribe(const Napi::CallbackInfo &info);
-    Napi::Value Close(const Napi::CallbackInfo &info);
+  Napi::Value CreateProducer(const Napi::CallbackInfo &info);
+  Napi::Value Subscribe(const Napi::CallbackInfo &info);
+  Napi::Value Close(const Napi::CallbackInfo &info);
 };
 
 #endif
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 85e0fbf..db6fab4 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -26,18 +26,18 @@
 Napi::FunctionReference Consumer::constructor;
 
 void Consumer::Init(Napi::Env env, Napi::Object exports) {
-    Napi::HandleScope scope(env);
-
-    Napi::Function func = DefineClass(env, "Consumer",
-                                      {
-                                          InstanceMethod("receive", &Consumer::Receive),
-                                          InstanceMethod("acknowledge", &Consumer::Acknowledge),
-                                          InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
-                                          InstanceMethod("close", &Consumer::Close),
-                                      });
-
-    constructor = Napi::Persistent(func);
-    constructor.SuppressDestruct();
+  Napi::HandleScope scope(env);
+
+  Napi::Function func = DefineClass(env, "Consumer",
+                                    {
+                                        InstanceMethod("receive", &Consumer::Receive),
+                                        InstanceMethod("acknowledge", &Consumer::Acknowledge),
+                                        InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
+                                        InstanceMethod("close", &Consumer::Close),
+                                    });
+
+  constructor = Napi::Persistent(func);
+  constructor.SuppressDestruct();
 }
 
 void Consumer::SetCConsumer(pulsar_consumer_t *cConsumer) { this->cConsumer = cConsumer; }
@@ -45,144 +45,139 @@ void Consumer::SetCConsumer(pulsar_consumer_t *cConsumer) { this->cConsumer = cC
 Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info) {}
 
 class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
-   public:
-    ConsumerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
-                              ConsumerConfig *consumerConfig)
-        : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
-          deferred(deferred),
-          cClient(cClient),
-          consumerConfig(consumerConfig) {}
-    ~ConsumerNewInstanceWorker() {}
-    void Execute() {
-        const std::string &topic = this->consumerConfig->GetTopic();
-        if (topic.empty()) {
-            SetError(
-                std::string("Topic is required and must be specified as a string when creating consumer"));
-            return;
-        }
-        const std::string &subscription = this->consumerConfig->GetSubscription();
-        if (subscription.empty()) {
-            SetError(std::string(
-                "Subscription is required and must be specified as a string when creating consumer"));
-            return;
-        }
-        int32_t ackTimeoutMs = this->consumerConfig->GetAckTimeoutMs();
-        if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
-            std::string msg("Ack timeout should be 0 or greater than or equal to " +
-                            std::to_string(MIN_ACK_TIMEOUT_MILLIS));
-            SetError(msg);
-            return;
-        }
-
-        pulsar_result result =
-            pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(),
-                                    this->consumerConfig->GetCConsumerConfig(), &(this->cConsumer));
-        delete this->consumerConfig;
-        if (result != pulsar_result_Ok) {
-            SetError(std::string("Failed to create consumer: ") + pulsar_result_str(result));
-            return;
-        }
+ public:
+  ConsumerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
+                            ConsumerConfig *consumerConfig)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cClient(cClient),
+        consumerConfig(consumerConfig) {}
+  ~ConsumerNewInstanceWorker() {}
+  void Execute() {
+    const std::string &topic = this->consumerConfig->GetTopic();
+    if (topic.empty()) {
+      SetError(std::string("Topic is required and must be specified as a string when creating consumer"));
+      return;
     }
-    void OnOK() {
-        Napi::Object obj = Consumer::constructor.New({});
-        Consumer *consumer = Consumer::Unwrap(obj);
-        consumer->SetCConsumer(this->cConsumer);
-        this->deferred.Resolve(obj);
+    const std::string &subscription = this->consumerConfig->GetSubscription();
+    if (subscription.empty()) {
+      SetError(
+          std::string("Subscription is required and must be specified as a string when creating consumer"));
+      return;
     }
-    void OnError(const Napi::Error &e) {
-        this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value());
+    int32_t ackTimeoutMs = this->consumerConfig->GetAckTimeoutMs();
+    if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
+      std::string msg("Ack timeout should be 0 or greater than or equal to " +
+                      std::to_string(MIN_ACK_TIMEOUT_MILLIS));
+      SetError(msg);
+      return;
     }
 
-   private:
-    Napi::Promise::Deferred deferred;
-    pulsar_client_t *cClient;
-    ConsumerConfig *consumerConfig;
-    pulsar_consumer_t *cConsumer;
+    pulsar_result result =
+        pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(),
+                                this->consumerConfig->GetCConsumerConfig(), &(this->cConsumer));
+    delete this->consumerConfig;
+    if (result != pulsar_result_Ok) {
+      SetError(std::string("Failed to create consumer: ") + pulsar_result_str(result));
+      return;
+    }
+  }
+  void OnOK() {
+    Napi::Object obj = Consumer::constructor.New({});
+    Consumer *consumer = Consumer::Unwrap(obj);
+    consumer->SetCConsumer(this->cConsumer);
+    this->deferred.Resolve(obj);
+  }
+  void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_client_t *cClient;
+  ConsumerConfig *consumerConfig;
+  pulsar_consumer_t *cConsumer;
 };
 
 Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
-    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-    Napi::Object config = info[0].As<Napi::Object>();
-    ConsumerConfig *consumerConfig = new ConsumerConfig(config);
-    ConsumerNewInstanceWorker *wk = new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig);
-    wk->Queue();
-    return deferred.Promise();
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  Napi::Object config = info[0].As<Napi::Object>();
+  ConsumerConfig *consumerConfig = new ConsumerConfig(config);
+  ConsumerNewInstanceWorker *wk = new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig);
+  wk->Queue();
+  return deferred.Promise();
 }
 
 class ConsumerReceiveWorker : public Napi::AsyncWorker {
-   public:
-    ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
-        : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
-          deferred(deferred),
-          cConsumer(cConsumer) {}
-    ~ConsumerReceiveWorker() {}
-    void Execute() {
-        pulsar_result result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage));
-
-        if (result != pulsar_result_Ok) {
-            SetError(std::string("Failed to received message ") + pulsar_result_str(result));
-        }
-    }
-    void OnOK() {
-        Napi::Object obj = Message::NewInstance({}, this->cMessage);
-        this->deferred.Resolve(obj);
+ public:
+  ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cConsumer(cConsumer) {}
+  ~ConsumerReceiveWorker() {}
+  void Execute() {
+    pulsar_result result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage));
+
+    if (result != pulsar_result_Ok) {
+      SetError(std::string("Failed to received message ") + pulsar_result_str(result));
     }
-    void OnError(const Napi::Error &e) {
-        this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value());
-    }
-
-   private:
-    Napi::Promise::Deferred deferred;
-    pulsar_consumer_t *cConsumer;
-    pulsar_message_t *cMessage;
+  }
+  void OnOK() {
+    Napi::Object obj = Message::NewInstance({}, this->cMessage);
+    this->deferred.Resolve(obj);
+  }
+  void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_consumer_t *cConsumer;
+  pulsar_message_t *cMessage;
 };
 
 Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
-    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer);
-    wk->Queue();
-    return deferred.Promise();
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer);
+  wk->Queue();
+  return deferred.Promise();
 }
 
 void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
-    Napi::Object obj = info[0].As<Napi::Object>();
-    Message *msg = Message::Unwrap(obj);
-    pulsar_consumer_acknowledge_async(this->cConsumer, msg->GetCMessage(), NULL, NULL);
+  Napi::Object obj = info[0].As<Napi::Object>();
+  Message *msg = Message::Unwrap(obj);
+  pulsar_consumer_acknowledge_async(this->cConsumer, msg->GetCMessage(), NULL, NULL);
 }
 
 void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
-    Napi::Object obj = info[0].As<Napi::Object>();
-    MessageId *msgId = MessageId::Unwrap(obj);
-    pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
+  Napi::Object obj = info[0].As<Napi::Object>();
+  MessageId *msgId = MessageId::Unwrap(obj);
+  pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
 }
 
 class ConsumerCloseWorker : public Napi::AsyncWorker {
-   public:
-    ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
-        : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
-          deferred(deferred),
-          cConsumer(cConsumer) {}
-    ~ConsumerCloseWorker() {}
-    void Execute() {
-        pulsar_result result = pulsar_consumer_close(this->cConsumer);
-        if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
-    }
-    void OnOK() { this->deferred.Resolve(Env().Null()); }
-    void OnError(const Napi::Error &e) {
-        this->deferred.Reject(
-            Napi::Error::New(Env(), std::string("Failed to close consumer: ") + e.Message()).Value());
-    }
-
-   private:
-    Napi::Promise::Deferred deferred;
-    pulsar_consumer_t *cConsumer;
+ public:
+  ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cConsumer(cConsumer) {}
+  ~ConsumerCloseWorker() {}
+  void Execute() {
+    pulsar_result result = pulsar_consumer_close(this->cConsumer);
+    if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+  }
+  void OnOK() { this->deferred.Resolve(Env().Null()); }
+  void OnError(const Napi::Error &e) {
+    this->deferred.Reject(
+        Napi::Error::New(Env(), std::string("Failed to close consumer: ") + e.Message()).Value());
+  }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_consumer_t *cConsumer;
 };
 
 Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
-    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-    ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->cConsumer);
-    wk->Queue();
-    return deferred.Promise();
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->cConsumer);
+  wk->Queue();
+  return deferred.Promise();
 }
 
 Consumer::~Consumer() { pulsar_consumer_free(this->cConsumer); }
diff --git a/src/Consumer.h b/src/Consumer.h
index 8e77df3..fe68124 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -24,21 +24,21 @@
 #include <pulsar/c/client.h>
 
 class Consumer : public Napi::ObjectWrap<Consumer> {
-   public:
-    static void Init(Napi::Env env, Napi::Object exports);
-    static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
-    static Napi::FunctionReference constructor;
-    Consumer(const Napi::CallbackInfo &info);
-    ~Consumer();
-    void SetCConsumer(pulsar_consumer_t *cConsumer);
+ public:
+  static void Init(Napi::Env env, Napi::Object exports);
+  static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
+  static Napi::FunctionReference constructor;
+  Consumer(const Napi::CallbackInfo &info);
+  ~Consumer();
+  void SetCConsumer(pulsar_consumer_t *cConsumer);
 
-   private:
-    pulsar_consumer_t *cConsumer;
+ private:
+  pulsar_consumer_t *cConsumer;
 
-    Napi::Value Receive(const Napi::CallbackInfo &info);
-    void Acknowledge(const Napi::CallbackInfo &info);
-    void AcknowledgeId(const Napi::CallbackInfo &info);
-    Napi::Value Close(const Napi::CallbackInfo &info);
+  Napi::Value Receive(const Napi::CallbackInfo &info);
+  void Acknowledge(const Napi::CallbackInfo &info);
+  void AcknowledgeId(const Napi::CallbackInfo &info);
+  Napi::Value Close(const Napi::CallbackInfo &info);
 };
 
 #endif
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 78f1605..a98bb69 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -37,64 +37,63 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
 
 ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig)
     : topic(""), subscription(""), ackTimeoutMs(0) {
-    this->cConsumerConfig = pulsar_consumer_configuration_create();
+  this->cConsumerConfig = pulsar_consumer_configuration_create();
 
-    if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) {
-        this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
-    }
+  if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) {
+    this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
+  }
 
-    if (consumerConfig.Has(CFG_SUBSCRIPTION) && consumerConfig.Get(CFG_SUBSCRIPTION).IsString()) {
-        this->subscription = consumerConfig.Get(CFG_SUBSCRIPTION).ToString().Utf8Value();
-    }
+  if (consumerConfig.Has(CFG_SUBSCRIPTION) && consumerConfig.Get(CFG_SUBSCRIPTION).IsString()) {
+    this->subscription = consumerConfig.Get(CFG_SUBSCRIPTION).ToString().Utf8Value();
+  }
 
-    if (consumerConfig.Has(CFG_SUBSCRIPTION_TYPE) && consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).IsString()) {
-        std::string subscriptionType = consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).ToString().Utf8Value();
-        if (SUBSCRIPTION_TYPE.count(subscriptionType)) {
-            pulsar_consumer_configuration_set_consumer_type(this->cConsumerConfig,
-                                                            SUBSCRIPTION_TYPE.at(subscriptionType));
-        }
+  if (consumerConfig.Has(CFG_SUBSCRIPTION_TYPE) && consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).IsString()) {
+    std::string subscriptionType = consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).ToString().Utf8Value();
+    if (SUBSCRIPTION_TYPE.count(subscriptionType)) {
+      pulsar_consumer_configuration_set_consumer_type(this->cConsumerConfig,
+                                                      SUBSCRIPTION_TYPE.at(subscriptionType));
     }
+  }
 
-    if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) {
-        std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value();
-        if (!consumerName.empty())
-            pulsar_consumer_set_consumer_name(this->cConsumerConfig, consumerName.c_str());
-    }
+  if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) {
+    std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value();
+    if (!consumerName.empty()) pulsar_consumer_set_consumer_name(this->cConsumerConfig, consumerName.c_str());
+  }
 
-    if (consumerConfig.Has(CFG_ACK_TIMEOUT) && consumerConfig.Get(CFG_ACK_TIMEOUT).IsNumber()) {
-        this->ackTimeoutMs = consumerConfig.Get(CFG_ACK_TIMEOUT).ToNumber().Int64Value();
-        if (this->ackTimeoutMs == 0 || this->ackTimeoutMs >= MIN_ACK_TIMEOUT_MILLIS) {
-            pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig, this->ackTimeoutMs);
-        }
+  if (consumerConfig.Has(CFG_ACK_TIMEOUT) && consumerConfig.Get(CFG_ACK_TIMEOUT).IsNumber()) {
+    this->ackTimeoutMs = consumerConfig.Get(CFG_ACK_TIMEOUT).ToNumber().Int64Value();
+    if (this->ackTimeoutMs == 0 || this->ackTimeoutMs >= MIN_ACK_TIMEOUT_MILLIS) {
+      pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig, this->ackTimeoutMs);
     }
+  }
 
-    if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
-        int32_t receiverQueueSize = consumerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value();
-        if (receiverQueueSize >= 0) {
-            pulsar_consumer_configuration_set_receiver_queue_size(this->cConsumerConfig, receiverQueueSize);
-        }
+  if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
+    int32_t receiverQueueSize = consumerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value();
+    if (receiverQueueSize >= 0) {
+      pulsar_consumer_configuration_set_receiver_queue_size(this->cConsumerConfig, receiverQueueSize);
     }
+  }
 
-    if (consumerConfig.Has(CFG_RECV_QUEUE_ACROSS_PARTITIONS) &&
-        consumerConfig.Get(CFG_RECV_QUEUE_ACROSS_PARTITIONS).IsNumber()) {
-        int32_t receiverQueueSizeAcrossPartitions =
-            consumerConfig.Get(CFG_RECV_QUEUE_ACROSS_PARTITIONS).ToNumber().Int32Value();
-        if (receiverQueueSizeAcrossPartitions >= 0) {
-            pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(
-                this->cConsumerConfig, receiverQueueSizeAcrossPartitions);
-        }
+  if (consumerConfig.Has(CFG_RECV_QUEUE_ACROSS_PARTITIONS) &&
+      consumerConfig.Get(CFG_RECV_QUEUE_ACROSS_PARTITIONS).IsNumber()) {
+    int32_t receiverQueueSizeAcrossPartitions =
+        consumerConfig.Get(CFG_RECV_QUEUE_ACROSS_PARTITIONS).ToNumber().Int32Value();
+    if (receiverQueueSizeAcrossPartitions >= 0) {
+      pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(this->cConsumerConfig,
+                                                                          receiverQueueSizeAcrossPartitions);
     }
+  }
 
-    if (consumerConfig.Has(CFG_PROPS) && consumerConfig.Get(CFG_PROPS).IsObject()) {
-        Napi::Object propObj = consumerConfig.Get(CFG_PROPS).ToObject();
-        Napi::Array arr = propObj.GetPropertyNames();
-        int size = arr.Length();
-        for (int i = 0; i < size; i++) {
-            std::string key = arr.Get(i).ToString().Utf8Value();
-            std::string value = propObj.Get(key).ToString().Utf8Value();
-            pulsar_consumer_configuration_set_property(this->cConsumerConfig, key.c_str(), value.c_str());
-        }
+  if (consumerConfig.Has(CFG_PROPS) && consumerConfig.Get(CFG_PROPS).IsObject()) {
+    Napi::Object propObj = consumerConfig.Get(CFG_PROPS).ToObject();
+    Napi::Array arr = propObj.GetPropertyNames();
+    int size = arr.Length();
+    for (int i = 0; i < size; i++) {
+      std::string key = arr.Get(i).ToString().Utf8Value();
+      std::string value = propObj.Get(key).ToString().Utf8Value();
+      pulsar_consumer_configuration_set_property(this->cConsumerConfig, key.c_str(), value.c_str());
     }
+  }
 }
 
 ConsumerConfig::~ConsumerConfig() { pulsar_consumer_configuration_free(this->cConsumerConfig); }
diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h
index 00c890e..7070bf9 100644
--- a/src/ConsumerConfig.h
+++ b/src/ConsumerConfig.h
@@ -26,19 +26,19 @@
 #define MIN_ACK_TIMEOUT_MILLIS 10000
 
 class ConsumerConfig {
-   public:
-    ConsumerConfig(const Napi::Object &consumerConfig);
-    ~ConsumerConfig();
-    pulsar_consumer_configuration_t *GetCConsumerConfig();
-    std::string GetTopic();
-    std::string GetSubscription();
-    int64_t GetAckTimeoutMs();
+ public:
+  ConsumerConfig(const Napi::Object &consumerConfig);
+  ~ConsumerConfig();
+  pulsar_consumer_configuration_t *GetCConsumerConfig();
+  std::string GetTopic();
+  std::string GetSubscription();
+  int64_t GetAckTimeoutMs();
 
-   private:
-    pulsar_consumer_configuration_t *cConsumerConfig;
-    std::string topic;
-    std::string subscription;
-    int64_t ackTimeoutMs;
+ private:
+  pulsar_consumer_configuration_t *cConsumerConfig;
+  std::string topic;
+  std::string subscription;
+  int64_t ackTimeoutMs;
 };
 
 #endif
diff --git a/src/Message.cc b/src/Message.cc
index 9a0129e..3c527d5 100644
--- a/src/Message.cc
+++ b/src/Message.cc
@@ -31,29 +31,29 @@ static const std::string CFG_REPL_CLUSTERS = "replicationClusters";
 Napi::FunctionReference Message::constructor;
 
 Napi::Object Message::Init(Napi::Env env, Napi::Object exports) {
-    Napi::HandleScope scope(env);
-
-    Napi::Function func = DefineClass(
-        env, "Message",
-        {InstanceMethod("getTopicName", &Message::GetTopicName),
-         InstanceMethod("getProperties", &Message::GetProperties),
-         InstanceMethod("getData", &Message::GetData), InstanceMethod("getMessageId", &Message::GetMessageId),
-         InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp),
-         InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp),
-         InstanceMethod("getPartitionKey", &Message::GetPartitionKey)});
-
-    constructor = Napi::Persistent(func);
-    constructor.SuppressDestruct();
-
-    exports.Set("Message", func);
-    return exports;
+  Napi::HandleScope scope(env);
+
+  Napi::Function func = DefineClass(
+      env, "Message",
+      {InstanceMethod("getTopicName", &Message::GetTopicName),
+       InstanceMethod("getProperties", &Message::GetProperties), InstanceMethod("getData", &Message::GetData),
+       InstanceMethod("getMessageId", &Message::GetMessageId),
+       InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp),
+       InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp),
+       InstanceMethod("getPartitionKey", &Message::GetPartitionKey)});
+
+  constructor = Napi::Persistent(func);
+  constructor.SuppressDestruct();
+
+  exports.Set("Message", func);
+  return exports;
 }
 
 Napi::Object Message::NewInstance(Napi::Value arg, pulsar_message_t *cMessage) {
-    Napi::Object obj = constructor.New({});
-    Message *msg = Unwrap(obj);
-    msg->cMessage = cMessage;
-    return obj;
+  Napi::Object obj = constructor.New({});
+  Message *msg = Unwrap(obj);
+  msg->cMessage = cMessage;
+  return obj;
 }
 
 Message::Message(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Message>(info), cMessage(nullptr) {}
@@ -61,137 +61,137 @@ Message::Message(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Message>(inf
 pulsar_message_t *Message::GetCMessage() { return this->cMessage; }
 
 Napi::Value Message::GetTopicName(const Napi::CallbackInfo &info) {
-    Napi::Env env = info.Env();
-    if (!ValidateCMessage(env)) {
-        return env.Null();
-    }
-    return Napi::String::New(env, pulsar_message_get_topic_name(this->cMessage));
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+  return Napi::String::New(env, pulsar_message_get_topic_name(this->cMessage));
 }
 
 Napi::Value Message::GetProperties(const Napi::CallbackInfo &info) {
-    Napi::Env env = info.Env();
-    if (!ValidateCMessage(env)) {
-        return env.Null();
-    }
-    Napi::Array arr = Napi::Array::New(env);
-    pulsar_string_map_t *cProperties = pulsar_message_get_properties(this->cMessage);
-    int size = pulsar_string_map_size(cProperties);
-    for (int i = 0; i < size; i++) {
-        arr.Set(pulsar_string_map_get_key(cProperties, i), pulsar_string_map_get_value(cProperties, i));
-    }
-    return arr;
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+  Napi::Array arr = Napi::Array::New(env);
+  pulsar_string_map_t *cProperties = pulsar_message_get_properties(this->cMessage);
+  int size = pulsar_string_map_size(cProperties);
+  for (int i = 0; i < size; i++) {
+    arr.Set(pulsar_string_map_get_key(cProperties, i), pulsar_string_map_get_value(cProperties, i));
+  }
+  return arr;
 }
 
 Napi::Value Message::GetData(const Napi::CallbackInfo &info) {
-    Napi::Env env = info.Env();
-    if (!ValidateCMessage(env)) {
-        return env.Null();
-    }
-    void *data = const_cast<void *>(pulsar_message_get_data(this->cMessage));
-    size_t size = (size_t)pulsar_message_get_length(this->cMessage);
-    return Napi::Buffer<char>::New(env, (char *)data, size);
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+  void *data = const_cast<void *>(pulsar_message_get_data(this->cMessage));
+  size_t size = (size_t)pulsar_message_get_length(this->cMessage);
+  return Napi::Buffer<char>::New(env, (char *)data, size);
 }
 
 Napi::Value Message::GetMessageId(const Napi::CallbackInfo &info) {
-    Napi::Env env = info.Env();
-    if (!ValidateCMessage(env)) {
-        return env.Null();
-    }
-    return MessageId::NewInstanceFromMessage(info, this->cMessage);
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+  return MessageId::NewInstanceFromMessage(info, this->cMessage);
 }
 
 Napi::Value Message::GetEventTimestamp(const Napi::CallbackInfo &info) {
-    Napi::Env env = info.Env();
-    if (!ValidateCMessage(env)) {
-        return env.Null();
-    }
-    return Napi::Number::New(env, pulsar_message_get_event_timestamp(this->cMessage));
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+  return Napi::Number::New(env, pulsar_message_get_event_timestamp(this->cMessage));
 }
 
 Napi::Value Message::GetPublishTimestamp(const Napi::CallbackInfo &info) {
-    Napi::Env env = info.Env();
-    if (!ValidateCMessage(env)) {
-        return env.Null();
-    }
-    return Napi::Number::New(env, pulsar_message_get_publish_timestamp(this->cMessage));
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+  return Napi::Number::New(env, pulsar_message_get_publish_timestamp(this->cMessage));
 }
 
 Napi::Value Message::GetPartitionKey(const Napi::CallbackInfo &info) {
-    Napi::Env env = info.Env();
-    if (!ValidateCMessage(env)) {
-        return env.Null();
-    }
-    return Napi::String::New(env, pulsar_message_get_partitionKey(this->cMessage));
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+  return Napi::String::New(env, pulsar_message_get_partitionKey(this->cMessage));
 }
 
 bool Message::ValidateCMessage(Napi::Env env) {
-    if (this->cMessage) {
-        return true;
-    } else {
-        Napi::Error::New(env, "Message has not been built").ThrowAsJavaScriptException();
-        return false;
-    }
+  if (this->cMessage) {
+    return true;
+  } else {
+    Napi::Error::New(env, "Message has not been built").ThrowAsJavaScriptException();
+    return false;
+  }
 }
 
 pulsar_message_t *Message::BuildMessage(Napi::Object conf) {
-    pulsar_message_t *cMessage = pulsar_message_create();
-
-    if (conf.Has(CFG_DATA) && conf.Get(CFG_DATA).IsBuffer()) {
-        Napi::Buffer<char> buf = conf.Get(CFG_DATA).As<Napi::Buffer<char>>();
-        char *data = buf.Data();
-        pulsar_message_set_content(cMessage, data, buf.Length());
-    }
-
-    if (conf.Has(CFG_PROPS) && conf.Get(CFG_PROPS).IsObject()) {
-        Napi::Object propObj = conf.Get(CFG_PROPS).ToObject();
-        Napi::Array arr = propObj.GetPropertyNames();
-        int size = arr.Length();
-        for (int i = 0; i < size; i++) {
-            Napi::String key = arr.Get(i).ToString();
-            Napi::String value = propObj.Get(key).ToString();
-            pulsar_message_set_property(cMessage, key.Utf8Value().c_str(), value.Utf8Value().c_str());
-        }
-    }
-
-    if (conf.Has(CFG_EVENT_TIME) && conf.Get(CFG_EVENT_TIME).IsNumber()) {
-        int64_t eventTimestamp = conf.Get(CFG_EVENT_TIME).ToNumber().Int64Value();
-        if (eventTimestamp >= 0) {
-            pulsar_message_set_event_timestamp(cMessage, eventTimestamp);
-        }
-    }
-
-    if (conf.Has(CFG_SEQUENCE_ID) && conf.Get(CFG_SEQUENCE_ID).IsNumber()) {
-        Napi::Number sequenceId = conf.Get(CFG_SEQUENCE_ID).ToNumber();
-        pulsar_message_set_sequence_id(cMessage, sequenceId.Int64Value());
-    }
-
-    if (conf.Has(CFG_PARTITION_KEY) && conf.Get(CFG_PARTITION_KEY).IsString()) {
-        Napi::String partitionKey = conf.Get(CFG_PARTITION_KEY).ToString();
-        pulsar_message_set_partition_key(cMessage, partitionKey.Utf8Value().c_str());
-    }
-
-    if (conf.Has(CFG_REPL_CLUSTERS) && conf.Get(CFG_REPL_CLUSTERS).IsArray()) {
-        Napi::Array clusters = conf.Get(CFG_REPL_CLUSTERS).As<Napi::Array>();
-        // Empty list means to disable replication
-        int length = clusters.Length();
-        if (length == 0) {
-            pulsar_message_disable_replication(cMessage, 1);
-        } else {
-            char **arr = NewStringArray(length);
-            for (int i = 0; i < length; i++) {
-                SetString(arr, clusters.Get(i).ToString().Utf8Value().c_str(), i);
-            }
-            // TODO: temoporalily commented out unless 2.3.1 which includes interface change of
-            // pulsar_message_set_replication_clusters (#3729) is released
-            // pulsar_message_set_replication_clusters(cMessage, (const char **)arr, length);
-            FreeStringArray(arr, length);
-        }
-    }
-    return cMessage;
+  pulsar_message_t *cMessage = pulsar_message_create();
+
+  if (conf.Has(CFG_DATA) && conf.Get(CFG_DATA).IsBuffer()) {
+    Napi::Buffer<char> buf = conf.Get(CFG_DATA).As<Napi::Buffer<char>>();
+    char *data = buf.Data();
+    pulsar_message_set_content(cMessage, data, buf.Length());
+  }
+
+  if (conf.Has(CFG_PROPS) && conf.Get(CFG_PROPS).IsObject()) {
+    Napi::Object propObj = conf.Get(CFG_PROPS).ToObject();
+    Napi::Array arr = propObj.GetPropertyNames();
+    int size = arr.Length();
+    for (int i = 0; i < size; i++) {
+      Napi::String key = arr.Get(i).ToString();
+      Napi::String value = propObj.Get(key).ToString();
+      pulsar_message_set_property(cMessage, key.Utf8Value().c_str(), value.Utf8Value().c_str());
+    }
+  }
+
+  if (conf.Has(CFG_EVENT_TIME) && conf.Get(CFG_EVENT_TIME).IsNumber()) {
+    int64_t eventTimestamp = conf.Get(CFG_EVENT_TIME).ToNumber().Int64Value();
+    if (eventTimestamp >= 0) {
+      pulsar_message_set_event_timestamp(cMessage, eventTimestamp);
+    }
+  }
+
+  if (conf.Has(CFG_SEQUENCE_ID) && conf.Get(CFG_SEQUENCE_ID).IsNumber()) {
+    Napi::Number sequenceId = conf.Get(CFG_SEQUENCE_ID).ToNumber();
+    pulsar_message_set_sequence_id(cMessage, sequenceId.Int64Value());
+  }
+
+  if (conf.Has(CFG_PARTITION_KEY) && conf.Get(CFG_PARTITION_KEY).IsString()) {
+    Napi::String partitionKey = conf.Get(CFG_PARTITION_KEY).ToString();
+    pulsar_message_set_partition_key(cMessage, partitionKey.Utf8Value().c_str());
+  }
+
+  if (conf.Has(CFG_REPL_CLUSTERS) && conf.Get(CFG_REPL_CLUSTERS).IsArray()) {
+    Napi::Array clusters = conf.Get(CFG_REPL_CLUSTERS).As<Napi::Array>();
+    // Empty list means to disable replication
+    int length = clusters.Length();
+    if (length == 0) {
+      pulsar_message_disable_replication(cMessage, 1);
+    } else {
+      char **arr = NewStringArray(length);
+      for (int i = 0; i < length; i++) {
+        SetString(arr, clusters.Get(i).ToString().Utf8Value().c_str(), i);
+      }
+      // TODO: temoporalily commented out unless 2.3.1 which includes interface change of
+      // pulsar_message_set_replication_clusters (#3729) is released
+      // pulsar_message_set_replication_clusters(cMessage, (const char **)arr, length);
+      FreeStringArray(arr, length);
+    }
+  }
+  return cMessage;
 }
 
 Message::~Message() {
-    if (this->cMessage != nullptr) {
-        pulsar_message_free(this->cMessage);
-    }
+  if (this->cMessage != nullptr) {
+    pulsar_message_free(this->cMessage);
+  }
 }
diff --git a/src/Message.h b/src/Message.h
index 8992f9c..42aa9aa 100644
--- a/src/Message.h
+++ b/src/Message.h
@@ -24,41 +24,41 @@
 #include <pulsar/c/message.h>
 
 class Message : public Napi::ObjectWrap<Message> {
-   public:
-    static Napi::Object Init(Napi::Env env, Napi::Object exports);
-    static Napi::Object NewInstance(Napi::Value arg, pulsar_message_t *cMessage);
-    static pulsar_message_t *BuildMessage(Napi::Object conf);
-    Message(const Napi::CallbackInfo &info);
-    ~Message();
-    pulsar_message_t *GetCMessage();
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
+  static Napi::Object NewInstance(Napi::Value arg, pulsar_message_t *cMessage);
+  static pulsar_message_t *BuildMessage(Napi::Object conf);
+  Message(const Napi::CallbackInfo &info);
+  ~Message();
+  pulsar_message_t *GetCMessage();
 
-   private:
-    static Napi::FunctionReference constructor;
+ private:
+  static Napi::FunctionReference constructor;
 
-    pulsar_message_t *cMessage;
+  pulsar_message_t *cMessage;
 
-    Napi::Value GetTopicName(const Napi::CallbackInfo &info);
-    Napi::Value GetProperties(const Napi::CallbackInfo &info);
-    Napi::Value GetData(const Napi::CallbackInfo &info);
-    Napi::Value GetMessageId(const Napi::CallbackInfo &info);
-    Napi::Value GetPublishTimestamp(const Napi::CallbackInfo &info);
-    Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info);
-    Napi::Value GetPartitionKey(const Napi::CallbackInfo &info);
-    bool ValidateCMessage(Napi::Env env);
+  Napi::Value GetTopicName(const Napi::CallbackInfo &info);
+  Napi::Value GetProperties(const Napi::CallbackInfo &info);
+  Napi::Value GetData(const Napi::CallbackInfo &info);
+  Napi::Value GetMessageId(const Napi::CallbackInfo &info);
+  Napi::Value GetPublishTimestamp(const Napi::CallbackInfo &info);
+  Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info);
+  Napi::Value GetPartitionKey(const Napi::CallbackInfo &info);
+  bool ValidateCMessage(Napi::Env env);
 
-    static char **NewStringArray(int size) { return (char **)calloc(sizeof(char *), size); }
-    static void SetString(char **array, const char *str, int n) {
-        char *copied = (char *)calloc(strlen(str) + 1, sizeof(char));
-        strcpy(copied, str);
-        array[n] = copied;
-    }
-    static void FreeStringArray(char **array, int size) {
-        int i;
-        for (i = 0; i < size; i++) {
-            free(array[i]);
-        }
-        free(array);
+  static char **NewStringArray(int size) { return (char **)calloc(sizeof(char *), size); }
+  static void SetString(char **array, const char *str, int n) {
+    char *copied = (char *)calloc(strlen(str) + 1, sizeof(char));
+    strcpy(copied, str);
+    array[n] = copied;
+  }
+  static void FreeStringArray(char **array, int size) {
+    int i;
+    for (i = 0; i < size; i++) {
+      free(array[i]);
     }
+    free(array);
+  }
 };
 
 #endif
diff --git a/src/MessageId.cc b/src/MessageId.cc
index 3a95f43..a0b520f 100644
--- a/src/MessageId.cc
+++ b/src/MessageId.cc
@@ -24,64 +24,64 @@
 Napi::FunctionReference MessageId::constructor;
 
 Napi::Object MessageId::Init(Napi::Env env, Napi::Object exports) {
-    Napi::HandleScope scope(env);
+  Napi::HandleScope scope(env);
 
-    Napi::Function func = DefineClass(env, "MessageId",
-                                      {
-                                          StaticMethod("earliest", &MessageId::Earliest, napi_static),
-                                          StaticMethod("latest", &MessageId::Latest, napi_static),
-                                          StaticMethod("finalize", &MessageId::Finalize, napi_static),
-                                          InstanceMethod("toString", &MessageId::ToString),
-                                      });
+  Napi::Function func = DefineClass(env, "MessageId",
+                                    {
+                                        StaticMethod("earliest", &MessageId::Earliest, napi_static),
+                                        StaticMethod("latest", &MessageId::Latest, napi_static),
+                                        StaticMethod("finalize", &MessageId::Finalize, napi_static),
+                                        InstanceMethod("toString", &MessageId::ToString),
+                                    });
 
-    constructor = Napi::Persistent(func);
-    constructor.SuppressDestruct();
+  constructor = Napi::Persistent(func);
+  constructor.SuppressDestruct();
 
-    exports.Set("MessageId", func);
-    return exports;
+  exports.Set("MessageId", func);
+  return exports;
 }
 
 MessageId::MessageId(const Napi::CallbackInfo &info) : Napi::ObjectWrap<MessageId>(info) {
-    Napi::Env env = info.Env();
-    Napi::HandleScope scope(env);
+  Napi::Env env = info.Env();
+  Napi::HandleScope scope(env);
 }
 
 Napi::Object MessageId::NewInstanceFromMessage(const Napi::CallbackInfo &info, pulsar_message_t *cMessage) {
-    Napi::Object obj = NewInstance(info[0]);
-    MessageId *msgId = Unwrap(obj);
-    msgId->cMessageId = pulsar_message_get_message_id(cMessage);
-    return obj;
+  Napi::Object obj = NewInstance(info[0]);
+  MessageId *msgId = Unwrap(obj);
+  msgId->cMessageId = pulsar_message_get_message_id(cMessage);
+  return obj;
 }
 
 Napi::Object MessageId::NewInstance(Napi::Value arg) {
-    Napi::Object obj = constructor.New({arg});
-    return obj;
+  Napi::Object obj = constructor.New({arg});
+  return obj;
 }
 
 void MessageId::Finalize(const Napi::CallbackInfo &info) {
-    Napi::Object obj = info[0].As<Napi::Object>();
-    MessageId *msgId = Unwrap(obj);
-    pulsar_message_id_free(msgId->cMessageId);
+  Napi::Object obj = info[0].As<Napi::Object>();
+  MessageId *msgId = Unwrap(obj);
+  pulsar_message_id_free(msgId->cMessageId);
 }
 
 Napi::Value MessageId::Earliest(const Napi::CallbackInfo &info) {
-    Napi::Object obj = NewInstance(info[0]);
-    MessageId *msgId = Unwrap(obj);
-    msgId->cMessageId = (pulsar_message_id_t *)pulsar_message_id_earliest();
-    return obj;
+  Napi::Object obj = NewInstance(info[0]);
+  MessageId *msgId = Unwrap(obj);
+  msgId->cMessageId = (pulsar_message_id_t *)pulsar_message_id_earliest();
+  return obj;
 }
 
 Napi::Value MessageId::Latest(const Napi::CallbackInfo &info) {
-    Napi::Object obj = NewInstance(info[0]);
-    MessageId *msgId = Unwrap(obj);
-    msgId->cMessageId = (pulsar_message_id_t *)pulsar_message_id_latest();
-    return obj;
+  Napi::Object obj = NewInstance(info[0]);
+  MessageId *msgId = Unwrap(obj);
+  msgId->cMessageId = (pulsar_message_id_t *)pulsar_message_id_latest();
+  return obj;
 }
 
 pulsar_message_id_t *MessageId::GetCMessageId() { return this->cMessageId; }
 
 Napi::Value MessageId::ToString(const Napi::CallbackInfo &info) {
-    return Napi::String::New(info.Env(), pulsar_message_id_str(this->cMessageId));
+  return Napi::String::New(info.Env(), pulsar_message_id_str(this->cMessageId));
 }
 
 MessageId::~MessageId() { pulsar_message_id_free(this->cMessageId); }
diff --git a/src/MessageId.h b/src/MessageId.h
index 761f04b..09c59e2 100644
--- a/src/MessageId.h
+++ b/src/MessageId.h
@@ -25,22 +25,22 @@
 #include <pulsar/c/message_id.h>
 
 class MessageId : public Napi::ObjectWrap<MessageId> {
-   public:
-    static Napi::Object Init(Napi::Env env, Napi::Object exports);
-    static Napi::Object NewInstance(Napi::Value arg);
-    static Napi::Object NewInstanceFromMessage(const Napi::CallbackInfo &info, pulsar_message_t *cMessage);
-    static Napi::Value Earliest(const Napi::CallbackInfo &info);
-    static Napi::Value Latest(const Napi::CallbackInfo &info);
-    static void Finalize(const Napi::CallbackInfo &info);
-    MessageId(const Napi::CallbackInfo &info);
-    ~MessageId();
-    pulsar_message_id_t *GetCMessageId();
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
+  static Napi::Object NewInstance(Napi::Value arg);
+  static Napi::Object NewInstanceFromMessage(const Napi::CallbackInfo &info, pulsar_message_t *cMessage);
+  static Napi::Value Earliest(const Napi::CallbackInfo &info);
+  static Napi::Value Latest(const Napi::CallbackInfo &info);
+  static void Finalize(const Napi::CallbackInfo &info);
+  MessageId(const Napi::CallbackInfo &info);
+  ~MessageId();
+  pulsar_message_id_t *GetCMessageId();
 
-   private:
-    static Napi::FunctionReference constructor;
-    pulsar_message_id_t *cMessageId;
+ private:
+  static Napi::FunctionReference constructor;
+  pulsar_message_id_t *cMessageId;
 
-    Napi::Value ToString(const Napi::CallbackInfo &info);
+  Napi::Value ToString(const Napi::CallbackInfo &info);
 };
 
 #endif
diff --git a/src/Producer.cc b/src/Producer.cc
index 9bdb5d4..a19d828 100644
--- a/src/Producer.cc
+++ b/src/Producer.cc
@@ -25,131 +25,127 @@
 Napi::FunctionReference Producer::constructor;
 
 void Producer::Init(Napi::Env env, Napi::Object exports) {
-    Napi::HandleScope scope(env);
+  Napi::HandleScope scope(env);
 
-    Napi::Function func =
-        DefineClass(env, "Producer",
-                    {InstanceMethod("send", &Producer::Send), InstanceMethod("close", &Producer::Close)});
+  Napi::Function func = DefineClass(
+      env, "Producer", {InstanceMethod("send", &Producer::Send), InstanceMethod("close", &Producer::Close)});
 
-    constructor = Napi::Persistent(func);
-    constructor.SuppressDestruct();
+  constructor = Napi::Persistent(func);
+  constructor.SuppressDestruct();
 }
 
 void Producer::SetCProducer(pulsar_producer_t *cProducer) { this->cProducer = cProducer; }
 
 class ProducerNewInstanceWorker : public Napi::AsyncWorker {
-   public:
-    ProducerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
-                              ProducerConfig *producerConfig)
-        : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
-          deferred(deferred),
-          cClient(cClient),
-          producerConfig(producerConfig) {}
-    ~ProducerNewInstanceWorker() {}
-    void Execute() {
-        const std::string &topic = this->producerConfig->GetTopic();
-        if (topic.empty()) {
-            SetError(
-                std::string("Topic is required and must be specified as a string when creating producer"));
-            return;
-        }
-
-        pulsar_result result = pulsar_client_create_producer(
-            this->cClient, topic.c_str(), this->producerConfig->GetCProducerConfig(), &(this->cProducer));
-        delete this->producerConfig;
-        if (result != pulsar_result_Ok) {
-            SetError(std::string("Failed to create producer: ") + pulsar_result_str(result));
-            return;
-        }
-    }
-    void OnOK() {
-        Napi::Object obj = Producer::constructor.New({});
-        Producer *producer = Producer::Unwrap(obj);
-        producer->SetCProducer(this->cProducer);
-        this->deferred.Resolve(obj);
-    }
-    void OnError(const Napi::Error &e) {
-        this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value());
+ public:
+  ProducerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
+                            ProducerConfig *producerConfig)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cClient(cClient),
+        producerConfig(producerConfig) {}
+  ~ProducerNewInstanceWorker() {}
+  void Execute() {
+    const std::string &topic = this->producerConfig->GetTopic();
+    if (topic.empty()) {
+      SetError(std::string("Topic is required and must be specified as a string when creating producer"));
+      return;
     }
 
-   private:
-    Napi::Promise::Deferred deferred;
-    pulsar_client_t *cClient;
-    ProducerConfig *producerConfig;
-    pulsar_producer_t *cProducer;
+    pulsar_result result = pulsar_client_create_producer(
+        this->cClient, topic.c_str(), this->producerConfig->GetCProducerConfig(), &(this->cProducer));
+    delete this->producerConfig;
+    if (result != pulsar_result_Ok) {
+      SetError(std::string("Failed to create producer: ") + pulsar_result_str(result));
+      return;
+    }
+  }
+  void OnOK() {
+    Napi::Object obj = Producer::constructor.New({});
+    Producer *producer = Producer::Unwrap(obj);
+    producer->SetCProducer(this->cProducer);
+    this->deferred.Resolve(obj);
+  }
+  void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_client_t *cClient;
+  ProducerConfig *producerConfig;
+  pulsar_producer_t *cProducer;
 };
 
 Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
-    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-    Napi::Object config = info[0].As<Napi::Object>();
-    ProducerConfig *producerConfig = new ProducerConfig(config);
-    ProducerNewInstanceWorker *wk = new ProducerNewInstanceWorker(deferred, cClient, producerConfig);
-    wk->Queue();
-    return deferred.Promise();
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  Napi::Object config = info[0].As<Napi::Object>();
+  ProducerConfig *producerConfig = new ProducerConfig(config);
+  ProducerNewInstanceWorker *wk = new ProducerNewInstanceWorker(deferred, cClient, producerConfig);
+  wk->Queue();
+  return deferred.Promise();
 }
 
 Producer::Producer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Producer>(info) {}
 
 class ProducerSendWorker : public Napi::AsyncWorker {
-   public:
-    ProducerSendWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer,
-                       pulsar_message_t *cMessage)
-        : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
-          deferred(deferred),
-          cProducer(cProducer),
-          cMessage(cMessage) {}
-    ~ProducerSendWorker() { pulsar_message_free(this->cMessage); }
-    void Execute() {
-        pulsar_result result = pulsar_producer_send(this->cProducer, this->cMessage);
-        if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
-    }
-    void OnOK() { this->deferred.Resolve(Env().Null()); }
-    void OnError(const Napi::Error &e) {
-        this->deferred.Reject(
-            Napi::Error::New(Env(), std::string("Failed to send message: ") + e.Message()).Value());
-    }
-
-   private:
-    Napi::Promise::Deferred deferred;
-    pulsar_producer_t *cProducer;
-    pulsar_message_t *cMessage;
+ public:
+  ProducerSendWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer,
+                     pulsar_message_t *cMessage)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cProducer(cProducer),
+        cMessage(cMessage) {}
+  ~ProducerSendWorker() { pulsar_message_free(this->cMessage); }
+  void Execute() {
+    pulsar_result result = pulsar_producer_send(this->cProducer, this->cMessage);
+    if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+  }
+  void OnOK() { this->deferred.Resolve(Env().Null()); }
+  void OnError(const Napi::Error &e) {
+    this->deferred.Reject(
+        Napi::Error::New(Env(), std::string("Failed to send message: ") + e.Message()).Value());
+  }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_producer_t *cProducer;
+  pulsar_message_t *cMessage;
 };
 
 Napi::Value Producer::Send(const Napi::CallbackInfo &info) {
-    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-    pulsar_message_t *cMessage = Message::BuildMessage(info[0].As<Napi::Object>());
-    ProducerSendWorker *wk = new ProducerSendWorker(deferred, this->cProducer, cMessage);
-    wk->Queue();
-    return deferred.Promise();
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  pulsar_message_t *cMessage = Message::BuildMessage(info[0].As<Napi::Object>());
+  ProducerSendWorker *wk = new ProducerSendWorker(deferred, this->cProducer, cMessage);
+  wk->Queue();
+  return deferred.Promise();
 }
 
 class ProducerCloseWorker : public Napi::AsyncWorker {
-   public:
-    ProducerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer)
-        : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
-          deferred(deferred),
-          cProducer(cProducer) {}
-    ~ProducerCloseWorker() {}
-    void Execute() {
-        pulsar_result result = pulsar_producer_close(this->cProducer);
-        if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
-    }
-    void OnOK() { this->deferred.Resolve(Env().Null()); }
-    void OnError(const Napi::Error &e) {
-        this->deferred.Reject(
-            Napi::Error::New(Env(), std::string("Failed to close producer: ") + e.Message()).Value());
-    }
-
-   private:
-    Napi::Promise::Deferred deferred;
-    pulsar_producer_t *cProducer;
+ public:
+  ProducerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cProducer(cProducer) {}
+  ~ProducerCloseWorker() {}
+  void Execute() {
+    pulsar_result result = pulsar_producer_close(this->cProducer);
+    if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+  }
+  void OnOK() { this->deferred.Resolve(Env().Null()); }
+  void OnError(const Napi::Error &e) {
+    this->deferred.Reject(
+        Napi::Error::New(Env(), std::string("Failed to close producer: ") + e.Message()).Value());
+  }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_producer_t *cProducer;
 };
 
 Napi::Value Producer::Close(const Napi::CallbackInfo &info) {
-    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-    ProducerCloseWorker *wk = new ProducerCloseWorker(deferred, this->cProducer);
-    wk->Queue();
-    return deferred.Promise();
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  ProducerCloseWorker *wk = new ProducerCloseWorker(deferred, this->cProducer);
+  wk->Queue();
+  return deferred.Promise();
 }
 
 Producer::~Producer() { pulsar_producer_free(this->cProducer); }
diff --git a/src/Producer.h b/src/Producer.h
index c758ce4..5d31cfc 100644
--- a/src/Producer.h
+++ b/src/Producer.h
@@ -25,18 +25,18 @@
 #include <pulsar/c/producer.h>
 
 class Producer : public Napi::ObjectWrap<Producer> {
-   public:
-    static void Init(Napi::Env env, Napi::Object exports);
-    static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
-    static Napi::FunctionReference constructor;
-    Producer(const Napi::CallbackInfo &info);
-    ~Producer();
-    void SetCProducer(pulsar_producer_t *cProducer);
+ public:
+  static void Init(Napi::Env env, Napi::Object exports);
+  static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
+  static Napi::FunctionReference constructor;
+  Producer(const Napi::CallbackInfo &info);
+  ~Producer();
+  void SetCProducer(pulsar_producer_t *cProducer);
 
-   private:
-    pulsar_producer_t *cProducer;
-    Napi::Value Send(const Napi::CallbackInfo &info);
-    Napi::Value Close(const Napi::CallbackInfo &info);
+ private:
+  pulsar_producer_t *cProducer;
+  Napi::Value Send(const Napi::CallbackInfo &info);
+  Napi::Value Close(const Napi::CallbackInfo &info);
 };
 
 #endif
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 9b94dd2..76fd25c 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -50,106 +50,105 @@ static std::map<std::string, pulsar_compression_type> COMPRESSION_TYPE = {{"Zlib
                                                                           {"LZ4", pulsar_CompressionLZ4}};
 
 ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
-    this->cProducerConfig = pulsar_producer_configuration_create();
-
-    if (producerConfig.Has(CFG_TOPIC) && producerConfig.Get(CFG_TOPIC).IsString()) {
-        this->topic = producerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
-    }
-
-    if (producerConfig.Has(CFG_PRODUCER_NAME) && producerConfig.Get(CFG_PRODUCER_NAME).IsString()) {
-        std::string producerName = producerConfig.Get(CFG_PRODUCER_NAME).ToString().Utf8Value();
-        if (!producerName.empty())
-            pulsar_producer_configuration_set_producer_name(this->cProducerConfig, producerName.c_str());
-    }
-
-    if (producerConfig.Has(CFG_SEND_TIMEOUT) && producerConfig.Get(CFG_SEND_TIMEOUT).IsNumber()) {
-        int32_t sendTimeoutMs = producerConfig.Get(CFG_SEND_TIMEOUT).ToNumber().Int32Value();
-        if (sendTimeoutMs > 0) {
-            pulsar_producer_configuration_set_send_timeout(this->cProducerConfig, sendTimeoutMs);
-        }
+  this->cProducerConfig = pulsar_producer_configuration_create();
+
+  if (producerConfig.Has(CFG_TOPIC) && producerConfig.Get(CFG_TOPIC).IsString()) {
+    this->topic = producerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
+  }
+
+  if (producerConfig.Has(CFG_PRODUCER_NAME) && producerConfig.Get(CFG_PRODUCER_NAME).IsString()) {
+    std::string producerName = producerConfig.Get(CFG_PRODUCER_NAME).ToString().Utf8Value();
+    if (!producerName.empty())
+      pulsar_producer_configuration_set_producer_name(this->cProducerConfig, producerName.c_str());
+  }
+
+  if (producerConfig.Has(CFG_SEND_TIMEOUT) && producerConfig.Get(CFG_SEND_TIMEOUT).IsNumber()) {
+    int32_t sendTimeoutMs = producerConfig.Get(CFG_SEND_TIMEOUT).ToNumber().Int32Value();
+    if (sendTimeoutMs > 0) {
+      pulsar_producer_configuration_set_send_timeout(this->cProducerConfig, sendTimeoutMs);
     }
+  }
 
-    if (producerConfig.Has(CFG_INIT_SEQUENCE_ID) && producerConfig.Get(CFG_INIT_SEQUENCE_ID).IsNumber()) {
-        int64_t initialSequenceId = producerConfig.Get(CFG_INIT_SEQUENCE_ID).ToNumber().Int64Value();
-        pulsar_producer_configuration_set_initial_sequence_id(this->cProducerConfig, initialSequenceId);
-    }
+  if (producerConfig.Has(CFG_INIT_SEQUENCE_ID) && producerConfig.Get(CFG_INIT_SEQUENCE_ID).IsNumber()) {
+    int64_t initialSequenceId = producerConfig.Get(CFG_INIT_SEQUENCE_ID).ToNumber().Int64Value();
+    pulsar_producer_configuration_set_initial_sequence_id(this->cProducerConfig, initialSequenceId);
+  }
 
-    if (producerConfig.Has(CFG_MAX_PENDING) && producerConfig.Get(CFG_MAX_PENDING).IsNumber()) {
-        int32_t maxPendingMessages = producerConfig.Get(CFG_MAX_PENDING).ToNumber().Int32Value();
-        if (maxPendingMessages > 0) {
-            pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig, maxPendingMessages);
-        }
+  if (producerConfig.Has(CFG_MAX_PENDING) && producerConfig.Get(CFG_MAX_PENDING).IsNumber()) {
+    int32_t maxPendingMessages = producerConfig.Get(CFG_MAX_PENDING).ToNumber().Int32Value();
+    if (maxPendingMessages > 0) {
+      pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig, maxPendingMessages);
     }
-
-    if (producerConfig.Has(CFG_MAX_PENDING_ACROSS_PARTITIONS) &&
-        producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).IsNumber()) {
-        int32_t maxPendingMessagesAcrossPartitions =
-            producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).ToNumber().Int32Value();
-        if (maxPendingMessagesAcrossPartitions > 0) {
-            pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig,
-                                                                   maxPendingMessagesAcrossPartitions);
-        }
+  }
+
+  if (producerConfig.Has(CFG_MAX_PENDING_ACROSS_PARTITIONS) &&
+      producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).IsNumber()) {
+    int32_t maxPendingMessagesAcrossPartitions =
+        producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).ToNumber().Int32Value();
+    if (maxPendingMessagesAcrossPartitions > 0) {
+      pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig,
+                                                             maxPendingMessagesAcrossPartitions);
     }
-
-    if (producerConfig.Has(CFG_BLOCK_IF_QUEUE_FULL) &&
-        producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).IsBoolean()) {
-        bool blockIfQueueFull = producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).ToBoolean().Value();
-        pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig, blockIfQueueFull);
-    }
-
-    if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) {
-        std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value();
-        if (MESSAGE_ROUTING_MODE.count(messageRoutingMode))
-            pulsar_producer_configuration_set_partitions_routing_mode(
-                this->cProducerConfig, MESSAGE_ROUTING_MODE.at(messageRoutingMode));
+  }
+
+  if (producerConfig.Has(CFG_BLOCK_IF_QUEUE_FULL) &&
+      producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).IsBoolean()) {
+    bool blockIfQueueFull = producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).ToBoolean().Value();
+    pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig, blockIfQueueFull);
+  }
+
+  if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) {
+    std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value();
+    if (MESSAGE_ROUTING_MODE.count(messageRoutingMode))
+      pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig,
+                                                                MESSAGE_ROUTING_MODE.at(messageRoutingMode));
+  }
+
+  if (producerConfig.Has(CFG_HASH_SCHEME) && producerConfig.Get(CFG_HASH_SCHEME).IsString()) {
+    std::string hashingScheme = producerConfig.Get(CFG_HASH_SCHEME).ToString().Utf8Value();
+    if (HASHING_SCHEME.count(hashingScheme))
+      pulsar_producer_configuration_set_hashing_scheme(this->cProducerConfig,
+                                                       HASHING_SCHEME.at(hashingScheme));
+  }
+
+  if (producerConfig.Has(CFG_COMPRESS_TYPE) && producerConfig.Get(CFG_COMPRESS_TYPE).IsString()) {
+    std::string compressionType = producerConfig.Get(CFG_COMPRESS_TYPE).ToString().Utf8Value();
+    if (COMPRESSION_TYPE.count(compressionType))
+      pulsar_producer_configuration_set_compression_type(this->cProducerConfig,
+                                                         COMPRESSION_TYPE.at(compressionType));
+  }
+
+  if (producerConfig.Has(CFG_BATCH_ENABLED) && producerConfig.Get(CFG_BATCH_ENABLED).IsBoolean()) {
+    bool batchingEnabled = producerConfig.Get(CFG_BATCH_ENABLED).ToBoolean().Value();
+    pulsar_producer_configuration_set_batching_enabled(this->cProducerConfig, batchingEnabled);
+  }
+
+  if (producerConfig.Has(CFG_BATCH_MAX_DELAY) && producerConfig.Get(CFG_BATCH_MAX_DELAY).IsNumber()) {
+    int64_t batchingMaxPublishDelayMs = producerConfig.Get(CFG_BATCH_MAX_DELAY).ToNumber().Int64Value();
+    if (batchingMaxPublishDelayMs > 0) {
+      pulsar_producer_configuration_set_batching_max_publish_delay_ms(this->cProducerConfig,
+                                                                      (long)batchingMaxPublishDelayMs);
     }
+  }
 
-    if (producerConfig.Has(CFG_HASH_SCHEME) && producerConfig.Get(CFG_HASH_SCHEME).IsString()) {
-        std::string hashingScheme = producerConfig.Get(CFG_HASH_SCHEME).ToString().Utf8Value();
-        if (HASHING_SCHEME.count(hashingScheme))
-            pulsar_producer_configuration_set_hashing_scheme(this->cProducerConfig,
-                                                             HASHING_SCHEME.at(hashingScheme));
+  if (producerConfig.Has(CFG_BATCH_MAX_MSG) && producerConfig.Get(CFG_BATCH_MAX_MSG).IsNumber()) {
+    uint32_t batchingMaxMessages = producerConfig.Get(CFG_BATCH_MAX_MSG).ToNumber().Uint32Value();
+    if (batchingMaxMessages > 0) {
+      pulsar_producer_configuration_set_batching_max_messages(this->cProducerConfig, batchingMaxMessages);
     }
-
-    if (producerConfig.Has(CFG_COMPRESS_TYPE) && producerConfig.Get(CFG_COMPRESS_TYPE).IsString()) {
-        std::string compressionType = producerConfig.Get(CFG_COMPRESS_TYPE).ToString().Utf8Value();
-        if (COMPRESSION_TYPE.count(compressionType))
-            pulsar_producer_configuration_set_compression_type(this->cProducerConfig,
-                                                               COMPRESSION_TYPE.at(compressionType));
-    }
-
-    if (producerConfig.Has(CFG_BATCH_ENABLED) && producerConfig.Get(CFG_BATCH_ENABLED).IsBoolean()) {
-        bool batchingEnabled = producerConfig.Get(CFG_BATCH_ENABLED).ToBoolean().Value();
-        pulsar_producer_configuration_set_batching_enabled(this->cProducerConfig, batchingEnabled);
-    }
-
-    if (producerConfig.Has(CFG_BATCH_MAX_DELAY) && producerConfig.Get(CFG_BATCH_MAX_DELAY).IsNumber()) {
-        int64_t batchingMaxPublishDelayMs = producerConfig.Get(CFG_BATCH_MAX_DELAY).ToNumber().Int64Value();
-        if (batchingMaxPublishDelayMs > 0) {
-            pulsar_producer_configuration_set_batching_max_publish_delay_ms(this->cProducerConfig,
-                                                                            (long)batchingMaxPublishDelayMs);
-        }
-    }
-
-    if (producerConfig.Has(CFG_BATCH_MAX_MSG) && producerConfig.Get(CFG_BATCH_MAX_MSG).IsNumber()) {
-        uint32_t batchingMaxMessages = producerConfig.Get(CFG_BATCH_MAX_MSG).ToNumber().Uint32Value();
-        if (batchingMaxMessages > 0) {
-            pulsar_producer_configuration_set_batching_max_messages(this->cProducerConfig,
-                                                                    batchingMaxMessages);
-        }
-    }
-
-    if (producerConfig.Has(CFG_PROPS) && producerConfig.Get(CFG_PROPS).IsObject()) {
-        Napi::Object propObj = producerConfig.Get(CFG_PROPS).ToObject();
-        Napi::Array arr = propObj.GetPropertyNames();
-        int size = arr.Length();
-        for (int i = 0; i < size; i++) {
-            Napi::String key = arr.Get(i).ToString();
-            Napi::String value = propObj.Get(key).ToString();
-            pulsar_producer_configuration_set_property(this->cProducerConfig, key.Utf8Value().c_str(),
-                                                       value.Utf8Value().c_str());
-        }
+  }
+
+  if (producerConfig.Has(CFG_PROPS) && producerConfig.Get(CFG_PROPS).IsObject()) {
+    Napi::Object propObj = producerConfig.Get(CFG_PROPS).ToObject();
+    Napi::Array arr = propObj.GetPropertyNames();
+    int size = arr.Length();
+    for (int i = 0; i < size; i++) {
+      Napi::String key = arr.Get(i).ToString();
+      Napi::String value = propObj.Get(key).ToString();
+      pulsar_producer_configuration_set_property(this->cProducerConfig, key.Utf8Value().c_str(),
+                                                 value.Utf8Value().c_str());
     }
+  }
 }
 
 ProducerConfig::~ProducerConfig() { pulsar_producer_configuration_free(this->cProducerConfig); }
diff --git a/src/ProducerConfig.h b/src/ProducerConfig.h
index e2ecad0..c18c813 100644
--- a/src/ProducerConfig.h
+++ b/src/ProducerConfig.h
@@ -24,15 +24,15 @@
 #include <pulsar/c/producer_configuration.h>
 
 class ProducerConfig {
-   public:
-    ProducerConfig(const Napi::Object &producerConfig);
-    ~ProducerConfig();
-    pulsar_producer_configuration_t *GetCProducerConfig();
-    std::string GetTopic();
+ public:
+  ProducerConfig(const Napi::Object &producerConfig);
+  ~ProducerConfig();
+  pulsar_producer_configuration_t *GetCProducerConfig();
+  std::string GetTopic();
 
-   private:
-    pulsar_producer_configuration_t *cProducerConfig;
-    std::string topic;
+ private:
+  pulsar_producer_configuration_t *cProducerConfig;
+  std::string topic;
 };
 
 #endif
diff --git a/src/addon.cc b/src/addon.cc
index abcbb24..9e75d3f 100644
--- a/src/addon.cc
+++ b/src/addon.cc
@@ -25,11 +25,11 @@
 #include <napi.h>
 
 Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
-    Message::Init(env, exports);
-    MessageId::Init(env, exports);
-    Producer::Init(env, exports);
-    Consumer::Init(env, exports);
-    return Client::Init(env, exports);
+  Message::Init(env, exports);
+  MessageId::Init(env, exports);
+  Producer::Init(env, exports);
+  Consumer::Init(env, exports);
+  return Client::Init(env, exports);
 }
 
 NODE_API_MODULE(NODE_GYP_MODULE_NAME, InitAll)