You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by hr...@apache.org on 2021/05/20 06:51:14 UTC

[pulsar-client-node] branch master updated: Support Reader Listener (#153)

This is an automated email from the ASF dual-hosted git repository.

hrsakai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new c261c26  Support Reader Listener (#153)
c261c26 is described below

commit c261c262460f0a090729bf42db883432e42c9052
Author: Yuto Furuta <mz...@gmail.com>
AuthorDate: Thu May 20 15:51:05 2021 +0900

    Support Reader Listener (#153)
    
    * support reader listener
    
    * fix reader
    
    * use createReaderCallback
    
    Co-authored-by: Yuto Furuta <yf...@yahoo-corp.jp>
---
 src/ReaderConfig.h => examples/reader_listener.js |  37 +++---
 src/Reader.cc                                     | 136 +++++++++++++++++-----
 src/Reader.h                                      |   9 +-
 src/ReaderConfig.cc                               |  37 +++++-
 src/ReaderConfig.h                                |   6 +-
 src/{ReaderConfig.h => ReaderListener.h}          |  29 +++--
 tests/end_to_end.test.js                          |  50 ++++++++
 7 files changed, 236 insertions(+), 68 deletions(-)

diff --git a/src/ReaderConfig.h b/examples/reader_listener.js
similarity index 60%
copy from src/ReaderConfig.h
copy to examples/reader_listener.js
index 69fc634..63809a6 100644
--- a/src/ReaderConfig.h
+++ b/examples/reader_listener.js
@@ -17,26 +17,21 @@
  * under the License.
  */
 
-#ifndef READER_CONFIG_H
-#define READER_CONFIG_H
+const Pulsar = require('pulsar-client');
 
-#include <napi.h>
-#include <pulsar/c/reader.h>
-#include <pulsar/c/reader_configuration.h>
-#include <pulsar/c/message_id.h>
+(async () => {
+  // Create a client
+  const client = new Pulsar.Client({
+    serviceUrl: 'pulsar://localhost:6650',
+    operationTimeoutSeconds: 30,
+  });
 
-class ReaderConfig {
- public:
-  ReaderConfig(const Napi::Object &readerConfig);
-  ~ReaderConfig();
-  pulsar_reader_configuration_t *GetCReaderConfig();
-  pulsar_message_id_t *GetCStartMessageId();
-  std::string GetTopic();
-
- private:
-  std::string topic;
-  pulsar_message_id_t *cStartMessageId;
-  pulsar_reader_configuration_t *cReaderConfig;
-};
-
-#endif
+  // Create a reader
+  const reader = await client.createReader({
+    topic: 'persistent://public/default/my-topic',
+    startMessageId: Pulsar.MessageId.latest(),
+    listener: (msg, reader) => {
+      console.log(msg.getData().toString());
+    },
+  });
+})();
diff --git a/src/Reader.cc b/src/Reader.cc
index 8118628..885431f 100644
--- a/src/Reader.cc
+++ b/src/Reader.cc
@@ -22,6 +22,8 @@
 #include "ReaderConfig.h"
 #include <pulsar/c/result.h>
 #include <pulsar/c/reader.h>
+#include <atomic>
+#include <thread>
 
 Napi::FunctionReference Reader::constructor;
 
@@ -39,44 +41,86 @@ void Reader::Init(Napi::Env env, Napi::Object exports) {
   constructor.SuppressDestruct();
 }
 
-void Reader::SetCReader(pulsar_reader_t *cReader) { this->cReader = cReader; }
+struct ReaderListenerProxyData {
+  pulsar_message_t *cMessage;
+  Reader *reader;
+
+  ReaderListenerProxyData(pulsar_message_t *cMessage, Reader *reader) : cMessage(cMessage), reader(reader) {}
+};
+
+void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) {
+  Napi::Object msg = Message::NewInstance({}, data->cMessage);
+  Reader *reader = data->reader;
+  delete data;
+
+  jsCallback.Call({msg, reader->Value()});
+}
+
+void ReaderListener(pulsar_reader_t *cReader, pulsar_message_t *cMessage, void *ctx) {
+  ReaderListenerCallback *readerListenerCallback = (ReaderListenerCallback *)ctx;
+  Reader *reader = (Reader *)readerListenerCallback->reader;
+  if (readerListenerCallback->callback.Acquire() != napi_ok) {
+    return;
+  }
+  ReaderListenerProxyData *dataPtr = new ReaderListenerProxyData(cMessage, reader);
+  readerListenerCallback->callback.BlockingCall(dataPtr, ReaderListenerProxy);
+  readerListenerCallback->callback.Release();
+}
 
-Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info) {}
+void Reader::SetCReader(std::shared_ptr<CReaderWrapper> cReader) { this->wrapper = cReader; }
+void Reader::SetListenerCallback(ReaderListenerCallback *listener) {
+  if (listener) {
+    // Maintain reference to reader, so it won't get garbage collected
+    // since, when we have a listener, we don't have to maintain reference to reader (in js code)
+    this->Ref();
+
+    // Pass reader as argument
+    listener->reader = this;
+  }
+
+  this->listener = listener;
+}
+
+Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info), listener(nullptr) {}
 
 class ReaderNewInstanceWorker : public Napi::AsyncWorker {
  public:
   ReaderNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
-                          ReaderConfig *readerConfig)
+                          ReaderConfig *readerConfig, std::shared_ptr<CReaderWrapper> readerWrapper)
       : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
         deferred(deferred),
         cClient(cClient),
