Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/11 02:20:12 UTC

[GitHub] [pulsar-client-node] k2la opened a new pull request #153: Support Reader Listener

k2la opened a new pull request #153:
URL: https://github.com/apache/pulsar-client-node/pull/153


   Add listener function to the Reader, using the C++ client listener support.
   - Add listener for reader
   - Add example and e2e test of using the reader listener


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-node] k2la commented on a change in pull request #153: Support Reader Listener

Posted by GitBox <gi...@apache.org>.
k2la commented on a change in pull request #153:
URL: https://github.com/apache/pulsar-client-node/pull/153#discussion_r634037831



##########
File path: src/Reader.cc
##########
@@ -66,33 +105,41 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker {
 
     pulsar_result result =
         pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
-                                    this->readerConfig->GetCReaderConfig(), &(this->cReader));
-    delete this->readerConfig;
+                                    this->readerConfig->GetCReaderConfig(), &this->readerWrapper->cReader);
     if (result != pulsar_result_Ok) {
       SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
       return;
+    } else {
+      this->listener = this->readerConfig->GetListenerCallback();
     }
+    delete this->readerConfig;

Review comment:
       Exactly. If the result is not OK, this line is not executed.
   I fixed it so that this is also done when the result is not OK.







[GitHub] [pulsar-client-node] hrsakai commented on a change in pull request #153: Support Reader Listener

Posted by GitBox <gi...@apache.org>.
hrsakai commented on a change in pull request #153:
URL: https://github.com/apache/pulsar-client-node/pull/153#discussion_r634015655



##########
File path: src/Reader.cc
##########
@@ -176,13 +228,31 @@ class ReaderCloseWorker : public Napi::AsyncWorker {
  private:
   Napi::Promise::Deferred deferred;
   pulsar_reader_t *cReader;
+  Reader *reader;
 };
 
 Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader);
+  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->wrapper->cReader, this);
   wk->Queue();
   return deferred.Promise();
 }
 
-Reader::~Reader() { pulsar_reader_free(this->cReader); }
+void Reader::Cleanup() {
+  if (this->listener) {
+    this->CleanupListener();
+  }
+}
+
+void Reader::CleanupListener() {
+  this->Unref();
+  this->listener->callback.Release();
+  this->listener = nullptr;
+}
+
+Reader::~Reader() {
+  if (this->listener) {
+    this->CleanupListener();
+  }
+  pulsar_reader_free(this->wrapper->cReader);

Review comment:
       Can we delete this line, since `pulsar_reader_free` is already called in `CReaderWrapper::~CReaderWrapper`?
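
The concern here is a double free: if a wrapper's destructor already releases the native handle, the owning class should not release it again. A minimal sketch of that ownership pattern follows. It assumes the owner holds the wrapper through a `std::shared_ptr`; `FakeHandle`, `fakeFree`, `HandleWrapper`, `Owner`, and `freesAfterOwnerScope` are hypothetical stand-ins, not names from this PR or from the Pulsar C API.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for a native handle such as pulsar_reader_t.
struct FakeHandle {};

// Counts calls so a double free would be observable.
static int freeCount = 0;

// Hypothetical stand-in for a C-style free function.
void fakeFree(FakeHandle *handle) {
  assert(handle != nullptr);
  ++freeCount;
  delete handle;
}

// The wrapper is the single owner of the handle and frees it exactly once.
struct HandleWrapper {
  FakeHandle *handle = nullptr;
  ~HandleWrapper() {
    if (handle != nullptr) {
      fakeFree(handle);
    }
  }
};

// The owner only holds the wrapper (assumed here to be a shared_ptr).
// Its implicit destructor must not free the handle a second time.
struct Owner {
  std::shared_ptr<HandleWrapper> wrapper = std::make_shared<HandleWrapper>();
};

// Creates an owner, attaches a handle, lets it go out of scope,
// and reports how many times the handle was freed.
int freesAfterOwnerScope() {
  freeCount = 0;
  {
    Owner owner;
    owner.wrapper->handle = new FakeHandle();
  }
  return freeCount;
}
```

With this layout, adding an explicit free in the owner's destructor would release the same pointer twice, which is why the line in question can be dropped.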







[GitHub] [pulsar-client-node] k2la commented on a change in pull request #153: Support Reader Listener

Posted by GitBox <gi...@apache.org>.
k2la commented on a change in pull request #153:
URL: https://github.com/apache/pulsar-client-node/pull/153#discussion_r634037611



##########
File path: src/Reader.cc
##########
@@ -176,13 +228,31 @@ class ReaderCloseWorker : public Napi::AsyncWorker {
  private:
   Napi::Promise::Deferred deferred;
   pulsar_reader_t *cReader;
+  Reader *reader;
 };
 
 Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader);
+  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->wrapper->cReader, this);
   wk->Queue();
   return deferred.Promise();
 }
 
-Reader::~Reader() { pulsar_reader_free(this->cReader); }
+void Reader::Cleanup() {
+  if (this->listener) {
+    this->CleanupListener();
+  }
+}
+
+void Reader::CleanupListener() {
+  this->Unref();
+  this->listener->callback.Release();
+  this->listener = nullptr;
+}
+
+Reader::~Reader() {
+  if (this->listener) {
+    this->CleanupListener();
+  }
+  pulsar_reader_free(this->wrapper->cReader);

Review comment:
       Exactly. If the result is not OK, this line is not executed.
   I fixed it so that this is also done when the result is not OK.







[GitHub] [pulsar-client-node] k2la commented on a change in pull request #153: Support Reader Listener

Posted by GitBox <gi...@apache.org>.
k2la commented on a change in pull request #153:
URL: https://github.com/apache/pulsar-client-node/pull/153#discussion_r635809844



##########
File path: src/Reader.cc
##########
@@ -66,33 +105,41 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker {
 
     pulsar_result result =
         pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
-                                    this->readerConfig->GetCReaderConfig(), &(this->cReader));
-    delete this->readerConfig;
+                                    this->readerConfig->GetCReaderConfig(), &this->readerWrapper->cReader);
     if (result != pulsar_result_Ok) {
       SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
       return;
+    } else {
+      this->listener = this->readerConfig->GetListenerCallback();
     }
+    delete this->readerConfig;

Review comment:
       I implemented `createReaderCallback` like [subscribeCallback](https://github.com/apache/pulsar-client-node/blob/v1.3.0/src/Consumer.cc#L178-L190).
   







[GitHub] [pulsar-client-node] k2la commented on a change in pull request #153: Support Reader Listener

Posted by GitBox <gi...@apache.org>.
k2la commented on a change in pull request #153:
URL: https://github.com/apache/pulsar-client-node/pull/153#discussion_r634033760



##########
File path: src/Reader.cc
##########
@@ -176,13 +228,31 @@ class ReaderCloseWorker : public Napi::AsyncWorker {
  private:
   Napi::Promise::Deferred deferred;
   pulsar_reader_t *cReader;
+  Reader *reader;
 };
 
 Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader);
+  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->wrapper->cReader, this);
   wk->Queue();
   return deferred.Promise();
 }
 
-Reader::~Reader() { pulsar_reader_free(this->cReader); }
+void Reader::Cleanup() {
+  if (this->listener) {
+    this->CleanupListener();
+  }
+}
+
+void Reader::CleanupListener() {
+  this->Unref();
+  this->listener->callback.Release();
+  this->listener = nullptr;
+}
+
+Reader::~Reader() {
+  if (this->listener) {
+    this->CleanupListener();
+  }
+  pulsar_reader_free(this->wrapper->cReader);

Review comment:
       OK. I removed it.







[GitHub] [pulsar-client-node] hrsakai commented on a change in pull request #153: Support Reader Listener

Posted by GitBox <gi...@apache.org>.
hrsakai commented on a change in pull request #153:
URL: https://github.com/apache/pulsar-client-node/pull/153#discussion_r634050519



##########
File path: src/Reader.cc
##########
@@ -66,33 +105,41 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker {
 
     pulsar_result result =
         pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
-                                    this->readerConfig->GetCReaderConfig(), &(this->cReader));
-    delete this->readerConfig;
+                                    this->readerConfig->GetCReaderConfig(), &this->readerWrapper->cReader);
     if (result != pulsar_result_Ok) {
       SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
       return;
+    } else {
+      this->listener = this->readerConfig->GetListenerCallback();
     }
+    delete this->readerConfig;

Review comment:
       I think it would be better to write it as follows:
   https://github.com/apache/pulsar-client-node/blob/v1.3.0/src/Consumer.cc#L180-L187







[GitHub] [pulsar-client-node] hrsakai merged pull request #153: Support Reader Listener

Posted by GitBox <gi...@apache.org>.
hrsakai merged pull request #153:
URL: https://github.com/apache/pulsar-client-node/pull/153


   





[GitHub] [pulsar-client-node] hrsakai commented on a change in pull request #153: Support Reader Listener

Posted by GitBox <gi...@apache.org>.
hrsakai commented on a change in pull request #153:
URL: https://github.com/apache/pulsar-client-node/pull/153#discussion_r634015393



##########
File path: src/Reader.cc
##########
@@ -66,33 +105,41 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker {
 
     pulsar_result result =
         pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
-                                    this->readerConfig->GetCReaderConfig(), &(this->cReader));
-    delete this->readerConfig;
+                                    this->readerConfig->GetCReaderConfig(), &this->readerWrapper->cReader);
     if (result != pulsar_result_Ok) {
       SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
       return;
+    } else {
+      this->listener = this->readerConfig->GetListenerCallback();
     }
+    delete this->readerConfig;

Review comment:
       If `result` is not OK, this line won't be executed, will it?
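
The issue raised here is that the early `return` in the error branch skips the `delete`, so the config object leaks when reader creation fails. A minimal sketch of the problem and one possible fix follows. `FakeReaderConfig`, `createLeaky`, and `createSafe` are hypothetical names for illustration only, and `std::unique_ptr` is a swapped-in technique, not necessarily what the PR ended up doing.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for ReaderConfig; tracks live instances
// so that a leak can be observed.
struct FakeReaderConfig {
  static int live;
  FakeReaderConfig() { ++live; }
  ~FakeReaderConfig() {
    assert(live > 0);
    --live;
  }
};
int FakeReaderConfig::live = 0;

// Same shape as the reviewed hunk: the error path returns before
// the delete, so the config is never released on failure.
bool createLeaky(bool createOk) {
  FakeReaderConfig *config = new FakeReaderConfig();
  if (!createOk) {
    return false;  // config leaks on this path
  }
  delete config;
  return true;
}

// One possible fix: tie the config's lifetime to the scope, so it is
// released on every path, including the early return.
bool createSafe(bool createOk) {
  std::unique_ptr<FakeReaderConfig> config(new FakeReaderConfig());
  if (!createOk) {
    return false;  // config is destroyed here automatically
  }
  return true;
}
```

An equivalent fix without smart pointers is to call `delete` before checking the result, or in both branches.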



