You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2019/06/10 10:05:01 UTC
[pulsar-client-node] 08/23: Support Producer.flush()
This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit e0a82d9a0e894b8b699492be7f185a3c4d025c67
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Thu Mar 14 16:16:17 2019 +0900
Support Producer.flush()
---
examples/producer.js | 7 +++----
package-lock.json | 2 +-
perf/perf_producer.js | 7 +++----
src/Producer.cc | 39 +++++++++++++++++++++++++++++++++++++--
src/Producer.h | 1 +
5 files changed, 45 insertions(+), 11 deletions(-)
diff --git a/examples/producer.js b/examples/producer.js
index 8730719..2dda9cc 100644
--- a/examples/producer.js
+++ b/examples/producer.js
@@ -34,15 +34,14 @@ const Pulsar = require('../index.js');
});
// Send messages
- const results = [];
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
- results.push(producer.send({
+ producer.send({
data: Buffer.from(msg),
- }));
+ });
console.log(`Sent message: ${msg}`);
}
- await Promise.all(results);
+ await producer.flush();
await producer.close();
await client.close();
diff --git a/package-lock.json b/package-lock.json
index 25d495e..0adfd94 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,6 +1,6 @@
{
"name": "pulsar-client",
- "version": "0.0.1",
+ "version": "2.4.0-SNAPSHOT",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
diff --git a/perf/perf_producer.js b/perf/perf_producer.js
index 19f251c..fbe12f1 100644
--- a/perf/perf_producer.js
+++ b/perf/perf_producer.js
@@ -85,17 +85,16 @@ const Pulsar = require('../index.js');
// measure
await delay(1000);
const startMeasureTimeMilliSeconds = performance.now();
- const results = [];
for (let mi = 0; mi < numOfMessages; mi += 1) {
const startSendTimeMilliSeconds = performance.now();
- results.push(producer.send({
+ producer.send({
data: message,
}).then(() => {
// add latency
histogram.recordValue((performance.now() - startSendTimeMilliSeconds));
- }));
+ });
}
- await Promise.all(results);
+ await producer.flush();
const endMeasureTimeMilliSeconds = performance.now();
// result
diff --git a/src/Producer.cc b/src/Producer.cc
index a19d828..041695e 100644
--- a/src/Producer.cc
+++ b/src/Producer.cc
@@ -27,8 +27,10 @@ Napi::FunctionReference Producer::constructor;
void Producer::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);
- Napi::Function func = DefineClass(
- env, "Producer", {InstanceMethod("send", &Producer::Send), InstanceMethod("close", &Producer::Close)});
+ Napi::Function func =
+ DefineClass(env, "Producer",
+ {InstanceMethod("send", &Producer::Send), InstanceMethod("flush", &Producer::Flush),
+ InstanceMethod("close", &Producer::Close)});
constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
@@ -119,6 +121,39 @@ Napi::Value Producer::Send(const Napi::CallbackInfo &info) {
return deferred.Promise();
}
+class ProducerFlushWorker : public Napi::AsyncWorker {
+ public:
+ ProducerFlushWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer)
+ : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+ deferred(deferred),
+ cProducer(cProducer) {}
+
+ ~ProducerFlushWorker() {}
+
+ void Execute() {
+ pulsar_result result = pulsar_producer_flush(this->cProducer);
+ if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+ }
+
+ void OnOK() { this->deferred.Resolve(Env().Null()); }
+
+ void OnError(const Napi::Error &e) {
+ this->deferred.Reject(
+ Napi::Error::New(Env(), std::string("Failed to flush producer: ") + e.Message()).Value());
+ }
+
+ private:
+ Napi::Promise::Deferred deferred;
+ pulsar_producer_t *cProducer;
+};
+
+Napi::Value Producer::Flush(const Napi::CallbackInfo &info) {
+ Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+ ProducerFlushWorker *wk = new ProducerFlushWorker(deferred, this->cProducer);
+ wk->Queue();
+ return deferred.Promise();
+}
+
class ProducerCloseWorker : public Napi::AsyncWorker {
public:
ProducerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer)
diff --git a/src/Producer.h b/src/Producer.h
index 5d31cfc..ea9b112 100644
--- a/src/Producer.h
+++ b/src/Producer.h
@@ -36,6 +36,7 @@ class Producer : public Napi::ObjectWrap<Producer> {
private:
pulsar_producer_t *cProducer;
Napi::Value Send(const Napi::CallbackInfo &info);
+ Napi::Value Flush(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
};