-        readerConfig(readerConfig) {}
+        readerConfig(readerConfig),
+        readerWrapper(readerWrapper),
+        done(false) {}
   ~ReaderNewInstanceWorker() {}
   void Execute() {
     const std::string &topic = this->readerConfig->GetTopic();
     if (topic.empty()) {
-      SetError(std::string("Topic is required and must be specified as a string when creating reader"));
+      std::string msg("Topic is required and must be specified as a string when creating reader");
+      SetError(msg);
       return;
     }
     if (this->readerConfig->GetCStartMessageId() == nullptr) {
-      SetError(std::string(
-          "StartMessageId is required and must be specified as a MessageId object when creating reader"));
+      std::string msg(
+          "StartMessageId is required and must be specified as a MessageId object when creating reader");
+      SetError(msg);
       return;
     }
 
-    pulsar_result result =
-        pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
-                                    this->readerConfig->GetCReaderConfig(), &(this->cReader));
-    delete this->readerConfig;
-    if (result != pulsar_result_Ok) {
-      SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
-      return;
+    pulsar_client_create_reader_async(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
+                                      this->readerConfig->GetCReaderConfig(),
+                                      &ReaderNewInstanceWorker::createReaderCallback, (void *)this);
+
+    while (!done) {
+      std::this_thread::yield();
     }
   }
   void OnOK() {
     Napi::Object obj = Reader::constructor.New({});
     Reader *reader = Reader::Unwrap(obj);
-    reader->SetCReader(this->cReader);
+    reader->SetCReader(this->readerWrapper);
+    reader->SetListenerCallback(this->listener);
     this->deferred.Resolve(obj);
   }
   void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
@@ -84,15 +128,33 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker {
  private:
   Napi::Promise::Deferred deferred;
   pulsar_client_t *cClient;
-  ReaderConfig *readerConfig;
   pulsar_reader_t *cReader;
+  ReaderConfig *readerConfig;
+  ReaderListenerCallback *listener;
+  std::shared_ptr<CReaderWrapper> readerWrapper;
+  std::atomic<bool> done;
+  static void createReaderCallback(pulsar_result result, pulsar_reader_t *reader, void *ctx) {
+    ReaderNewInstanceWorker *worker = (ReaderNewInstanceWorker *)ctx;
+    if (result != pulsar_result_Ok) {
+      worker->SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
+    } else {
+      worker->readerWrapper->cReader = reader;
+      worker->listener = worker->readerConfig->GetListenerCallback();
+    }
+
+    delete worker->readerConfig;
+    worker->done = true;
+  }
 };
 
 Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
   Napi::Object config = info[0].As<Napi::Object>();
-  ReaderConfig *readerConfig = new ReaderConfig(config);
-  ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig);
+
+  std::shared_ptr<CReaderWrapper> readerWrapper = std::make_shared<CReaderWrapper>();
+
+  ReaderConfig *readerConfig = new ReaderConfig(config, readerWrapper, &ReaderListener);
+  ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig, readerWrapper);
   wk->Queue();
   return deferred.Promise();
 }
@@ -133,11 +195,12 @@ class ReaderReadNextWorker : public Napi::AsyncWorker {
 Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
   if (info[0].IsUndefined()) {
-    ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader);
+    ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->wrapper->cReader);
     wk->Queue();
   } else {
     Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
-    ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader, timeout.Int64Value());
+    ReaderReadNextWorker *wk =
+        new ReaderReadNextWorker(deferred, this->wrapper->cReader, timeout.Int64Value());
     wk->Queue();
   }
   return deferred.Promise();
