You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2019/06/10 10:05:15 UTC

[pulsar-client-node] 22/23: Add acknowledgeCumulative / receiveWithTimeout methods

This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit a945e49730ec4caf059dfb166020969dbcc69d6c
Author: hrsakai <hs...@yahoo-corp.jp>
AuthorDate: Fri Jun 7 15:21:03 2019 +0900

    Add acknowledgeCumulative / receiveWithTimeout methods
---
 src/Consumer.cc          | 52 ++++++++++++++++++++++++++++++++++++++----------
 src/Consumer.h           |  3 +++
 tests/end_to_end.test.js | 39 ++++++++++++++++++++++++++++++++++++
 3 files changed, 84 insertions(+), 10 deletions(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index db6fab4..454ba3c 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -28,13 +28,17 @@ Napi::FunctionReference Consumer::constructor;
 void Consumer::Init(Napi::Env env, Napi::Object exports) {
   Napi::HandleScope scope(env);
 
-  Napi::Function func = DefineClass(env, "Consumer",
-                                    {
-                                        InstanceMethod("receive", &Consumer::Receive),
-                                        InstanceMethod("acknowledge", &Consumer::Acknowledge),
-                                        InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
-                                        InstanceMethod("close", &Consumer::Close),
-                                    });
+  Napi::Function func =
+      DefineClass(env, "Consumer",
+                  {
+                      InstanceMethod("receive", &Consumer::Receive),
+                      InstanceMethod("receiveWithTimeout", &Consumer::ReceiveWithTimeout),
+                      InstanceMethod("acknowledge", &Consumer::Acknowledge),
+                      InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
+                      InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative),
+                      InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId),
+                      InstanceMethod("close", &Consumer::Close),
+                  });
 
   constructor = Napi::Persistent(func);
   constructor.SuppressDestruct();
@@ -108,13 +112,20 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_
 
 class ConsumerReceiveWorker : public Napi::AsyncWorker {
  public:
-  ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
+  ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer,
+                        int64_t timeout = -1)
       : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
         deferred(deferred),
-        cConsumer(cConsumer) {}
+        cConsumer(cConsumer),
+        timeout(timeout) {}
   ~ConsumerReceiveWorker() {}
   void Execute() {
-    pulsar_result result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage));
+    pulsar_result result;
+    if (timeout > 0) {
+      result = pulsar_consumer_receive_with_timeout(this->cConsumer, &(this->cMessage), timeout);
+    } else {
+      result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage));
+    }
 
     if (result != pulsar_result_Ok) {
       SetError(std::string("Failed to received message ") + pulsar_result_str(result));
@@ -130,6 +141,7 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
   Napi::Promise::Deferred deferred;
   pulsar_consumer_t *cConsumer;
   pulsar_message_t *cMessage;
+  int64_t timeout;
 };
 
 Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
@@ -139,6 +151,14 @@ Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
   return deferred.Promise();
 }
 
+Napi::Value Consumer::ReceiveWithTimeout(const Napi::CallbackInfo &info) {  // bound to JS as "receiveWithTimeout"
+  Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();  // first JS argument as a number; presumably milliseconds - confirm against the Pulsar C API
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());  // NOTE(review): the worker only uses the timed receive when timeout > 0; 0 or a negative value takes the pulsar_consumer_receive path
+  wk->Queue();
+  return deferred.Promise();  // promise of the same deferred that was handed to the worker
+}
+
 void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
   Napi::Object obj = info[0].As<Napi::Object>();
   Message *msg = Message::Unwrap(obj);
@@ -151,6 +171,18 @@ void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
   pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
 }
 
+void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {  // bound to JS as "acknowledgeCumulative"; returns nothing to JS
+  Napi::Object obj = info[0].As<Napi::Object>();  // first JS argument, expected to be a wrapped Message
+  Message *msg = Message::Unwrap(obj);
+  pulsar_consumer_acknowledge_cumulative_async(this->cConsumer, msg->GetCMessage(), NULL, NULL);  // last two args are NULL (presumably callback and its context - confirm), so no completion is reported back here
+}
+
+void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {  // bound to JS as "acknowledgeCumulativeId"; returns nothing to JS
+  Napi::Object obj = info[0].As<Napi::Object>();  // first JS argument, expected to be a wrapped MessageId
+  MessageId *msgId = MessageId::Unwrap(obj);
+  pulsar_consumer_acknowledge_cumulative_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);  // last two args are NULL (presumably callback and its context - confirm), so no completion is reported back here
+}
+
 class ConsumerCloseWorker : public Napi::AsyncWorker {
  public:
   ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
diff --git a/src/Consumer.h b/src/Consumer.h
index fe68124..5e8bf5b 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -36,8 +36,11 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
   pulsar_consumer_t *cConsumer;
 
   Napi::Value Receive(const Napi::CallbackInfo &info);
+  Napi::Value ReceiveWithTimeout(const Napi::CallbackInfo &info);
   void Acknowledge(const Napi::CallbackInfo &info);
   void AcknowledgeId(const Napi::CallbackInfo &info);
+  void AcknowledgeCumulative(const Napi::CallbackInfo &info);
+  void AcknowledgeCumulativeId(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
 };
 
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 9d5ed49..ae24d86 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -40,6 +40,7 @@ const Pulsar = require('../index.js');
         subscription: 'sub1',
         ackTimeoutMs: 10000,
       });
+
       expect(consumer).not.toBeNull();
 
       const messages = [];
@@ -62,6 +63,44 @@ const Pulsar = require('../index.js');
 
       await producer.close();
       await consumer.close();
+    });
+
+    test('acknowledgeCumulative', async () => { // exercises acknowledgeCumulative and receiveWithTimeout together
+      const producer = await client.createProducer({
+        topic: 'persistent://public/default/acknowledgeCumulative',
+        sendTimeoutMs: 30000,
+        batchingEnabled: true,
+      });
+      expect(producer).not.toBeNull();
+
+      const consumer = await client.subscribe({
+        topic: 'persistent://public/default/acknowledgeCumulative',
+        subscription: 'sub1',
+        ackTimeoutMs: 10000,
+      });
+      expect(consumer).not.toBeNull();
+
+      const messages = [];
+      for (let i = 0; i < 10; i += 1) { // send ten messages without awaiting each send
+        const msg = `my-message-${i}`;
+        producer.send({
+          data: Buffer.from(msg),
+        });
+        messages.push(msg);
+      }
+      await producer.flush(); // wait for the flush before receiving
+
+      for (let i = 0; i < 10; i += 1) {
+        const msg = await consumer.receive();
+        if (i === 9) { // only the tenth received message is acknowledged, cumulatively
+          consumer.acknowledgeCumulative(msg);
+        }
+      }
+
+      await expect(consumer.receiveWithTimeout(1000)).rejects.toThrow('Failed to received message TimeOut'); // an eleventh receive must reject with the TimeOut error
+
+      await producer.close();
+      await consumer.close();
       await client.close();
     });
   });