You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2020/03/30 04:16:40 UTC

[pulsar-client-node] branch master updated: Fix consumer being closed with message listener (#83)

This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ba8ce4  Fix consumer being closed with message listener (#83)
8ba8ce4 is described below

commit 8ba8ce4841947b9a3c2af90a6fe01b495c0d9418
Author: Yosi Attias <yo...@gmail.com>
AuthorDate: Mon Mar 30 07:16:32 2020 +0300

    Fix consumer being closed with message listener (#83)
    
    - Call `Ref()` when we a have message listener, to make sure this reference is kept a live.
    - Pass single consumer instance to message listener instead of re-creating it.
---
 src/Consumer.cc                             | 51 +++++++++++++++++++++++++++--
 src/Consumer.h                              |  1 +
 src/ConsumerConfig.cc                       | 34 ++-----------------
 src/ConsumerConfig.h                        | 17 +++-------
 src/{ConsumerConfig.h => MessageListener.h} | 30 +++--------------
 5 files changed, 62 insertions(+), 71 deletions(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index 6b4879e..e2ad7b2 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -45,8 +45,48 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
   constructor.SuppressDestruct();
 }
 
+struct MessageListenerProxyData {
+  pulsar_message_t *cMessage;
+  Consumer *consumer;
+
+  MessageListenerProxyData(pulsar_message_t *cMessage, Consumer *consumer)
+      : cMessage(cMessage), consumer(consumer) {}
+};
+
+void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
+  Napi::Object msg = Message::NewInstance({}, data->cMessage);
+  Consumer *consumer = data->consumer;
+  delete data;
+
+  jsCallback.Call({msg, consumer->Value()});
+}
+
+void MessageListener(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage, void *ctx) {
+  ListenerCallback *listenerCallback = (ListenerCallback *)ctx;
+
+  Consumer *consumer = (Consumer *)listenerCallback->consumer;
+
+  if (listenerCallback->callback.Acquire() != napi_ok) {
+    return;
+  };
+  MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, consumer);
+  listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
+  listenerCallback->callback.Release();
+}
+
 void Consumer::SetCConsumer(std::shared_ptr<CConsumerWrapper> cConsumer) { this->wrapper = cConsumer; }
-void Consumer::SetListenerCallback(ListenerCallback *listener) { this->listener = listener; }
+void Consumer::SetListenerCallback(ListenerCallback *listener) {
+  if (listener) {
+    // Maintain reference to consumer, so it won't get garbage collected
+    // since, when we have a listener, we don't have to maintain reference to consumer (in js code)
+    this->Ref();
+
+    // Pass consumer as argument
+    listener->consumer = this;
+  }
+
+  this->listener = listener;
+}
 
 Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info), listener(nullptr) {}
 
@@ -100,6 +140,7 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
   void OnOK() {
     Napi::Object obj = Consumer::constructor.New({});
     Consumer *consumer = Consumer::Unwrap(obj);
+
     consumer->SetCConsumer(this->consumerWrapper);
     consumer->SetListenerCallback(this->listener);
     this->deferred.Resolve(obj);
@@ -118,8 +159,10 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
 Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
   Napi::Object config = info[0].As<Napi::Object>();
+
   std::shared_ptr<CConsumerWrapper> consumerWrapper = std::make_shared<CConsumerWrapper>();
-  ConsumerConfig *consumerConfig = new ConsumerConfig(config, consumerWrapper);
+
+  ConsumerConfig *consumerConfig = new ConsumerConfig(config, consumerWrapper, &MessageListener);
   ConsumerNewInstanceWorker *wk =
       new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig, consumerWrapper);
   wk->Queue();
@@ -235,6 +278,10 @@ class ConsumerCloseWorker : public Napi::AsyncWorker {
 };
 
 Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
+  if (this->listener) {
+    this->Unref();
+  }
+
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
   ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer);
   wk->Queue();
diff --git a/src/Consumer.h b/src/Consumer.h
index 1f909e7..63fb403 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -23,6 +23,7 @@
 #include <napi.h>
 #include <pulsar/c/client.h>
 #include "ConsumerConfig.h"
+#include "MessageListener.h"
 
 class Consumer : public Napi::ObjectWrap<Consumer> {
  public:
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 1f734df..935500b 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -46,38 +46,11 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
 static const std::map<std::string, initial_position> INIT_POSITION = {
     {"Latest", initial_position_latest}, {"Earliest", initial_position_earliest}};
 
-struct MessageListenerProxyData {
-  std::shared_ptr<CConsumerWrapper> consumerWrapper;
-  pulsar_message_t *cMessage;
-
-  MessageListenerProxyData(std::shared_ptr<CConsumerWrapper> consumerWrapper, pulsar_message_t *cMessage)
-      : consumerWrapper(consumerWrapper), cMessage(cMessage) {}
-};
-
-void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
-  Napi::Object msg = Message::NewInstance({}, data->cMessage);
-  Napi::Object consumerObj = Consumer::constructor.New({});
-  Consumer *consumer = Consumer::Unwrap(consumerObj);
-  consumer->SetCConsumer(std::move(data->consumerWrapper));
-  delete data;
-  jsCallback.Call({msg, consumerObj});
-}
-
-void MessageListener(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage, void *ctx) {
-  ListenerCallback *listenerCallback = (ListenerCallback *)ctx;
-  if (listenerCallback->callback.Acquire() != napi_ok) {
-    return;
-  };
-  MessageListenerProxyData *dataPtr =
-      new MessageListenerProxyData(listenerCallback->consumerWrapper, cMessage);
-  listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
-  listenerCallback->callback.Release();
-}
-
 void FinalizeListenerCallback(Napi::Env env, ListenerCallback *cb, void *) { delete cb; }
 
 ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
