You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by hr...@apache.org on 2021/05/20 06:51:14 UTC
[pulsar-client-node] branch master updated: Support Reader Listener (#153)
This is an automated email from the ASF dual-hosted git repository.
hrsakai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new c261c26 Support Reader Listener (#153)
c261c26 is described below
commit c261c262460f0a090729bf42db883432e42c9052
Author: Yuto Furuta <mz...@gmail.com>
AuthorDate: Thu May 20 15:51:05 2021 +0900
Support Reader Listener (#153)
* support reader listener
* fix reader
* use createReaderCallback
Co-authored-by: Yuto Furuta <yf...@yahoo-corp.jp>
---
src/ReaderConfig.h => examples/reader_listener.js | 37 +++---
src/Reader.cc | 136 +++++++++++++++++-----
src/Reader.h | 9 +-
src/ReaderConfig.cc | 37 +++++-
src/ReaderConfig.h | 6 +-
src/{ReaderConfig.h => ReaderListener.h} | 29 +++--
tests/end_to_end.test.js | 50 ++++++++
7 files changed, 236 insertions(+), 68 deletions(-)
diff --git a/src/ReaderConfig.h b/examples/reader_listener.js
similarity index 60%
copy from src/ReaderConfig.h
copy to examples/reader_listener.js
index 69fc634..63809a6 100644
--- a/src/ReaderConfig.h
+++ b/examples/reader_listener.js
@@ -17,26 +17,21 @@
* under the License.
*/
-#ifndef READER_CONFIG_H
-#define READER_CONFIG_H
+const Pulsar = require('pulsar-client');
-#include <napi.h>
-#include <pulsar/c/reader.h>
-#include <pulsar/c/reader_configuration.h>
-#include <pulsar/c/message_id.h>
+(async () => {
+ // Create a client
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
-class ReaderConfig {
- public:
- ReaderConfig(const Napi::Object &readerConfig);
- ~ReaderConfig();
- pulsar_reader_configuration_t *GetCReaderConfig();
- pulsar_message_id_t *GetCStartMessageId();
- std::string GetTopic();
-
- private:
- std::string topic;
- pulsar_message_id_t *cStartMessageId;
- pulsar_reader_configuration_t *cReaderConfig;
-};
-
-#endif
+ // Create a reader
+ const reader = await client.createReader({
+ topic: 'persistent://public/default/my-topic',
+ startMessageId: Pulsar.MessageId.latest(),
+ listener: (msg, reader) => {
+ console.log(msg.getData().toString());
+ },
+ });
+})();
diff --git a/src/Reader.cc b/src/Reader.cc
index 8118628..885431f 100644
--- a/src/Reader.cc
+++ b/src/Reader.cc
@@ -22,6 +22,8 @@
#include "ReaderConfig.h"
#include <pulsar/c/result.h>
#include <pulsar/c/reader.h>
+#include <atomic>
+#include <thread>
Napi::FunctionReference Reader::constructor;
@@ -39,44 +41,86 @@ void Reader::Init(Napi::Env env, Napi::Object exports) {
constructor.SuppressDestruct();
}
-void Reader::SetCReader(pulsar_reader_t *cReader) { this->cReader = cReader; }
+struct ReaderListenerProxyData {
+ pulsar_message_t *cMessage;
+ Reader *reader;
+
+ ReaderListenerProxyData(pulsar_message_t *cMessage, Reader *reader) : cMessage(cMessage), reader(reader) {}
+};
+
+void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) {
+ Napi::Object msg = Message::NewInstance({}, data->cMessage);
+ Reader *reader = data->reader;
+ delete data;
+
+ jsCallback.Call({msg, reader->Value()});
+}
+
+void ReaderListener(pulsar_reader_t *cReader, pulsar_message_t *cMessage, void *ctx) {
+ ReaderListenerCallback *readerListenerCallback = (ReaderListenerCallback *)ctx;
+ Reader *reader = (Reader *)readerListenerCallback->reader;
+ if (readerListenerCallback->callback.Acquire() != napi_ok) {
+ return;
+ }
+ ReaderListenerProxyData *dataPtr = new ReaderListenerProxyData(cMessage, reader);
+ readerListenerCallback->callback.BlockingCall(dataPtr, ReaderListenerProxy);
+ readerListenerCallback->callback.Release();
+}
-Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info) {}
+void Reader::SetCReader(std::shared_ptr<CReaderWrapper> cReader) { this->wrapper = cReader; }
+void Reader::SetListenerCallback(ReaderListenerCallback *listener) {
+ if (listener) {
+ // Maintain reference to reader, so it won't get garbage collected
+ // since, when we have a listener, we don't have to maintain reference to reader (in js code)
+ this->Ref();
+
+ // Pass reader as argument
+ listener->reader = this;
+ }
+
+ this->listener = listener;
+}
+
+Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info), listener(nullptr) {}
class ReaderNewInstanceWorker : public Napi::AsyncWorker {
public:
ReaderNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
- ReaderConfig *readerConfig)
+ ReaderConfig *readerConfig, std::shared_ptr<CReaderWrapper> readerWrapper)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
cClient(cClient),
- readerConfig(readerConfig) {}
+ readerConfig(readerConfig),
+ readerWrapper(readerWrapper),
+ done(false) {}
~ReaderNewInstanceWorker() {}
void Execute() {
const std::string &topic = this->readerConfig->GetTopic();
if (topic.empty()) {
- SetError(std::string("Topic is required and must be specified as a string when creating reader"));
+ std::string msg("Topic is required and must be specified as a string when creating reader");
+ SetError(msg);
return;
}
if (this->readerConfig->GetCStartMessageId() == nullptr) {
- SetError(std::string(
- "StartMessageId is required and must be specified as a MessageId object when creating reader"));
+ std::string msg(
+ "StartMessageId is required and must be specified as a MessageId object when creating reader");
+ SetError(msg);
return;
}
- pulsar_result result =
- pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
- this->readerConfig->GetCReaderConfig(), &(this->cReader));
- delete this->readerConfig;
- if (result != pulsar_result_Ok) {
- SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
- return;
+ pulsar_client_create_reader_async(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
+ this->readerConfig->GetCReaderConfig(),
+ &ReaderNewInstanceWorker::createReaderCallback, (void *)this);
+
+ while (!done) {
+ std::this_thread::yield();
}
}
void OnOK() {
Napi::Object obj = Reader::constructor.New({});
Reader *reader = Reader::Unwrap(obj);
- reader->SetCReader(this->cReader);
+ reader->SetCReader(this->readerWrapper);
+ reader->SetListenerCallback(this->listener);
this->deferred.Resolve(obj);
}
void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
@@ -84,15 +128,33 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker {
private:
Napi::Promise::Deferred deferred;
pulsar_client_t *cClient;
- ReaderConfig *readerConfig;
pulsar_reader_t *cReader;
+ ReaderConfig *readerConfig;
+ ReaderListenerCallback *listener;
+ std::shared_ptr<CReaderWrapper> readerWrapper;
+ std::atomic<bool> done;
+ static void createReaderCallback(pulsar_result result, pulsar_reader_t *reader, void *ctx) {
+ ReaderNewInstanceWorker *worker = (ReaderNewInstanceWorker *)ctx;
+ if (result != pulsar_result_Ok) {
+ worker->SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
+ } else {
+ worker->readerWrapper->cReader = reader;
+ worker->listener = worker->readerConfig->GetListenerCallback();
+ }
+
+ delete worker->readerConfig;
+ worker->done = true;
+ }
};
Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
Napi::Object config = info[0].As<Napi::Object>();
- ReaderConfig *readerConfig = new ReaderConfig(config);
- ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig);
+
+ std::shared_ptr<CReaderWrapper> readerWrapper = std::make_shared<CReaderWrapper>();
+
+ ReaderConfig *readerConfig = new ReaderConfig(config, readerWrapper, &ReaderListener);
+ ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig, readerWrapper);
wk->Queue();
return deferred.Promise();
}
@@ -133,11 +195,12 @@ class ReaderReadNextWorker : public Napi::AsyncWorker {
Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
if (info[0].IsUndefined()) {
- ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader);
+ ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->wrapper->cReader);
wk->Queue();
} else {
Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
- ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader, timeout.Int64Value());
+ ReaderReadNextWorker *wk =
+ new ReaderReadNextWorker(deferred, this->wrapper->cReader, timeout.Int64Value());
wk->Queue();
}
return deferred.Promise();
@@ -145,7 +208,7 @@ Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) {
Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) {
int value = 0;
- pulsar_result result = pulsar_reader_has_message_available(this->cReader, &value);
+ pulsar_result result = pulsar_reader_has_message_available(this->wrapper->cReader, &value);
if (result != pulsar_result_Ok) {
Napi::Error::New(info.Env(), "Failed to check if next message is available").ThrowAsJavaScriptException();
return Napi::Boolean::New(info.Env(), false);
@@ -158,16 +221,20 @@ Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) {
class ReaderCloseWorker : public Napi::AsyncWorker {
public:
- ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader)
+ ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader, Reader *reader)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
- cReader(cReader) {}
+ cReader(cReader),
+ reader(reader) {}
~ReaderCloseWorker() {}
void Execute() {
pulsar_result result = pulsar_reader_close(this->cReader);
if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
}
- void OnOK() { this->deferred.Resolve(Env().Null()); }
+ void OnOK() {
+ this->reader->Cleanup();
+ this->deferred.Resolve(Env().Null());
+ }
void OnError(const Napi::Error &e) {
this->deferred.Reject(
Napi::Error::New(Env(), std::string("Failed to close reader: ") + e.Message()).Value());
@@ -176,13 +243,30 @@ class ReaderCloseWorker : public Napi::AsyncWorker {
private:
Napi::Promise::Deferred deferred;
pulsar_reader_t *cReader;
+ Reader *reader;
};
Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
- ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader);
+ ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->wrapper->cReader, this);
wk->Queue();
return deferred.Promise();
}
-Reader::~Reader() { pulsar_reader_free(this->cReader); }
+void Reader::Cleanup() {
+ if (this->listener) {
+ this->CleanupListener();
+ }
+}
+
+void Reader::CleanupListener() {
+ this->Unref();
+ this->listener->callback.Release();
+ this->listener = nullptr;
+}
+
+Reader::~Reader() {
+ if (this->listener) {
+ this->CleanupListener();
+ }
+}
diff --git a/src/Reader.h b/src/Reader.h
index 787b732..8242fa9 100644
--- a/src/Reader.h
+++ b/src/Reader.h
@@ -22,6 +22,7 @@
#include <napi.h>
#include <pulsar/c/client.h>
+#include "ReaderConfig.h"
class Reader : public Napi::ObjectWrap<Reader> {
public:
@@ -30,14 +31,18 @@ class Reader : public Napi::ObjectWrap<Reader> {
static Napi::FunctionReference constructor;
Reader(const Napi::CallbackInfo &info);
~Reader();
- void SetCReader(pulsar_reader_t *cReader);
+ void SetCReader(std::shared_ptr<CReaderWrapper> cReader);
+ void SetListenerCallback(ReaderListenerCallback *listener);
+ void Cleanup();
private:
- pulsar_reader_t *cReader;
+ std::shared_ptr<CReaderWrapper> wrapper;
+ ReaderListenerCallback *listener;
Napi::Value ReadNext(const Napi::CallbackInfo &info);
Napi::Value HasNext(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
+ void CleanupListener();
};
#endif
diff --git a/src/ReaderConfig.cc b/src/ReaderConfig.cc
index 040d98f..dd027b8 100644
--- a/src/ReaderConfig.cc
+++ b/src/ReaderConfig.cc
@@ -27,8 +27,13 @@ static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
static const std::string CFG_READER_NAME = "readerName";
static const std::string CFG_SUBSCRIPTION_ROLE_PREFIX = "subscriptionRolePrefix";
static const std::string CFG_READ_COMPACTED = "readCompacted";
+static const std::string CFG_LISTENER = "listener";
-ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStartMessageId(NULL) {
+void FinalizeListenerCallback(Napi::Env env, ReaderListenerCallback *cb, void *) { delete cb; }
+
+ReaderConfig::ReaderConfig(const Napi::Object &readerConfig, std::shared_ptr<CReaderWrapper> readerWrapper,
+ pulsar_reader_listener readerListener)
+ : topic(""), cStartMessageId(NULL), listener(nullptr) {
this->cReaderConfig = pulsar_reader_configuration_create();
if (readerConfig.Has(CFG_TOPIC) && readerConfig.Get(CFG_TOPIC).IsString()) {
@@ -67,12 +72,40 @@ ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStart
pulsar_reader_configuration_set_read_compacted(this->cReaderConfig, 1);
}
}
+
+ if (readerConfig.Has(CFG_LISTENER) && readerConfig.Get(CFG_LISTENER).IsFunction()) {
+ this->listener = new ReaderListenerCallback();
+ Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New(
+ readerConfig.Env(), readerConfig.Get(CFG_LISTENER).As<Napi::Function>(), "Reader Listener Callback",
+ 1, 1, (void *)NULL, FinalizeListenerCallback, listener);
+ this->listener->callback = std::move(callback);
+ pulsar_reader_configuration_set_reader_listener(this->cReaderConfig, readerListener, this->listener);
+ }
}
-ReaderConfig::~ReaderConfig() { pulsar_reader_configuration_free(this->cReaderConfig); }
+ReaderConfig::~ReaderConfig() {
+ pulsar_reader_configuration_free(this->cReaderConfig);
+ if (this->listener) {
+ this->listener->callback.Release();
+ }
+}
pulsar_reader_configuration_t *ReaderConfig::GetCReaderConfig() { return this->cReaderConfig; }
std::string ReaderConfig::GetTopic() { return this->topic; }
pulsar_message_id_t *ReaderConfig::GetCStartMessageId() { return this->cStartMessageId; }
+
+ReaderListenerCallback *ReaderConfig::GetListenerCallback() {
+ ReaderListenerCallback *cb = this->listener;
+ this->listener = nullptr;
+ return cb;
+}
+
+CReaderWrapper::CReaderWrapper() : cReader(nullptr) {}
+
+CReaderWrapper::~CReaderWrapper() {
+ if (this->cReader) {
+ pulsar_reader_free(this->cReader);
+ }
+}
diff --git a/src/ReaderConfig.h b/src/ReaderConfig.h
index 69fc634..0dacd35 100644
--- a/src/ReaderConfig.h
+++ b/src/ReaderConfig.h
@@ -24,19 +24,23 @@
#include <pulsar/c/reader.h>
#include <pulsar/c/reader_configuration.h>
#include <pulsar/c/message_id.h>
+#include "ReaderListener.h"
class ReaderConfig {
public:
- ReaderConfig(const Napi::Object &readerConfig);
+ ReaderConfig(const Napi::Object &readerConfig, std::shared_ptr<CReaderWrapper> readerWrapper,
+ pulsar_reader_listener readerListener);
~ReaderConfig();
pulsar_reader_configuration_t *GetCReaderConfig();
pulsar_message_id_t *GetCStartMessageId();
std::string GetTopic();
+ ReaderListenerCallback *GetListenerCallback();
private:
std::string topic;
pulsar_message_id_t *cStartMessageId;
pulsar_reader_configuration_t *cReaderConfig;
+ ReaderListenerCallback *listener;
};
#endif
diff --git a/src/ReaderConfig.h b/src/ReaderListener.h
similarity index 62%
copy from src/ReaderConfig.h
copy to src/ReaderListener.h
index 69fc634..a8ff255 100644
--- a/src/ReaderConfig.h
+++ b/src/ReaderListener.h
@@ -17,26 +17,23 @@
* under the License.
*/
-#ifndef READER_CONFIG_H
-#define READER_CONFIG_H
+#ifndef READER_LISTENER_H
+#define READER_LISTENER_H
#include <napi.h>
-#include <pulsar/c/reader.h>
-#include <pulsar/c/reader_configuration.h>
-#include <pulsar/c/message_id.h>
+#include <pulsar/c/client.h>
-class ReaderConfig {
- public:
- ReaderConfig(const Napi::Object &readerConfig);
- ~ReaderConfig();
- pulsar_reader_configuration_t *GetCReaderConfig();
- pulsar_message_id_t *GetCStartMessageId();
- std::string GetTopic();
+struct CReaderWrapper {
+ pulsar_reader_t *cReader;
+ CReaderWrapper();
+ ~CReaderWrapper();
+};
+
+struct ReaderListenerCallback {
+ Napi::ThreadSafeFunction callback;
- private:
- std::string topic;
- pulsar_message_id_t *cStartMessageId;
- pulsar_reader_configuration_t *cReaderConfig;
+ // Using reader as void* since the ReaderListenerCallback is shared between Config and Reader.
+ void *reader;
};
#endif
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 4acf808..46364e0 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -213,6 +213,56 @@ const Pulsar = require('../index.js');
await client.close();
});
+ test('Produce/Read Listener', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/produce-read-listener';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+ expect(producer).not.toBeNull();
+
+ let finish;
+ const results = [];
+ const finishPromise = new Promise((resolve) => {
+ finish = resolve;
+ });
+
+ const reader = await client.createReader({
+ topic,
+ startMessageId: Pulsar.MessageId.latest(),
+ listener: (message) => {
+ const data = message.getData().toString();
+ results.push(data);
+ if (results.length === 10) finish();
+ },
+ });
+
+ expect(reader).not.toBeNull();
+
+ const messages = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ messages.push(msg);
+ }
+ await producer.flush();
+
+ await finishPromise;
+ expect(lodash.difference(messages, results)).toEqual([]);
+
+ await producer.close();
+ await reader.close();
+ await client.close();
+ });
+
test('acknowledgeCumulative', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',