You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/05 04:57:19 UTC

[GitHub] [pulsar-client-node] equanz commented on a diff in pull request #200: convert worker threads to async functions

equanz commented on code in PR #200:
URL: https://github.com/apache/pulsar-client-node/pull/200#discussion_r842345647


##########
src/Consumer.cc:
##########
@@ -243,160 +235,225 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
 
  private:
   Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  pulsar_message_t *cMessage;
+  std::shared_ptr<pulsar_consumer_t> cConsumer;
+  std::shared_ptr<pulsar_message_t> cMessage;
   int64_t timeout;
 };
 
 Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
   if (info[0].IsUndefined()) {
-    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer);
+    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer);
     wk->Queue();
   } else {
     Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
-    ConsumerReceiveWorker *wk =
-        new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer, timeout.Int64Value());
+    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
     wk->Queue();
   }
   return deferred.Promise();
 }
 
-void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_acknowledge_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL);
+Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto msg = Message::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_async(
+      this->cConsumer.get(), msg->GetCMessage().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge: ") + pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
-void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_acknowledge_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL, NULL);
+Napi::Value Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_async_id(
+      this->cConsumer.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge id: ") + pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) {
   Napi::Object obj = info[0].As<Napi::Object>();
   Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_negative_acknowledge(this->wrapper->cConsumer, msg->GetCMessage());
+  std::shared_ptr<pulsar_message_t> cMessage = msg->GetCMessage();
+  pulsar_consumer_negative_acknowledge(this->cConsumer.get(), cMessage.get());
 }
 
 void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) {
   Napi::Object obj = info[0].As<Napi::Object>();
   MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_negative_acknowledge_id(this->wrapper->cConsumer, msgId->GetCMessageId());
+  std::shared_ptr<pulsar_message_id_t> cMessageId = msgId->GetCMessageId();
+  pulsar_consumer_negative_acknowledge_id(this->cConsumer.get(), cMessageId.get());
 }
 
-void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_acknowledge_cumulative_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL);
+Napi::Value Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msg = Message::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_cumulative_async(
+      this->cConsumer.get(), msg->GetCMessage().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge cumulatively: ") + pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
-void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_acknowledge_cumulative_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL,
-                                                  NULL);
+Napi::Value Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_cumulative_async_id(
+      this->cConsumer.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge cumulatively by id: ") +
+                           pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) {
   Napi::Env env = info.Env();
-  return Napi::Boolean::New(env, pulsar_consumer_is_connected(this->wrapper->cConsumer));
+  return Napi::Boolean::New(env, pulsar_consumer_is_connected(this->cConsumer.get()));
 }
 
