You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by hr...@apache.org on 2020/06/29 05:09:25 UTC
[pulsar-client-node] branch master updated: Fix consumer double close (#96)
This is an automated email from the ASF dual-hosted git repository.
hrsakai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new df9d3d0 Fix consumer double close (#96)
df9d3d0 is described below
commit df9d3d059ec91627a839376bc83d9118b3581774
Author: Yosi Attias <yo...@gmail.com>
AuthorDate: Mon Jun 29 08:09:17 2020 +0300
Fix consumer double close (#96)
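The old code called `Unref` on every call to `close()`, so closing a consumer twice unreferenced it more than once. Now `Unref` is called at most once, and only after the consumer has actually been closed successfully.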
---
src/Consumer.cc | 27 ++++++++++++++++++++-------
src/Consumer.h | 1 +
tests/consumer.test.js | 17 +++++++++++++++++
3 files changed, 38 insertions(+), 7 deletions(-)
diff --git a/src/Consumer.cc b/src/Consumer.cc
index e2ad7b2..225ed3e 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -68,7 +68,8 @@ void MessageListener(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage, v
if (listenerCallback->callback.Acquire() != napi_ok) {
return;
- };
+ }
+
MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, consumer);
listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
listenerCallback->callback.Release();
@@ -256,17 +257,25 @@ void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
class ConsumerCloseWorker : public Napi::AsyncWorker {
public:
- ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
+ ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer,
+ Consumer *consumer)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
- cConsumer(cConsumer) {}
+ cConsumer(cConsumer),
+ consumer(consumer) {}
+
~ConsumerCloseWorker() {}
void Execute() {
pulsar_consumer_pause_message_listener(this->cConsumer);
pulsar_result result = pulsar_consumer_close(this->cConsumer);
- if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+ if (result != pulsar_result_Ok) {
+ SetError(pulsar_result_str(result));
+ }
+ }
+ void OnOK() {
+ this->consumer->Cleanup();
+ this->deferred.Resolve(Env().Null());
}
- void OnOK() { this->deferred.Resolve(Env().Null()); }
void OnError(const Napi::Error &e) {
this->deferred.Reject(
Napi::Error::New(Env(), std::string("Failed to close consumer: ") + e.Message()).Value());
@@ -275,15 +284,19 @@ class ConsumerCloseWorker : public Napi::AsyncWorker {
private:
Napi::Promise::Deferred deferred;
pulsar_consumer_t *cConsumer;
+ Consumer *consumer;
};
-Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
+void Consumer::Cleanup() {
if (this->listener) {
this->Unref();
+ this->listener = nullptr;
}
+}
+Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
- ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer);
+ ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer, this);
wk->Queue();
return deferred.Promise();
}
diff --git a/src/Consumer.h b/src/Consumer.h
index 63fb403..2c5f856 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -34,6 +34,7 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
~Consumer();
void SetCConsumer(std::shared_ptr<CConsumerWrapper> cConsumer);
void SetListenerCallback(ListenerCallback *listener);
+ void Cleanup();
private:
std::shared_ptr<CConsumerWrapper> wrapper;
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index 65bd792..e61120c 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -87,5 +87,22 @@ const Pulsar = require('../index.js');
})).rejects.toThrow('NAck timeout should be greater than or equal to zero');
});
});
+
+ describe('Close', () => {
+ test('throws error on subsequent calls to close', async () => {
+ const consumer = await client.subscribe({
+ topic: 'persistent://public/default/my-topic',
+ subscription: 'sub1',
+ subscriptionType: 'Shared',
+ // Test with listener since it changes the flow of close
+ // and reproduces an issue
+ listener() {},
+ });
+
+ await expect(consumer.close()).resolves.toEqual(null);
+
+ await expect(consumer.close()).rejects.toThrow('Failed to close consumer: AlreadyClosed');
+ });
+ });
});
})();