You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by hr...@apache.org on 2020/03/19 05:16:29 UTC
[pulsar-client-node] branch master updated: Add support for passing `log` function when creating client (#82)
This is an automated email from the ASF dual-hosted git repository.
hrsakai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 2c53a34 Add support for passing `log` function when creating client (#82)
2c53a34 is described below
commit 2c53a34280748b14535b9ac5c0fa0a2332130178
Author: Yosi Attias <yo...@gmail.com>
AuthorDate: Thu Mar 19 07:16:18 2020 +0200
Add support for passing `log` function when creating client (#82)
```
var client = new Pulsar.Client({
log: function(level /* typescript enum */, file, line, message) {
// log with console.log or other logging implementation
}
})
```
If no `log` function is passed, the client uses its default logging.
---
src/Client.cc | 44 ++++++++++++++++++++++++++++++++++++++++++--
src/Client.h | 18 ++++++++++++++++++
2 files changed, 60 insertions(+), 2 deletions(-)
diff --git a/src/Client.cc b/src/Client.cc
index 02fa2f5..1893af2 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -38,6 +38,7 @@ static const std::string CFG_TLS_TRUST_CERT = "tlsTrustCertsFilePath";
static const std::string CFG_TLS_VALIDATE_HOSTNAME = "tlsValidateHostname";
static const std::string CFG_TLS_ALLOW_INSECURE = "tlsAllowInsecureConnection";
static const std::string CFG_STATS_INTERVAL = "statsIntervalInSeconds";
+static const std::string CFG_LOG = "log";
Napi::FunctionReference Client::constructor;
@@ -73,6 +74,17 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create();
+ if (clientConfig.Has(CFG_LOG) && clientConfig.Get(CFG_LOG).IsFunction()) {
+ Napi::ThreadSafeFunction logFunction = Napi::ThreadSafeFunction::New(
+ env, clientConfig.Get(CFG_LOG).As<Napi::Function>(), "Pulsar Logging", 0, 1);
+ this->logCallback = new LogCallback();
+ this->logCallback->callback = logFunction;
+
+ pulsar_client_configuration_set_logger(cClientConfig, &LogMessage, this->logCallback);
+ } else {
+ this->logCallback = nullptr;
+ }
+
if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
@@ -141,12 +153,17 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
pulsar_client_configuration_free(cClientConfig);
}
-Client::~Client() { pulsar_client_free(this->cClient); }
+Client::~Client() {
+ pulsar_client_free(this->cClient);
+ if (this->logCallback != nullptr) {
+ this->logCallback->callback.Release();
+ this->logCallback = nullptr;
+ }
+}
Napi::Value Client::CreateProducer(const Napi::CallbackInfo &info) {
return Producer::NewInstance(info, this->cClient);
}
-
Napi::Value Client::Subscribe(const Napi::CallbackInfo &info) {
return Consumer::NewInstance(info, this->cClient);
}
@@ -155,6 +172,29 @@ Napi::Value Client::CreateReader(const Napi::CallbackInfo &info) {
return Reader::NewInstance(info, this->cClient);
}
+void LogMessageProxy(Napi::Env env, Napi::Function jsCallback, struct LogMessage *logMessage) {
+ Napi::Number logLevel = Napi::Number::New(env, static_cast<double>(logMessage->level));
+ Napi::String file = Napi::String::New(env, logMessage->file);
+ Napi::Number line = Napi::Number::New(env, static_cast<double>(logMessage->line));
+ Napi::String message = Napi::String::New(env, logMessage->message);
+
+ delete logMessage;
+ jsCallback.Call({logLevel, file, line, message});
+}
+
+void LogMessage(pulsar_logger_level_t level, const char *file, int line, const char *message, void *ctx) {
+ LogCallback *logCallback = (LogCallback *)ctx;
+
+ if (logCallback->callback.Acquire() != napi_ok) {
+ return;
+ }
+
+ struct LogMessage *logMessage = new struct LogMessage(level, std::string(file), line, std::string(message));
+
+ logCallback->callback.BlockingCall(logMessage, LogMessageProxy);
+ logCallback->callback.Release();
+}
+
class ClientCloseWorker : public Napi::AsyncWorker {
public:
ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient)
diff --git a/src/Client.h b/src/Client.h
index 0def389..170b4f0 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -23,15 +23,33 @@
#include <napi.h>
#include <pulsar/c/client.h>
+struct LogMessage {
+ pulsar_logger_level_t level;
+ std::string file;
+ int line;
+ std::string message;
+
+ LogMessage(pulsar_logger_level_t level, std::string file, int line, std::string message)
+ : level(level), file(file), line(line), message(message) {}
+};
+
+struct LogCallback {
+ Napi::ThreadSafeFunction callback;
+};
+
+void LogMessage(pulsar_logger_level_t level, const char *file, int line, const char *message, void *ctx);
+
class Client : public Napi::ObjectWrap<Client> {
public:
static Napi::Object Init(Napi::Env env, Napi::Object exports);
+
Client(const Napi::CallbackInfo &info);
~Client();
private:
static Napi::FunctionReference constructor;
pulsar_client_t *cClient;
+ LogCallback *logCallback;
Napi::Value CreateProducer(const Napi::CallbackInfo &info);
Napi::Value Subscribe(const Napi::CallbackInfo &info);