-class ConsumerCloseWorker : public Napi::AsyncWorker {
- public:
-  ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer,
-                      Consumer *consumer)
-      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
-        deferred(deferred),
-        cConsumer(cConsumer),
-        consumer(consumer) {}
-
-  ~ConsumerCloseWorker() {}
-  void Execute() {
-    pulsar_consumer_pause_message_listener(this->cConsumer);
-    pulsar_result result = pulsar_consumer_close(this->cConsumer);
-    if (result != pulsar_result_Ok) {
-      SetError(pulsar_result_str(result));
-    }
-  }
-  void OnOK() {
-    this->consumer->Cleanup();
-    this->deferred.Resolve(Env().Null());
-  }
-  void OnError(const Napi::Error &e) {
-    this->deferred.Reject(
-        Napi::Error::New(Env(), std::string("Failed to close consumer: ") + e.Message()).Value());
-  }
-
- private:
-  Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  Consumer *consumer;
-};
-
-class ConsumerUnsubscribeWorker : public Napi::AsyncWorker {
- public:
-  ConsumerUnsubscribeWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer,
-                            Consumer *consumer)
-      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
-        deferred(deferred),
-        cConsumer(cConsumer),
-        consumer(consumer) {}
-
-  ~ConsumerUnsubscribeWorker() {}
-  void Execute() {
-    pulsar_consumer_pause_message_listener(this->cConsumer);
-    pulsar_result result = pulsar_consumer_unsubscribe(this->cConsumer);
-    if (result != pulsar_result_Ok) {
-      SetError(pulsar_result_str(result));
-    }
-  }
-  void OnOK() {
-    this->consumer->Cleanup();
-    this->deferred.Resolve(Env().Null());
-  }
-  void OnError(const Napi::Error &e) {
-    this->deferred.Reject(
-        Napi::Error::New(Env(), std::string("Failed to unsubscribe consumer: ") + e.Message()).Value());
-  }
-
- private:
-  Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  Consumer *consumer;
-};
-
 void Consumer::Cleanup() {
-  if (this->listener) {
-    this->CleanupListener();
+  if (this->listener != nullptr) {
+    pulsar_consumer_pause_message_listener(this->cConsumer.get());
+    this->listener->callback.Release();
+    this->listener = nullptr;
+    this->Unref();
   }
 }
 
-void Consumer::CleanupListener() {
-  pulsar_consumer_pause_message_listener(this->wrapper->cConsumer);
-  this->Unref();
-  this->listener->callback.Release();
-  this->listener = nullptr;
-}
-
 Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
-  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer, this);
-  wk->Queue();
-  return deferred.Promise();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_pause_message_listener(this->cConsumer.get());
+  pulsar_consumer_close_async(
+      this->cConsumer.get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to close consumer: ") + pulsar_result_str(result));
+        } else {
+          self->Cleanup();
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 Napi::Value Consumer::Unsubscribe(const Napi::CallbackInfo &info) {
-  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ConsumerUnsubscribeWorker *wk = new ConsumerUnsubscribeWorker(deferred, this->wrapper->cConsumer, this);
-  wk->Queue();
-  return deferred.Promise();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_pause_message_listener(this->cConsumer.get());
+  pulsar_consumer_unsubscribe_async(
+      this->cConsumer.get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to unsubscribe consumer: ") + pulsar_result_str(result));
+        } else {
+          self->Cleanup();
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 Consumer::~Consumer() {
-  if (this->listener) {
-    this->CleanupListener();
+  this->Cleanup();
+  while (this->Unref() != 0) {
+    // If Ref() > 0 then the process is shutting down. We must unref to prevent
+    // double free (once for the env shutdown and once for non-zero refs)
   }

Review Comment:
   Note:
   I noticed that Reference#Unref throws an Error from other call sites (backtrace below) as of the https://github.com/apache/pulsar-client-node/pull/200/commits/3def7b8e122970a18d792590f07b821b549db150 commit.
   
   ```
   (gdb) run perf/perf_producer.js --url pulsar://host.docker.internal:6650 --topic persistent://public/default/topic -i 30
   Starting program: /usr/local/bin/node perf/perf_producer.js --url pulsar://host.docker.internal:6650 --topic persistent://public/default/topic -i 30
   warning: Error disabling address space randomization: Operation not permitted
   [Thread debugging using libthread_db enabled]
   Using host libthread_db library "/lib64/libthread_db.so.1".
   [New Thread 0x7f14709ed700 (LWP 2166)]
   [New Thread 0x7f146bfff700 (LWP 2167)]
   [New Thread 0x7f146b7fe700 (LWP 2168)]
   [New Thread 0x7f146affd700 (LWP 2169)]
   [New Thread 0x7f146a7fc700 (LWP 2170)]
   [New Thread 0x7f1471c1b700 (LWP 2171)]
   ----------------------
   size: 1024
   url: pulsar://host.docker.internal:6650
   topic: persistent://public/default/topic
   iteration: 30
   messages: 1000
   ----------------------
   [New Thread 0x7f1468f00700 (LWP 2172)]
   [New Thread 0x7f145bfff700 (LWP 2173)]
   Throughput produced: 6277.019  msg/s --- 49.039 Mbit/s --- Latency: mean: 92.55816686251468 ms - med: 95 - 95pct: 149 - 99pct: 156 - 99.9pct: 158 - 99.99pct: 158 - Max: 158.00080001354218
   Throughput produced: 6735.213  msg/s --- 52.619 Mbit/s --- Latency: mean: 79.87486515641855 ms - med: 80 - 95pct: 140 - 99pct: 146 - 99.9pct: 148 - 99.99pct: 148 - Max: 148.17530000209808
   Throughput produced: 7529.627  msg/s --- 58.825 Mbit/s --- Latency: mean: 75.30654420206659 ms - med: 77 - 95pct: 126 - 99pct: 131 - 99.9pct: 132 - 99.99pct: 132 - Max: 132.66699999570847
   Throughput produced: 4261.913  msg/s --- 33.296 Mbit/s --- Latency: mean: 127.84177869700103 ms - med: 122 - 95pct: 222 - 99pct: 227 - 99.9pct: 229 - 99.99pct: 229 - Max: 229.04490000009537
   Throughput produced: 4543.716  msg/s --- 35.498 Mbit/s --- Latency: mean: 142.1572192513369 ms - med: 150 - 95pct: 214 - 99pct: 218 - 99.9pct: 220 - 99.99pct: 220 - Max: 220.01279997825623
   Throughput produced: 5914.056  msg/s --- 46.204 Mbit/s --- Latency: mean: 96.91391509433963 ms - med: 93 - 95pct: 163 - 99pct: 166 - 99.9pct: 169 - 99.99pct: 169 - Max: 169.00579994916916
   Throughput produced: 7065.959  msg/s --- 55.203 Mbit/s --- Latency: mean: 83.79047619047618 ms - med: 85 - 95pct: 135 - 99pct: 139 - 99.9pct: 141 - 99.99pct: 141 - Max: 141.43660002946854
   Throughput produced: 8951.213  msg/s --- 69.931 Mbit/s --- Latency: mean: 61.14530892448513 ms - med: 62 - 95pct: 105 - 99pct: 110 - 99.9pct: 111 - 99.99pct: 111 - Max: 111.6453999876976
   Throughput produced: 7331.771  msg/s --- 57.279 Mbit/s --- Latency: mean: 76.43560606060606 ms - med: 75 - 95pct: 131 - 99pct: 135 - 99.9pct: 136 - 99.99pct: 136 - Max: 136.28170001506805
   Throughput produced: 5457.243  msg/s --- 42.635 Mbit/s --- Latency: mean: 100.08841099163679 ms - med: 100 - 95pct: 174 - 99pct: 180 - 99.9pct: 182 - 99.99pct: 182 - Max: 182.2026999592781
   Throughput produced: 7964.142  msg/s --- 62.22 Mbit/s --- Latency: mean: 75.52533992583436 ms - med: 77 - 95pct: 119 - 99pct: 123 - 99.9pct: 125 - 99.99pct: 125 - Max: 125.44050002098083
   Throughput produced: 7403.996  msg/s --- 57.844 Mbit/s --- Latency: mean: 74.31855309218203 ms - med: 77 - 95pct: 127 - 99pct: 132 - 99.9pct: 134 - 99.99pct: 134 - Max: 134.95059996843338
   Throughput produced: 6052.195  msg/s --- 47.283 Mbit/s --- Latency: mean: 98.21065375302663 ms - med: 96 - 95pct: 157 - 99pct: 163 - 99.9pct: 165 - 99.99pct: 165 - Max: 165.1327999830246
   Throughput produced: 6790.83  msg/s --- 53.053 Mbit/s --- Latency: mean: 81.64018691588785 ms - med: 81 - 95pct: 142 - 99pct: 145 - 99.9pct: 147 - 99.99pct: 147 - Max: 147.17070001363754
   Throughput produced: 7530.325  msg/s --- 58.831 Mbit/s --- Latency: mean: 73.41789215686275 ms - med: 72 - 95pct: 125 - 99pct: 131 - 99.9pct: 132 - 99.99pct: 132 - Max: 132.66470003128052
   Throughput produced: 6098.937  msg/s --- 47.648 Mbit/s --- Latency: mean: 0 ms - med: 0 - 95pct: 0 - 99pct: 0 - 99.9pct: 0 - 99.99pct: 0 - Max: 0
   Throughput produced: 8248.756  msg/s --- 64.443 Mbit/s --- Latency: mean: 66.27010804321729 ms - med: 65 - 95pct: 115 - 99pct: 119 - 99.9pct: 121 - 99.99pct: 121 - Max: 121.02969998121262
   Throughput produced: 6061.877  msg/s --- 47.358 Mbit/s --- Latency: mean: 85.33713692946058 ms - med: 82 - 95pct: 158 - 99pct: 163 - 99.9pct: 164 - 99.99pct: 164 - Max: 164.81310003995895
   Throughput produced: 4276.394  msg/s --- 33.409 Mbit/s --- Latency: mean: 143.57420675537358 ms - med: 156 - 95pct: 227 - 99pct: 232 - 99.9pct: 233 - 99.99pct: 233 - Max: 233.78890001773834
   Throughput produced: 4568.025  msg/s --- 35.688 Mbit/s --- Latency: mean: 133.0496957403651 ms - med: 139 - 95pct: 213 - 99pct: 217 - 99.9pct: 218 - 99.99pct: 218 - Max: 218.86809998750687
   
   Program received signal SIGSEGV, Segmentation fault.
   [Switching to Thread 0x7f1468f00700 (LWP 2172)]
   0x0000000000bbe3d0 in v8::Exception::Error(v8::Local<v8::String>) ()
   Missing separate debuginfos, use: debuginfo-install apache-pulsar-client-2.9.1-1.x86_64 glibc-2.17-307.el7.1.x86_64 libgcc-4.8.5-44.el7.x86_64 libstdc++-4.8.5-44.el7.x86_64
   (gdb) backtrace
   #0  0x0000000000bbe3d0 in v8::Exception::Error(v8::Local<v8::String>) ()
   #1  0x00000000009debc2 in napi_create_error ()
   #2  0x00007f1469d8e34a in Napi::Error::New (env=0x5f0c760) at /pulsar-client-node/node_modules/node-addon-api/napi-inl.h:2570
   #3  0x00007f1469da2e71 in Napi::Reference<Napi::Object>::Unref (this=0x5de1b38) at /pulsar-client-node/node_modules/node-addon-api/napi-inl.h:2944
   #4  0x00007f1469da73e4 in Producer::__lambda32::operator() (__closure=0x0, result=pulsar_result_Ok, ctx=0x5f1ce40) at ../src/Producer.cc:165
   #5  0x00007f1469da747e in Producer::__lambda32::_FUN (result=pulsar_result_Ok, ctx=0x5f1ce40) at ../src/Producer.cc:166
   #6  0x00007f14694cef77 in pulsar::ProducerImpl::ackReceived(unsigned long, pulsar::MessageId&) () from /lib/libpulsar.so.2.9.1
   #7  0x00007f1469423bde in pulsar::ClientConnection::handleIncomingCommand() () from /lib/libpulsar.so.2.9.1
   #8  0x00007f1469426a6a in pulsar::ClientConnection::processIncomingBuffer() () from /lib/libpulsar.so.2.9.1
   #9  0x00007f1469427808 in pulsar::ClientConnection::handleRead(boost::system::error_code const&, unsigned long, unsigned int) () from /lib/libpulsar.so.2.9.1
   #10 0x00007f1469436a90 in boost::asio::detail::reactive_socket_recv_op<boost::asio::mutable_buffers_1, AllocHandler<std::_Bind<std::_Mem_fn<void (pulsar::ClientConnection::*)(boost::system::error_code const&, unsigned long, unsigned int)> (std::shared_ptr<pulsar::ClientConnection>, std::_Placeholder<1>, std::_Placeholder<2>, unsigned int)> > >::do_complete(boost::asio::detail::task_io_service*, boost::asio::detail::task_io_service_operation*, boost::system::error_code const&, unsigned long) () from /lib/libpulsar.so.2.9.1
   #11 0x00007f146942c671 in boost::asio::detail::task_io_service::run(boost::system::error_code&) () from /lib/libpulsar.so.2.9.1
   #12 0x00007f146947f596 in pulsar::ExecutorService::startWorker(std::shared_ptr<boost::asio::io_service>) () from /lib/libpulsar.so.2.9.1
   #13 0x00007f1469481a42 in std::thread::_Impl<std::_Bind_simple<std::_Bind<std::_Mem_fn<void (pulsar::ExecutorService::*)(std::shared_ptr<boost::asio::io_service>)> (pulsar::ExecutorService*, std::shared_ptr<boost::asio::io_service>)> ()> >::_M_run() () from /lib/libpulsar.so.2.9.1
   #14 0x00007f14715a5330 in ?? () from /lib64/libstdc++.so.6
   #15 0x00007f1470dc3ea5 in start_thread () from /lib64/libpthread.so.0
   #16 0x00007f1470aec8dd in clone () from /lib64/libc.so.6
   ```
   
   This issue was fixed in the https://github.com/apache/pulsar-client-node/pull/200/commits/94cb0508921f9436b90e3bd25767bd162788f043 commit.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org