You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by nk...@apache.org on 2019/04/23 03:59:47 UTC

[pulsar-client-node] 08/20: Support Producer.flush()

This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit e0a82d9a0e894b8b699492be7f185a3c4d025c67
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Thu Mar 14 16:16:17 2019 +0900

    Support Producer.flush()
---
 examples/producer.js  |  7 +++----
 package-lock.json     |  2 +-
 perf/perf_producer.js |  7 +++----
 src/Producer.cc       | 39 +++++++++++++++++++++++++++++++++++++--
 src/Producer.h        |  1 +
 5 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/examples/producer.js b/examples/producer.js
index 8730719..2dda9cc 100644
--- a/examples/producer.js
+++ b/examples/producer.js
@@ -34,15 +34,14 @@ const Pulsar = require('../index.js');
   });
 
   // Send messages
-  const results = [];
   for (let i = 0; i < 10; i += 1) {
     const msg = `my-message-${i}`;
-    results.push(producer.send({
+    producer.send({
       data: Buffer.from(msg),
-    }));
+    });
     console.log(`Sent message: ${msg}`);
   }
-  await Promise.all(results);
+  await producer.flush();
 
   await producer.close();
   await client.close();
diff --git a/package-lock.json b/package-lock.json
index 25d495e..0adfd94 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,6 +1,6 @@
 {
   "name": "pulsar-client",
-  "version": "0.0.1",
+  "version": "2.4.0-SNAPSHOT",
   "lockfileVersion": 1,
   "requires": true,
   "dependencies": {
diff --git a/perf/perf_producer.js b/perf/perf_producer.js
index 19f251c..fbe12f1 100644
--- a/perf/perf_producer.js
+++ b/perf/perf_producer.js
@@ -85,17 +85,16 @@ const Pulsar = require('../index.js');
     // measure
     await delay(1000);
     const startMeasureTimeMilliSeconds = performance.now();
-    const results = [];
     for (let mi = 0; mi < numOfMessages; mi += 1) {
       const startSendTimeMilliSeconds = performance.now();
-      results.push(producer.send({
+      producer.send({
         data: message,
       }).then(() => {
         // add latency
         histogram.recordValue((performance.now() - startSendTimeMilliSeconds));
-      }));
+      });
     }
-    await Promise.all(results);
+    await producer.flush();
     const endMeasureTimeMilliSeconds = performance.now();
 
     // result
diff --git a/src/Producer.cc b/src/Producer.cc
index a19d828..041695e 100644
--- a/src/Producer.cc
+++ b/src/Producer.cc
@@ -27,8 +27,10 @@ Napi::FunctionReference Producer::constructor;
 void Producer::Init(Napi::Env env, Napi::Object exports) {
   Napi::HandleScope scope(env);
 
-  Napi::Function func = DefineClass(
-      env, "Producer", {InstanceMethod("send", &Producer::Send), InstanceMethod("close", &Producer::Close)});
+  Napi::Function func =
+      DefineClass(env, "Producer",
+                  {InstanceMethod("send", &Producer::Send), InstanceMethod("flush", &Producer::Flush),
+                   InstanceMethod("close", &Producer::Close)});
 
   constructor = Napi::Persistent(func);
   constructor.SuppressDestruct();
@@ -119,6 +121,39 @@ Napi::Value Producer::Send(const Napi::CallbackInfo &info) {
   return deferred.Promise();
 }
 
+class ProducerFlushWorker : public Napi::AsyncWorker {
+ public:
+  ProducerFlushWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cProducer(cProducer) {}
+
+  ~ProducerFlushWorker() {}
+
+  void Execute() {
+    pulsar_result result = pulsar_producer_flush(this->cProducer);
+    if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+  }
+
+  void OnOK() { this->deferred.Resolve(Env().Null()); }
+
+  void OnError(const Napi::Error &e) {
+    this->deferred.Reject(
+        Napi::Error::New(Env(), std::string("Failed to flush producer: ") + e.Message()).Value());
+  }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_producer_t *cProducer;
+};
+
+Napi::Value Producer::Flush(const Napi::CallbackInfo &info) {
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  ProducerFlushWorker *wk = new ProducerFlushWorker(deferred, this->cProducer);
+  wk->Queue();
+  return deferred.Promise();
+}
+
 class ProducerCloseWorker : public Napi::AsyncWorker {
  public:
   ProducerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer)
diff --git a/src/Producer.h b/src/Producer.h
index 5d31cfc..ea9b112 100644
--- a/src/Producer.h
+++ b/src/Producer.h
@@ -36,6 +36,7 @@ class Producer : public Napi::ObjectWrap<Producer> {
  private:
   pulsar_producer_t *cProducer;
   Napi::Value Send(const Napi::CallbackInfo &info);
+  Napi::Value Flush(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
 };