You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by hr...@apache.org on 2020/06/29 05:09:25 UTC

[pulsar-client-node] branch master updated: Fix consumer double close (#96)

This is an automated email from the ASF dual-hosted git repository.

hrsakai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new df9d3d0  Fix consumer double close (#96)
df9d3d0 is described below

commit df9d3d059ec91627a839376bc83d9118b3581774
Author: Yosi Attias <yo...@gmail.com>
AuthorDate: Mon Jun 29 08:09:17 2020 +0300

    Fix consumer double close (#96)
    
    We do `Unref` more than once in the old code, now we do `Unref` only if the consumer is closed.
---
 src/Consumer.cc        | 27 ++++++++++++++++++++-------
 src/Consumer.h         |  1 +
 tests/consumer.test.js | 17 +++++++++++++++++
 3 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index e2ad7b2..225ed3e 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -68,7 +68,8 @@ void MessageListener(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage, v
 
   if (listenerCallback->callback.Acquire() != napi_ok) {
     return;
-  };
+  }
+
   MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, consumer);
   listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
   listenerCallback->callback.Release();
@@ -256,17 +257,25 @@ void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
 
 class ConsumerCloseWorker : public Napi::AsyncWorker {
  public:
-  ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
+  ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer,
+                      Consumer *consumer)
       : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
         deferred(deferred),
-        cConsumer(cConsumer) {}
+        cConsumer(cConsumer),
+        consumer(consumer) {}
+
   ~ConsumerCloseWorker() {}
   void Execute() {
     pulsar_consumer_pause_message_listener(this->cConsumer);
     pulsar_result result = pulsar_consumer_close(this->cConsumer);
-    if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+    if (result != pulsar_result_Ok) {
+      SetError(pulsar_result_str(result));
+    }
+  }
+  void OnOK() {
+    this->consumer->Cleanup();
+    this->deferred.Resolve(Env().Null());
   }
-  void OnOK() { this->deferred.Resolve(Env().Null()); }
   void OnError(const Napi::Error &e) {
     this->deferred.Reject(
         Napi::Error::New(Env(), std::string("Failed to close consumer: ") + e.Message()).Value());
@@ -275,15 +284,19 @@ class ConsumerCloseWorker : public Napi::AsyncWorker {
  private:
   Napi::Promise::Deferred deferred;
   pulsar_consumer_t *cConsumer;
+  Consumer *consumer;
 };
 
-Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
+void Consumer::Cleanup() {
   if (this->listener) {
     this->Unref();
+    this->listener = nullptr;
   }
+}
 
+Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer);
+  ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer, this);
   wk->Queue();
   return deferred.Promise();
 }
diff --git a/src/Consumer.h b/src/Consumer.h
index 63fb403..2c5f856 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -34,6 +34,7 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
   ~Consumer();
   void SetCConsumer(std::shared_ptr<CConsumerWrapper> cConsumer);
   void SetListenerCallback(ListenerCallback *listener);
+  void Cleanup();
 
  private:
   std::shared_ptr<CConsumerWrapper> wrapper;
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index 65bd792..e61120c 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -87,5 +87,22 @@ const Pulsar = require('../index.js');
         })).rejects.toThrow('NAck timeout should be greater than or equal to zero');
       });
     });
+
+    describe('Close', () => {
+      test('throws error on subsequent calls to close', async () => {
+        const consumer = await client.subscribe({
+          topic: 'persistent://public/default/my-topic',
+          subscription: 'sub1',
+          subscriptionType: 'Shared',
+          // Test with listener since it changes the flow of close
+          // and reproduces an issue
+          listener() {},
+        });
+
+        await expect(consumer.close()).resolves.toEqual(null);
+
+        await expect(consumer.close()).rejects.toThrow('Failed to close consumer: AlreadyClosed');
+      });
+    });
   });
 })();