You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2021/12/06 02:36:47 UTC
[pulsar-client-node] branch branch-1.3 updated: fix: use pause and resume api to ensure orderly init listener->consumer (#182)
This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/branch-1.3 by this push:
new 91b29f9 fix: use pause and resume api to ensure orderly init listener->consumer (#182)
91b29f9 is described below
commit 91b29f9d5718e4e88bc58e08a99ee24ee71b13c6
Author: lipeining <39...@users.noreply.github.com>
AuthorDate: Mon Dec 6 10:36:44 2021 +0800
fix: use pause and resume api to ensure orderly init listener->consumer (#182)
Co-authored-by: lipeining <li...@joyy.com>
---
src/Consumer.cc | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 8e4a9e1..9789e70 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -163,6 +163,12 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
consumer->SetCConsumer(this->consumerWrapper);
consumer->SetListenerCallback(this->listener);
+
+ if (this->listener) {
+ // resume to enable MessageListener function callback
+ resume_message_listener(this->consumerWrapper->cConsumer);
+ }
+
this->deferred.Resolve(obj);
}
void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
@@ -182,6 +188,11 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
} else {
worker->consumerWrapper->cConsumer = consumer;
worker->listener = worker->consumerConfig->GetListenerCallback();
+
+ if (worker->listener) {
+ // pause, will resume in OnOK, to prevent MessageListener get a nullptr of consumer
+ pulsar_consumer_pause_message_listener(consumer);
+ }
}
delete worker->consumerConfig;