You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2020/03/30 04:16:40 UTC
[pulsar-client-node] branch master updated: Fix consumer being
closed with message listener (#83)
This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 8ba8ce4 Fix consumer being closed with message listener (#83)
8ba8ce4 is described below
commit 8ba8ce4841947b9a3c2af90a6fe01b495c0d9418
Author: Yosi Attias <yo...@gmail.com>
AuthorDate: Mon Mar 30 07:16:32 2020 +0300
Fix consumer being closed with message listener (#83)
- Call `Ref()` when we a have message listener, to make sure this reference is kept a live.
- Pass single consumer instance to message listener instead of re-creating it.
---
src/Consumer.cc | 51 +++++++++++++++++++++++++++--
src/Consumer.h | 1 +
src/ConsumerConfig.cc | 34 ++-----------------
src/ConsumerConfig.h | 17 +++-------
src/{ConsumerConfig.h => MessageListener.h} | 30 +++--------------
5 files changed, 62 insertions(+), 71 deletions(-)
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 6b4879e..e2ad7b2 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -45,8 +45,48 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
constructor.SuppressDestruct();
}
+struct MessageListenerProxyData {
+ pulsar_message_t *cMessage;
+ Consumer *consumer;
+
+ MessageListenerProxyData(pulsar_message_t *cMessage, Consumer *consumer)
+ : cMessage(cMessage), consumer(consumer) {}
+};
+
+void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
+ Napi::Object msg = Message::NewInstance({}, data->cMessage);
+ Consumer *consumer = data->consumer;
+ delete data;
+
+ jsCallback.Call({msg, consumer->Value()});
+}
+
+void MessageListener(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage, void *ctx) {
+ ListenerCallback *listenerCallback = (ListenerCallback *)ctx;
+
+ Consumer *consumer = (Consumer *)listenerCallback->consumer;
+
+ if (listenerCallback->callback.Acquire() != napi_ok) {
+ return;
+ };
+ MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, consumer);
+ listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
+ listenerCallback->callback.Release();
+}
+
void Consumer::SetCConsumer(std::shared_ptr<CConsumerWrapper> cConsumer) { this->wrapper = cConsumer; }
-void Consumer::SetListenerCallback(ListenerCallback *listener) { this->listener = listener; }
+void Consumer::SetListenerCallback(ListenerCallback *listener) {
+ if (listener) {
+ // Maintain reference to consumer, so it won't get garbage collected
+ // since, when we have a listener, we don't have to maintain reference to consumer (in js code)
+ this->Ref();
+
+ // Pass consumer as argument
+ listener->consumer = this;
+ }
+
+ this->listener = listener;
+}
Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info), listener(nullptr) {}
@@ -100,6 +140,7 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
void OnOK() {
Napi::Object obj = Consumer::constructor.New({});
Consumer *consumer = Consumer::Unwrap(obj);
+
consumer->SetCConsumer(this->consumerWrapper);
consumer->SetListenerCallback(this->listener);
this->deferred.Resolve(obj);
@@ -118,8 +159,10 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
Napi::Object config = info[0].As<Napi::Object>();
+
std::shared_ptr<CConsumerWrapper> consumerWrapper = std::make_shared<CConsumerWrapper>();
- ConsumerConfig *consumerConfig = new ConsumerConfig(config, consumerWrapper);
+
+ ConsumerConfig *consumerConfig = new ConsumerConfig(config, consumerWrapper, &MessageListener);
ConsumerNewInstanceWorker *wk =
new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig, consumerWrapper);
wk->Queue();
@@ -235,6 +278,10 @@ class ConsumerCloseWorker : public Napi::AsyncWorker {
};
Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
+ if (this->listener) {
+ this->Unref();
+ }
+
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer);
wk->Queue();
diff --git a/src/Consumer.h b/src/Consumer.h
index 1f909e7..63fb403 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -23,6 +23,7 @@
#include <napi.h>
#include <pulsar/c/client.h>
#include "ConsumerConfig.h"
+#include "MessageListener.h"
class Consumer : public Napi::ObjectWrap<Consumer> {
public:
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 1f734df..935500b 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -46,38 +46,11 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
static const std::map<std::string, initial_position> INIT_POSITION = {
{"Latest", initial_position_latest}, {"Earliest", initial_position_earliest}};
-struct MessageListenerProxyData {
- std::shared_ptr<CConsumerWrapper> consumerWrapper;
- pulsar_message_t *cMessage;
-
- MessageListenerProxyData(std::shared_ptr<CConsumerWrapper> consumerWrapper, pulsar_message_t *cMessage)
- : consumerWrapper(consumerWrapper), cMessage(cMessage) {}
-};
-
-void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
- Napi::Object msg = Message::NewInstance({}, data->cMessage);
- Napi::Object consumerObj = Consumer::constructor.New({});
- Consumer *consumer = Consumer::Unwrap(consumerObj);
- consumer->SetCConsumer(std::move(data->consumerWrapper));
- delete data;
- jsCallback.Call({msg, consumerObj});
-}
-
-void MessageListener(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage, void *ctx) {
- ListenerCallback *listenerCallback = (ListenerCallback *)ctx;
- if (listenerCallback->callback.Acquire() != napi_ok) {
- return;
- };
- MessageListenerProxyData *dataPtr =
- new MessageListenerProxyData(listenerCallback->consumerWrapper, cMessage);
- listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
- listenerCallback->callback.Release();
-}
-
void FinalizeListenerCallback(Napi::Env env, ListenerCallback *cb, void *) { delete cb; }
ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
- std::shared_ptr<CConsumerWrapper> consumerWrapper)
+ std::shared_ptr<CConsumerWrapper> consumerWrapper,
+ pulsar_message_listener messageListener)
: topic(""), subscription(""), ackTimeoutMs(0), nAckRedeliverTimeoutMs(60000), listener(nullptr) {
this->cConsumerConfig = pulsar_consumer_configuration_create();
@@ -156,12 +129,11 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
if (consumerConfig.Has(CFG_LISTENER) && consumerConfig.Get(CFG_LISTENER).IsFunction()) {
this->listener = new ListenerCallback();
- this->listener->consumerWrapper = consumerWrapper;
Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New(
consumerConfig.Env(), consumerConfig.Get(CFG_LISTENER).As<Napi::Function>(), "Listener Callback", 1,
1, (void *)NULL, FinalizeListenerCallback, listener);
this->listener->callback = std::move(callback);
- pulsar_consumer_configuration_set_message_listener(this->cConsumerConfig, &MessageListener,
+ pulsar_consumer_configuration_set_message_listener(this->cConsumerConfig, messageListener,
this->listener);
}
diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h
index 0617332..e947b8b 100644
--- a/src/ConsumerConfig.h
+++ b/src/ConsumerConfig.h
@@ -20,31 +20,22 @@
#ifndef CONSUMER_CONFIG_H
#define CONSUMER_CONFIG_H
-#include <napi.h>
#include <pulsar/c/consumer_configuration.h>
+#include "MessageListener.h"
#define MIN_ACK_TIMEOUT_MILLIS 10000
-struct CConsumerWrapper {
- pulsar_consumer_t *cConsumer;
- CConsumerWrapper();
- ~CConsumerWrapper();
-};
-
-struct ListenerCallback {
- Napi::ThreadSafeFunction callback;
- std::shared_ptr<CConsumerWrapper> consumerWrapper;
-};
-
class ConsumerConfig {
public:
- ConsumerConfig(const Napi::Object &consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper);
+ ConsumerConfig(const Napi::Object &consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper,
+ pulsar_message_listener messageListener);
~ConsumerConfig();
pulsar_consumer_configuration_t *GetCConsumerConfig();
std::string GetTopic();
std::string GetSubscription();
int64_t GetAckTimeoutMs();
int64_t GetNAckRedeliverTimeoutMs();
+
ListenerCallback *GetListenerCallback();
private:
diff --git a/src/ConsumerConfig.h b/src/MessageListener.h
similarity index 56%
copy from src/ConsumerConfig.h
copy to src/MessageListener.h
index 0617332..79dfe3b 100644
--- a/src/ConsumerConfig.h
+++ b/src/MessageListener.h
@@ -17,13 +17,11 @@
* under the License.
*/
-#ifndef CONSUMER_CONFIG_H
-#define CONSUMER_CONFIG_H
+#ifndef MESSAGELISTENER_H
+#define MESSAGELISTENER_H
#include <napi.h>
-#include <pulsar/c/consumer_configuration.h>
-
-#define MIN_ACK_TIMEOUT_MILLIS 10000
+#include <pulsar/c/client.h>
struct CConsumerWrapper {
pulsar_consumer_t *cConsumer;
@@ -33,27 +31,9 @@ struct CConsumerWrapper {
struct ListenerCallback {
Napi::ThreadSafeFunction callback;
- std::shared_ptr<CConsumerWrapper> consumerWrapper;
-};
-
-class ConsumerConfig {
- public:
- ConsumerConfig(const Napi::Object &consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper);
- ~ConsumerConfig();
- pulsar_consumer_configuration_t *GetCConsumerConfig();
- std::string GetTopic();
- std::string GetSubscription();
- int64_t GetAckTimeoutMs();
- int64_t GetNAckRedeliverTimeoutMs();
- ListenerCallback *GetListenerCallback();
- private:
- pulsar_consumer_configuration_t *cConsumerConfig;
- std::string topic;
- std::string subscription;
- int64_t ackTimeoutMs;
- int64_t nAckRedeliverTimeoutMs;
- ListenerCallback *listener;
+ // Using consumer as void* since the ListenerCallback is shared between Config and Consumer.
+ void *consumer;
};
#endif