You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2020/07/03 11:22:32 UTC
[pulsar-client-node] branch master updated: Get redelivery count
(#101)
This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new a47dd21 Get redelivery count (#101)
a47dd21 is described below
commit a47dd211dcdd9f48cc242dc111097d8e5bc42fac
Author: Dion Jansen <di...@wlnss.com>
AuthorDate: Fri Jul 3 13:22:23 2020 +0200
Get redelivery count (#101)
* add nack redeliver timeout declaration
* implement nack redeliver timeout
* add nack to consumer test
* add negative acknowledge e2e test
* add negative acknowledge declarations
* add negative acknowledge definitions
* add redelivery count
* update standalone settings
* fix methods
* add e2e test
* add redelivery count e2e test
* reduce nack timeout
* 1.1.0-rc.1
* 1.1.0
* Cleanup consumer
* Bumped pulsar version to 2.5.0
* Linting
Co-authored-by: frejonb <fe...@wlnss.com>
Co-authored-by: hrsakai <hs...@yahoo-corp.jp>
---
pulsar-version.txt | 2 +-
src/Message.cc | 9 +++++++++
src/Message.h | 1 +
tests/conf/standalone.conf | 2 ++
tests/end_to_end.test.js | 46 ++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 59 insertions(+), 1 deletion(-)
diff --git a/pulsar-version.txt b/pulsar-version.txt
index 197c4d5..437459c 100755
--- a/pulsar-version.txt
+++ b/pulsar-version.txt
@@ -1 +1 @@
-2.4.0
+2.5.0
diff --git a/src/Message.cc b/src/Message.cc
index d5f755a..5f9e409 100644
--- a/src/Message.cc
+++ b/src/Message.cc
@@ -40,6 +40,7 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("getMessageId", &Message::GetMessageId),
InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp),
InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp),
+ InstanceMethod("getRedeliveryCount", &Message::GetRedeliveryCount),
InstanceMethod("getPartitionKey", &Message::GetPartitionKey)});
constructor = Napi::Persistent(func);
@@ -68,6 +69,14 @@ Napi::Value Message::GetTopicName(const Napi::CallbackInfo &info) {
return Napi::String::New(env, pulsar_message_get_topic_name(this->cMessage));
}
+Napi::Value Message::GetRedeliveryCount(const Napi::CallbackInfo &info) {
+ Napi::Env env = info.Env();
+ if (!ValidateCMessage(env)) {
+ return env.Null();
+ }
+ return Napi::Number::New(env, pulsar_message_get_redelivery_count(this->cMessage));
+}
+
Napi::Value Message::GetProperties(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
if (!ValidateCMessage(env)) {
diff --git a/src/Message.h b/src/Message.h
index 42aa9aa..8d22029 100644
--- a/src/Message.h
+++ b/src/Message.h
@@ -44,6 +44,7 @@ class Message : public Napi::ObjectWrap<Message> {
Napi::Value GetPublishTimestamp(const Napi::CallbackInfo &info);
Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info);
Napi::Value GetPartitionKey(const Napi::CallbackInfo &info);
+ Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info);
bool ValidateCMessage(Napi::Env env);
static char **NewStringArray(int size) { return (char **)calloc(sizeof(char *), size); }
diff --git a/tests/conf/standalone.conf b/tests/conf/standalone.conf
index 18d4c42..57b7079 100755
--- a/tests/conf/standalone.conf
+++ b/tests/conf/standalone.conf
@@ -78,6 +78,8 @@ statusFilePath=/usr/local/apache/htdocs
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000
+subscriptionRedeliveryTrackerEnabled=true
+
### --- Authentication --- ###
# Enable authentication
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index a0fad68..0afe864 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -116,6 +116,52 @@ const Pulsar = require('../index.js');
await client.close();
});
+ test('getRedeliveryCount', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/produce-consume';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+ expect(producer).not.toBeNull();
+
+ const consumer = await client.subscribe({
+ topic,
+ subscriptionType: 'Shared',
+ subscription: 'sub1',
+ ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 100,
+ });
+
+ expect(consumer).not.toBeNull();
+
+ const message = 'my-message';
+ producer.send({
+ data: Buffer.from(message),
+ });
+ await producer.flush();
+
+ let redeliveryCount;
+ let msg;
+ for (let index = 0; index < 3; index += 1) {
+ msg = await consumer.receive();
+ redeliveryCount = msg.getRedeliveryCount();
+ consumer.negativeAcknowledge(msg);
+ }
+ expect(redeliveryCount).toBe(2);
+ consumer.acknowledge(msg);
+
+ await producer.close();
+ await consumer.close();
+ await client.close();
+ });
+
+
test('Produce/Consume Listener', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',