You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2020/07/03 11:22:32 UTC

[pulsar-client-node] branch master updated: Get redelivery count (#101)

This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new a47dd21  Get redelivery count (#101)
a47dd21 is described below

commit a47dd211dcdd9f48cc242dc111097d8e5bc42fac
Author: Dion Jansen <di...@wlnss.com>
AuthorDate: Fri Jul 3 13:22:23 2020 +0200

    Get redelivery count (#101)
    
    * add nack redeliver timeout declaration
    
    * implement nack redeliver timeout
    
    * add nack to consumer test
    
    * add negative acknowledge e2e test
    
    * add negative acknowledge declarations
    
    * add negative acknowledge definitions
    
    * add redelivery count
    
    * update standalone settings
    
    * fix methods
    
    * add e2e test
    
    * add redelivery count e2e test
    
    * reduce nack timeout
    
    * 1.1.0-rc.1
    
    * 1.1.0
    
    * Cleanup consumer
    
    * Bumped pulsar version to 2.5.0
    
    * Linting
    
    Co-authored-by: frejonb <fe...@wlnss.com>
    Co-authored-by: hrsakai <hs...@yahoo-corp.jp>
---
 pulsar-version.txt         |  2 +-
 src/Message.cc             |  9 +++++++++
 src/Message.h              |  1 +
 tests/conf/standalone.conf |  2 ++
 tests/end_to_end.test.js   | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/pulsar-version.txt b/pulsar-version.txt
index 197c4d5..437459c 100755
--- a/pulsar-version.txt
+++ b/pulsar-version.txt
@@ -1 +1 @@
-2.4.0
+2.5.0
diff --git a/src/Message.cc b/src/Message.cc
index d5f755a..5f9e409 100644
--- a/src/Message.cc
+++ b/src/Message.cc
@@ -40,6 +40,7 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) {
        InstanceMethod("getMessageId", &Message::GetMessageId),
        InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp),
        InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp),
+       InstanceMethod("getRedeliveryCount", &Message::GetRedeliveryCount),
        InstanceMethod("getPartitionKey", &Message::GetPartitionKey)});
 
   constructor = Napi::Persistent(func);
@@ -68,6 +69,14 @@ Napi::Value Message::GetTopicName(const Napi::CallbackInfo &info) {
   return Napi::String::New(env, pulsar_message_get_topic_name(this->cMessage));
 }
 
+Napi::Value Message::GetRedeliveryCount(const Napi::CallbackInfo &info) {
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+  return Napi::Number::New(env, pulsar_message_get_redelivery_count(this->cMessage));
+}
+
 Napi::Value Message::GetProperties(const Napi::CallbackInfo &info) {
   Napi::Env env = info.Env();
   if (!ValidateCMessage(env)) {
diff --git a/src/Message.h b/src/Message.h
index 42aa9aa..8d22029 100644
--- a/src/Message.h
+++ b/src/Message.h
@@ -44,6 +44,7 @@ class Message : public Napi::ObjectWrap<Message> {
   Napi::Value GetPublishTimestamp(const Napi::CallbackInfo &info);
   Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info);
   Napi::Value GetPartitionKey(const Napi::CallbackInfo &info);
+  Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info);
   bool ValidateCMessage(Napi::Env env);
 
   static char **NewStringArray(int size) { return (char **)calloc(sizeof(char *), size); }
diff --git a/tests/conf/standalone.conf b/tests/conf/standalone.conf
index 18d4c42..57b7079 100755
--- a/tests/conf/standalone.conf
+++ b/tests/conf/standalone.conf
@@ -78,6 +78,8 @@ statusFilePath=/usr/local/apache/htdocs
 # Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
 maxUnackedMessagesPerConsumer=50000
 
+subscriptionRedeliveryTrackerEnabled=true
+
 ### --- Authentication --- ###
 
 # Enable authentication
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index a0fad68..0afe864 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -116,6 +116,52 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('getRedeliveryCount', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/produce-consume';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: true,
+      });
+      expect(producer).not.toBeNull();
+
+      const consumer = await client.subscribe({
+        topic,
+        subscriptionType: 'Shared',
+        subscription: 'sub1',
+        ackTimeoutMs: 10000,
+        nAckRedeliverTimeoutMs: 100,
+      });
+
+      expect(consumer).not.toBeNull();
+
+      const message = 'my-message';
+      producer.send({
+        data: Buffer.from(message),
+      });
+      await producer.flush();
+
+      let redeliveryCount;
+      let msg;
+      for (let index = 0; index < 3; index += 1) {
+        msg = await consumer.receive();
+        redeliveryCount = msg.getRedeliveryCount();
+        consumer.negativeAcknowledge(msg);
+      }
+      expect(redeliveryCount).toBe(2);
+      consumer.acknowledge(msg);
+
+      await producer.close();
+      await consumer.close();
+      await client.close();
+    });
+
+
     test('Produce/Consume Listener', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',