You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2019/06/12 02:44:10 UTC
[pulsar-client-node] 22/29: Add acknowledgeCumulative /
receiveWithTimeout methods
This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit a945e49730ec4caf059dfb166020969dbcc69d6c
Author: hrsakai <hs...@yahoo-corp.jp>
AuthorDate: Fri Jun 7 15:21:03 2019 +0900
Add acknowledgeCumulative / receiveWithTimeout methods
---
src/Consumer.cc | 52 ++++++++++++++++++++++++++++++++++++++----------
src/Consumer.h | 3 +++
tests/end_to_end.test.js | 39 ++++++++++++++++++++++++++++++++++++
3 files changed, 84 insertions(+), 10 deletions(-)
diff --git a/src/Consumer.cc b/src/Consumer.cc
index db6fab4..454ba3c 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -28,13 +28,17 @@ Napi::FunctionReference Consumer::constructor;
void Consumer::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);
- Napi::Function func = DefineClass(env, "Consumer",
- {
- InstanceMethod("receive", &Consumer::Receive),
- InstanceMethod("acknowledge", &Consumer::Acknowledge),
- InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
- InstanceMethod("close", &Consumer::Close),
- });
+ Napi::Function func =
+ DefineClass(env, "Consumer",
+ {
+ InstanceMethod("receive", &Consumer::Receive),
+ InstanceMethod("receiveWithTimeout", &Consumer::ReceiveWithTimeout),
+ InstanceMethod("acknowledge", &Consumer::Acknowledge),
+ InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
+ InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative),
+ InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId),
+ InstanceMethod("close", &Consumer::Close),
+ });
constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
@@ -108,13 +112,20 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_
class ConsumerReceiveWorker : public Napi::AsyncWorker {
public:
- ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
+ ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer,
+ int64_t timeout = -1)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
- cConsumer(cConsumer) {}
+ cConsumer(cConsumer),
+ timeout(timeout) {}
~ConsumerReceiveWorker() {}
void Execute() {
- pulsar_result result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage));
+ pulsar_result result;
+ if (timeout > 0) {
+ result = pulsar_consumer_receive_with_timeout(this->cConsumer, &(this->cMessage), timeout);
+ } else {
+ result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage));
+ }
if (result != pulsar_result_Ok) {
SetError(std::string("Failed to received message ") + pulsar_result_str(result));
@@ -130,6 +141,7 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
Napi::Promise::Deferred deferred;
pulsar_consumer_t *cConsumer;
pulsar_message_t *cMessage;
+ int64_t timeout;
};
Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
@@ -139,6 +151,14 @@ Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
return deferred.Promise();
}
+// Starts an asynchronous receive bounded by a timeout and returns a promise for
+// its outcome. info[0] is the timeout handed to
+// pulsar_consumer_receive_with_timeout (presumably milliseconds -- confirm
+// against the Pulsar C API).
+Napi::Value Consumer::ReceiveWithTimeout(const Napi::CallbackInfo &info) {
+  // Value::ToNumber() performs the coercion itself; routing through
+  // As<Napi::Object>() first would be an unchecked cast of a non-object value.
+  Napi::Number timeout = info[0].ToNumber();
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  // The worker takes the timed path only when timeout > 0; any other value
+  // falls back to the blocking pulsar_consumer_receive call.
+  // NOTE(review): the worker is not deleted here -- presumably it is released
+  // by Napi::AsyncWorker after completion; confirm.
+  ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
+  wk->Queue();
+  return deferred.Promise();
+}
+
void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
Message *msg = Message::Unwrap(obj);
@@ -151,6 +171,18 @@ void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
}
+// Issues an asynchronous cumulative acknowledgement for the Message wrapped by
+// info[0]. No completion callback or context is registered, so the result of
+// the acknowledgement is not observed here.
+void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
+  Message *wrapped = Message::Unwrap(info[0].As<Napi::Object>());
+  pulsar_consumer_acknowledge_cumulative_async(this->cConsumer, wrapped->GetCMessage(), NULL, NULL);
+}
+
+// Issues an asynchronous cumulative acknowledgement for the MessageId wrapped
+// by info[0]. No completion callback or context is registered, so the result
+// of the acknowledgement is not observed here.
+void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
+  MessageId *wrappedId = MessageId::Unwrap(info[0].As<Napi::Object>());
+  pulsar_consumer_acknowledge_cumulative_async_id(this->cConsumer, wrappedId->GetCMessageId(), NULL, NULL);
+}
+
class ConsumerCloseWorker : public Napi::AsyncWorker {
public:
ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
diff --git a/src/Consumer.h b/src/Consumer.h
index fe68124..5e8bf5b 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -36,8 +36,11 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
pulsar_consumer_t *cConsumer;
Napi::Value Receive(const Napi::CallbackInfo &info);
+ Napi::Value ReceiveWithTimeout(const Napi::CallbackInfo &info);
void Acknowledge(const Napi::CallbackInfo &info);
void AcknowledgeId(const Napi::CallbackInfo &info);
+ void AcknowledgeCumulative(const Napi::CallbackInfo &info);
+ void AcknowledgeCumulativeId(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
};
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 9d5ed49..ae24d86 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -40,6 +40,7 @@ const Pulsar = require('../index.js');
subscription: 'sub1',
ackTimeoutMs: 10000,
});
+
expect(consumer).not.toBeNull();
const messages = [];
@@ -62,6 +63,44 @@ const Pulsar = require('../index.js');
await producer.close();
await consumer.close();
+ });
+
+ // Sends 10 messages, receives all 10, calls acknowledgeCumulative on the
+ // last one only, then checks that receiveWithTimeout rejects when no further
+ // message arrives.
+ test('acknowledgeCumulative', async () => {
+ const producer = await client.createProducer({
+ topic: 'persistent://public/default/acknowledgeCumulative',
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+ expect(producer).not.toBeNull();
+
+ const consumer = await client.subscribe({
+ topic: 'persistent://public/default/acknowledgeCumulative',
+ subscription: 'sub1',
+ ackTimeoutMs: 10000,
+ });
+ expect(consumer).not.toBeNull();
+
+ // NOTE(review): messages is filled below but never read in this test.
+ const messages = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ // The send() result is not awaited here; flush() after the loop is awaited instead.
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ messages.push(msg);
+ }
+ await producer.flush();
+
+ for (let i = 0; i < 10; i += 1) {
+ const msg = await consumer.receive();
+ // Only the 10th (last) received message is passed to acknowledgeCumulative.
+ if (i === 9) {
+ consumer.acknowledgeCumulative(msg);
+ }
+ }
+
+ // All 10 messages have been consumed, so this receive is expected to fail.
+ // The expected text starts with the "Failed to received message " prefix
+ // that the native receive worker sets when the result is not OK.
+ await expect(consumer.receiveWithTimeout(1000)).rejects.toThrow('Failed to received message TimeOut');
+
+ await producer.close();
+ await consumer.close();
await client.close();
});
});