-                               std::shared_ptr<CConsumerWrapper> consumerWrapper)
+                               std::shared_ptr<CConsumerWrapper> consumerWrapper,
+                               pulsar_message_listener messageListener)
     : topic(""), subscription(""), ackTimeoutMs(0), nAckRedeliverTimeoutMs(60000), listener(nullptr) {
   this->cConsumerConfig = pulsar_consumer_configuration_create();
 
@@ -156,12 +129,11 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
 
   if (consumerConfig.Has(CFG_LISTENER) && consumerConfig.Get(CFG_LISTENER).IsFunction()) {
     this->listener = new ListenerCallback();
-    this->listener->consumerWrapper = consumerWrapper;
     Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New(
         consumerConfig.Env(), consumerConfig.Get(CFG_LISTENER).As<Napi::Function>(), "Listener Callback", 1,
         1, (void *)NULL, FinalizeListenerCallback, listener);
     this->listener->callback = std::move(callback);
-    pulsar_consumer_configuration_set_message_listener(this->cConsumerConfig, &MessageListener,
+    pulsar_consumer_configuration_set_message_listener(this->cConsumerConfig, messageListener,
                                                        this->listener);
   }
 
diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h
index 0617332..e947b8b 100644
--- a/src/ConsumerConfig.h
+++ b/src/ConsumerConfig.h
@@ -20,31 +20,22 @@
 #ifndef CONSUMER_CONFIG_H
 #define CONSUMER_CONFIG_H
 
-#include <napi.h>
 #include <pulsar/c/consumer_configuration.h>
+#include "MessageListener.h"
 
 #define MIN_ACK_TIMEOUT_MILLIS 10000
 
-struct CConsumerWrapper {
-  pulsar_consumer_t *cConsumer;
-  CConsumerWrapper();
-  ~CConsumerWrapper();
-};
-
-struct ListenerCallback {
-  Napi::ThreadSafeFunction callback;
-  std::shared_ptr<CConsumerWrapper> consumerWrapper;
-};
-
 class ConsumerConfig {
  public:
-  ConsumerConfig(const Napi::Object &consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper);
+  ConsumerConfig(const Napi::Object &consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper,
+                 pulsar_message_listener messageListener);
   ~ConsumerConfig();
   pulsar_consumer_configuration_t *GetCConsumerConfig();
   std::string GetTopic();
   std::string GetSubscription();
   int64_t GetAckTimeoutMs();
   int64_t GetNAckRedeliverTimeoutMs();
+
   ListenerCallback *GetListenerCallback();
 
  private:
diff --git a/src/ConsumerConfig.h b/src/MessageListener.h
similarity index 56%
copy from src/ConsumerConfig.h
copy to src/MessageListener.h
index 0617332..79dfe3b 100644
--- a/src/ConsumerConfig.h
+++ b/src/MessageListener.h
@@ -17,13 +17,11 @@
  * under the License.
  */
 
-#ifndef CONSUMER_CONFIG_H
-#define CONSUMER_CONFIG_H
+#ifndef MESSAGELISTENER_H
+#define MESSAGELISTENER_H
 
 #include <napi.h>
-#include <pulsar/c/consumer_configuration.h>
-
-#define MIN_ACK_TIMEOUT_MILLIS 10000
+#include <pulsar/c/client.h>
 
 struct CConsumerWrapper {
   pulsar_consumer_t *cConsumer;
@@ -33,27 +31,9 @@ struct CConsumerWrapper {
 
 struct ListenerCallback {
   Napi::ThreadSafeFunction callback;
-  std::shared_ptr<CConsumerWrapper> consumerWrapper;
-};
-
-class ConsumerConfig {
- public:
-  ConsumerConfig(const Napi::Object &consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper);
-  ~ConsumerConfig();
-  pulsar_consumer_configuration_t *GetCConsumerConfig();
-  std::string GetTopic();
-  std::string GetSubscription();
-  int64_t GetAckTimeoutMs();
-  int64_t GetNAckRedeliverTimeoutMs();
-  ListenerCallback *GetListenerCallback();
 
- private:
-  pulsar_consumer_configuration_t *cConsumerConfig;
-  std::string topic;
-  std::string subscription;
-  int64_t ackTimeoutMs;
-  int64_t nAckRedeliverTimeoutMs;
-  ListenerCallback *listener;
+  // Using consumer as void* since the ListenerCallback is shared between Config and Consumer.
+  void *consumer;
 };
 
 #endif