@@ -145,7 +208,7 @@ Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) {
 
 Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) {
   int value = 0;
-  pulsar_result result = pulsar_reader_has_message_available(this->cReader, &value);
+  pulsar_result result = pulsar_reader_has_message_available(this->wrapper->cReader, &value);
   if (result != pulsar_result_Ok) {
     Napi::Error::New(info.Env(), "Failed to check if next message is available").ThrowAsJavaScriptException();
     return Napi::Boolean::New(info.Env(), false);
@@ -158,16 +221,20 @@ Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) {
 
 class ReaderCloseWorker : public Napi::AsyncWorker {
  public:
-  ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader)
+  ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader, Reader *reader)
       : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
         deferred(deferred),
-        cReader(cReader) {}
+        cReader(cReader),
+        reader(reader) {}
   ~ReaderCloseWorker() {}
   void Execute() {
     pulsar_result result = pulsar_reader_close(this->cReader);
     if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
   }
-  void OnOK() { this->deferred.Resolve(Env().Null()); }
+  void OnOK() {
+    this->reader->Cleanup();
+    this->deferred.Resolve(Env().Null());
+  }
   void OnError(const Napi::Error &e) {
     this->deferred.Reject(
         Napi::Error::New(Env(), std::string("Failed to close reader: ") + e.Message()).Value());
@@ -176,13 +243,30 @@ class ReaderCloseWorker : public Napi::AsyncWorker {
  private:
   Napi::Promise::Deferred deferred;
   pulsar_reader_t *cReader;
+  Reader *reader;
 };
 
 Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader);
+  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->wrapper->cReader, this);
   wk->Queue();
   return deferred.Promise();
 }
 
-Reader::~Reader() { pulsar_reader_free(this->cReader); }
+void Reader::Cleanup() {
+  if (this->listener) {
+    this->CleanupListener();
+  }
+}
+
+void Reader::CleanupListener() {
+  this->Unref();
+  this->listener->callback.Release();
+  this->listener = nullptr;
+}
+
+Reader::~Reader() {
+  if (this->listener) {
+    this->CleanupListener();
+  }
+}
diff --git a/src/Reader.h b/src/Reader.h
index 787b732..8242fa9 100644
--- a/src/Reader.h
+++ b/src/Reader.h
@@ -22,6 +22,7 @@
 
 #include <napi.h>
 #include <pulsar/c/client.h>
