You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by hr...@apache.org on 2020/03/19 05:16:29 UTC

[pulsar-client-node] branch master updated: Add support for passing `log` function when creating client (#82)

This is an automated email from the ASF dual-hosted git repository.

hrsakai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c53a34  Add support for passing `log` function when creating client (#82)
2c53a34 is described below

commit 2c53a34280748b14535b9ac5c0fa0a2332130178
Author: Yosi Attias <yo...@gmail.com>
AuthorDate: Thu Mar 19 07:16:18 2020 +0200

    Add support for passing `log` function when creating client (#82)
    
    ```
    var client = new Pulsar.Client({
      log: function(level /* typescript enum */, file, line, message) {
          // log with console.log or other logging implementation
      }
    })
    ```
    
    If no log function is passed, it will use default logging.
---
 src/Client.cc | 44 ++++++++++++++++++++++++++++++++++++++++++--
 src/Client.h  | 18 ++++++++++++++++++
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/src/Client.cc b/src/Client.cc
index 02fa2f5..1893af2 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -38,6 +38,7 @@ static const std::string CFG_TLS_TRUST_CERT = "tlsTrustCertsFilePath";
 static const std::string CFG_TLS_VALIDATE_HOSTNAME = "tlsValidateHostname";
 static const std::string CFG_TLS_ALLOW_INSECURE = "tlsAllowInsecureConnection";
 static const std::string CFG_STATS_INTERVAL = "statsIntervalInSeconds";
+static const std::string CFG_LOG = "log";
 
 Napi::FunctionReference Client::constructor;
 
@@ -73,6 +74,17 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
 
   pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create();
 
+  if (clientConfig.Has(CFG_LOG) && clientConfig.Get(CFG_LOG).IsFunction()) {
+    Napi::ThreadSafeFunction logFunction = Napi::ThreadSafeFunction::New(
+        env, clientConfig.Get(CFG_LOG).As<Napi::Function>(), "Pulsar Logging", 0, 1);
+    this->logCallback = new LogCallback();
+    this->logCallback->callback = logFunction;
+
+    pulsar_client_configuration_set_logger(cClientConfig, &LogMessage, this->logCallback);
+  } else {
+    this->logCallback = nullptr;
+  }
+
   if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
     Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
     if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
@@ -141,12 +153,17 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
   pulsar_client_configuration_free(cClientConfig);
 }
 
-Client::~Client() { pulsar_client_free(this->cClient); }
+Client::~Client() {
+  pulsar_client_free(this->cClient);
+  if (this->logCallback != nullptr) {
+    this->logCallback->callback.Release();
+    this->logCallback = nullptr;
+  }
+}
 
 Napi::Value Client::CreateProducer(const Napi::CallbackInfo &info) {
   return Producer::NewInstance(info, this->cClient);
 }
-
 Napi::Value Client::Subscribe(const Napi::CallbackInfo &info) {
   return Consumer::NewInstance(info, this->cClient);
 }
@@ -155,6 +172,29 @@ Napi::Value Client::CreateReader(const Napi::CallbackInfo &info) {
   return Reader::NewInstance(info, this->cClient);
 }
 
+void LogMessageProxy(Napi::Env env, Napi::Function jsCallback, struct LogMessage *logMessage) {
+  Napi::Number logLevel = Napi::Number::New(env, static_cast<double>(logMessage->level));
+  Napi::String file = Napi::String::New(env, logMessage->file);
+  Napi::Number line = Napi::Number::New(env, static_cast<double>(logMessage->line));
+  Napi::String message = Napi::String::New(env, logMessage->message);
+
+  delete logMessage;
+  jsCallback.Call({logLevel, file, line, message});
+}
+
+void LogMessage(pulsar_logger_level_t level, const char *file, int line, const char *message, void *ctx) {
+  LogCallback *logCallback = (LogCallback *)ctx;
+
+  if (logCallback->callback.Acquire() != napi_ok) {
+    return;
+  }
+
+  struct LogMessage *logMessage = new struct LogMessage(level, std::string(file), line, std::string(message));
+
+  logCallback->callback.BlockingCall(logMessage, LogMessageProxy);
+  logCallback->callback.Release();
+}
+
 class ClientCloseWorker : public Napi::AsyncWorker {
  public:
   ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient)
diff --git a/src/Client.h b/src/Client.h
index 0def389..170b4f0 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -23,15 +23,33 @@
 #include <napi.h>
 #include <pulsar/c/client.h>
 
+struct LogMessage {
+  pulsar_logger_level_t level;
+  std::string file;
+  int line;
+  std::string message;
+
+  LogMessage(pulsar_logger_level_t level, std::string file, int line, std::string message)
+      : level(level), file(file), line(line), message(message) {}
+};
+
+struct LogCallback {
+  Napi::ThreadSafeFunction callback;
+};
+
+void LogMessage(pulsar_logger_level_t level, const char *file, int line, const char *message, void *ctx);
+
 class Client : public Napi::ObjectWrap<Client> {
  public:
   static Napi::Object Init(Napi::Env env, Napi::Object exports);
+
   Client(const Napi::CallbackInfo &info);
   ~Client();
 
  private:
   static Napi::FunctionReference constructor;
   pulsar_client_t *cClient;
+  LogCallback *logCallback;
 
   Napi::Value CreateProducer(const Napi::CallbackInfo &info);
   Napi::Value Subscribe(const Napi::CallbackInfo &info);