+#include "ReaderConfig.h"
 
 class Reader : public Napi::ObjectWrap<Reader> {
  public:
@@ -30,14 +31,18 @@ class Reader : public Napi::ObjectWrap<Reader> {
   static Napi::FunctionReference constructor;
   Reader(const Napi::CallbackInfo &info);
   ~Reader();
-  void SetCReader(pulsar_reader_t *cReader);
+  void SetCReader(std::shared_ptr<CReaderWrapper> cReader);
+  void SetListenerCallback(ReaderListenerCallback *listener);
+  void Cleanup();
 
  private:
-  pulsar_reader_t *cReader;
+  std::shared_ptr<CReaderWrapper> wrapper;
+  ReaderListenerCallback *listener;
 
   Napi::Value ReadNext(const Napi::CallbackInfo &info);
   Napi::Value HasNext(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
+  void CleanupListener();
 };
 
 #endif
diff --git a/src/ReaderConfig.cc b/src/ReaderConfig.cc
index 040d98f..dd027b8 100644
--- a/src/ReaderConfig.cc
+++ b/src/ReaderConfig.cc
@@ -27,8 +27,13 @@ static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
 static const std::string CFG_READER_NAME = "readerName";
 static const std::string CFG_SUBSCRIPTION_ROLE_PREFIX = "subscriptionRolePrefix";
 static const std::string CFG_READ_COMPACTED = "readCompacted";
+static const std::string CFG_LISTENER = "listener";
 
-ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStartMessageId(NULL) {
+void FinalizeListenerCallback(Napi::Env env, ReaderListenerCallback *cb, void *) { delete cb; }
+
+ReaderConfig::ReaderConfig(const Napi::Object &readerConfig, std::shared_ptr<CReaderWrapper> readerWrapper,
+                           pulsar_reader_listener readerListener)
+    : topic(""), cStartMessageId(NULL), listener(nullptr) {
   this->cReaderConfig = pulsar_reader_configuration_create();
 
   if (readerConfig.Has(CFG_TOPIC) && readerConfig.Get(CFG_TOPIC).IsString()) {
@@ -67,12 +72,40 @@ ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStart
       pulsar_reader_configuration_set_read_compacted(this->cReaderConfig, 1);
     }
   }
+
+  if (readerConfig.Has(CFG_LISTENER) && readerConfig.Get(CFG_LISTENER).IsFunction()) {
+    this->listener = new ReaderListenerCallback();
+    Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New(
+        readerConfig.Env(), readerConfig.Get(CFG_LISTENER).As<Napi::Function>(), "Reader Listener Callback",
+        1, 1, (void *)NULL, FinalizeListenerCallback, listener);
+    this->listener->callback = std::move(callback);
+    pulsar_reader_configuration_set_reader_listener(this->cReaderConfig, readerListener, this->listener);
+  }
 }
 
-ReaderConfig::~ReaderConfig() { pulsar_reader_configuration_free(this->cReaderConfig); }
+ReaderConfig::~ReaderConfig() {
+  pulsar_reader_configuration_free(this->cReaderConfig);
+  if (this->listener) {
+    this->listener->callback.Release();
+  }
+}
 
 pulsar_reader_configuration_t *ReaderConfig::GetCReaderConfig() { return this->cReaderConfig; }
 
 std::string ReaderConfig::GetTopic() { return this->topic; }
 
 pulsar_message_id_t *ReaderConfig::GetCStartMessageId() { return this->cStartMessageId; }
+
+ReaderListenerCallback *ReaderConfig::GetListenerCallback() {
+  ReaderListenerCallback *cb = this->listener;
+  this->listener = nullptr;
+  return cb;
+}
+
+CReaderWrapper::CReaderWrapper() : cReader(nullptr) {}
+
+CReaderWrapper::~CReaderWrapper() {
+  if (this->cReader) {
+    pulsar_reader_free(this->cReader);
+  }
+}
diff --git a/src/ReaderConfig.h b/src/ReaderConfig.h
index 69fc634..0dacd35 100644
--- a/src/ReaderConfig.h
+++ b/src/ReaderConfig.h
@@ -24,19 +24,23 @@
 #include <pulsar/c/reader.h>
 #include <pulsar/c/reader_configuration.h>
 #include <pulsar/c/message_id.h>
+#include "ReaderListener.h"
 
 class ReaderConfig {
  public:
-  ReaderConfig(const Napi::Object &readerConfig);
+  ReaderConfig(const Napi::Object &readerConfig, std::shared_ptr<CReaderWrapper> readerWrapper,
+               pulsar_reader_listener readerListener);
   ~ReaderConfig();
   pulsar_reader_configuration_t *GetCReaderConfig();
   pulsar_message_id_t *GetCStartMessageId();
   std::string GetTopic();
+  ReaderListenerCallback *GetListenerCallback();
 
  private:
   std::string topic;
   pulsar_message_id_t *cStartMessageId;
   pulsar_reader_configuration_t *cReaderConfig;
+  ReaderListenerCallback *listener;
 };
 
 #endif
diff --git a/src/ReaderConfig.h b/src/ReaderListener.h
similarity index 62%
copy from src/ReaderConfig.h
copy to src/ReaderListener.h
index 69fc634..a8ff255 100644
--- a/src/ReaderConfig.h
+++ b/src/ReaderListener.h
@@ -17,26 +17,23 @@
  * under the License.
  */
 
-#ifndef READER_CONFIG_H
-#define READER_CONFIG_H
+#ifndef READER_LISTENER_H
+#define READER_LISTENER_H
 
 #include <napi.h>
-#include <pulsar/c/reader.h>
-#include <pulsar/c/reader_configuration.h>
-#include <pulsar/c/message_id.h>
+#include <pulsar/c/client.h>
 
-class ReaderConfig {
- public:
-  ReaderConfig(const Napi::Object &readerConfig);
-  ~ReaderConfig();
-  pulsar_reader_configuration_t *GetCReaderConfig();
-  pulsar_message_id_t *GetCStartMessageId();
-  std::string GetTopic();
+struct CReaderWrapper {
+  pulsar_reader_t *cReader;
+  CReaderWrapper();
+  ~CReaderWrapper();
+};
+
+struct ReaderListenerCallback {
+  Napi::ThreadSafeFunction callback;
 
- private:
-  std::string topic;
-  pulsar_message_id_t *cStartMessageId;
-  pulsar_reader_configuration_t *cReaderConfig;
+  // Using reader as void* since the ReaderListenerCallback is shared between Config and Reader.
+  void *reader;
 };
 
 #endif
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 4acf808..46364e0 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -213,6 +213,56 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('Produce/Read Listener', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/produce-read-listener';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: true,
+      });
+      expect(producer).not.toBeNull();
+
+      let finish;
+      const results = [];
+      const finishPromise = new Promise((resolve) => {
+        finish = resolve;
+      });
+
+      const reader = await client.createReader({
+        topic,
+        startMessageId: Pulsar.MessageId.latest(),
+        listener: (message) => {
+          const data = message.getData().toString();
+          results.push(data);
+          if (results.length === 10) finish();
+        },
+      });
+
+      expect(reader).not.toBeNull();
+
+      const messages = [];
+      for (let i = 0; i < 10; i += 1) {
+        const msg = `my-message-${i}`;
+        producer.send({
+          data: Buffer.from(msg),
+        });
+        messages.push(msg);
+      }
+      await producer.flush();
+
+      await finishPromise;
+      expect(lodash.difference(messages, results)).toEqual([]);
+
+      await producer.close();
+      await reader.close();
+      await client.close();
+    });
+
     test('acknowledgeCumulative', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',