You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by nk...@apache.org on 2019/11/27 02:39:23 UTC
[pulsar-client-node] branch master updated: Add negative acknowledgement support for consumer (#59)
This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new ae91394 Add negative acknowledgement support for consumer (#59)
ae91394 is described below
commit ae91394e89501aeb4edba71cb67c0c22301eb4a6
Author: Fernando Rejon Barrera <39...@users.noreply.github.com>
AuthorDate: Wed Nov 27 03:39:15 2019 +0100
Add negative acknowledgement support for consumer (#59)
* add nack redeliver timeout declaration
* implement nack redeliver timeout
* add nack to consumer test
* add negative acknowledge e2e test
* add negative acknowledge declarations
* add negative acknowledge definitions
---
src/Consumer.cc | 20 ++++++++++++++++++++
src/Consumer.h | 2 ++
src/ConsumerConfig.cc | 11 ++++++++++-
src/ConsumerConfig.h | 2 ++
tests/consumer.test.js | 15 +++++++++++++++
tests/end_to_end.test.js | 47 +++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 96 insertions(+), 1 deletion(-)
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 12bba8e..4726402 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -34,6 +34,8 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("receive", &Consumer::Receive),
InstanceMethod("acknowledge", &Consumer::Acknowledge),
InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
+ InstanceMethod("negativeAcknowledge", &Consumer::NegativeAcknowledge),
+ InstanceMethod("negativeAcknowledgeId", &Consumer::NegativeAcknowledgeId),
InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative),
InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId),
InstanceMethod("close", &Consumer::Close),
@@ -75,6 +77,12 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
SetError(msg);
return;
}
+ int32_t nAckRedeliverTimeoutMs = this->consumerConfig->GetNAckRedeliverTimeoutMs();
+ if (nAckRedeliverTimeoutMs < 0) {
+ std::string msg("NAck timeout should be greater than or equal to zero");
+ SetError(msg);
+ return;
+ }
pulsar_result result =
pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(),
@@ -168,6 +176,18 @@ void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
}
+void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) {
+ Napi::Object obj = info[0].As<Napi::Object>();
+ Message *msg = Message::Unwrap(obj);
+ pulsar_consumer_negative_acknowledge(this->cConsumer, msg->GetCMessage());
+}
+
+void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) {
+ Napi::Object obj = info[0].As<Napi::Object>();
+ MessageId *msgId = MessageId::Unwrap(obj);
+ pulsar_consumer_negative_acknowledge_id(this->cConsumer, msgId->GetCMessageId());
+}
+
void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
Message *msg = Message::Unwrap(obj);
diff --git a/src/Consumer.h b/src/Consumer.h
index 7aa41d9..c6373d0 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -38,6 +38,8 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
Napi::Value Receive(const Napi::CallbackInfo &info);
void Acknowledge(const Napi::CallbackInfo &info);
void AcknowledgeId(const Napi::CallbackInfo &info);
+ void NegativeAcknowledge(const Napi::CallbackInfo &info);
+ void NegativeAcknowledgeId(const Napi::CallbackInfo &info);
void AcknowledgeCumulative(const Napi::CallbackInfo &info);
void AcknowledgeCumulativeId(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index a98bb69..b07bef2 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -25,6 +25,7 @@ static const std::string CFG_TOPIC = "topic";
static const std::string CFG_SUBSCRIPTION = "subscription";
static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs";
+static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs";
static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
static const std::string CFG_RECV_QUEUE_ACROSS_PARTITIONS = "receiverQueueSizeAcrossPartitions";
static const std::string CFG_CONSUMER_NAME = "consumerName";
@@ -36,7 +37,7 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Failover", pulsar_ConsumerFailover}};
ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig)
- : topic(""), subscription(""), ackTimeoutMs(0) {
+ : topic(""), subscription(""), ackTimeoutMs(0), nAckRedeliverTimeoutMs(60000) {
this->cConsumerConfig = pulsar_consumer_configuration_create();
if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) {
@@ -67,6 +68,13 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig)
}
}
+ if (consumerConfig.Has(CFG_NACK_REDELIVER_TIMEOUT) && consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).IsNumber()) {
+ this->nAckRedeliverTimeoutMs = consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).ToNumber().Int64Value();
+ if (this->nAckRedeliverTimeoutMs >= 0) {
+ pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig, this->nAckRedeliverTimeoutMs);
+ }
+ }
+
if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
int32_t receiverQueueSize = consumerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value();
if (receiverQueueSize >= 0) {
@@ -103,3 +111,4 @@ pulsar_consumer_configuration_t *ConsumerConfig::GetCConsumerConfig() { return t
std::string ConsumerConfig::GetTopic() { return this->topic; }
std::string ConsumerConfig::GetSubscription() { return this->subscription; }
int64_t ConsumerConfig::GetAckTimeoutMs() { return this->ackTimeoutMs; }
+int64_t ConsumerConfig::GetNAckRedeliverTimeoutMs() { return this->nAckRedeliverTimeoutMs; }
diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h
index 7070bf9..f5dcce8 100644
--- a/src/ConsumerConfig.h
+++ b/src/ConsumerConfig.h
@@ -33,12 +33,14 @@ class ConsumerConfig {
std::string GetTopic();
std::string GetSubscription();
int64_t GetAckTimeoutMs();
+ int64_t GetNAckRedeliverTimeoutMs();
private:
pulsar_consumer_configuration_t *cConsumerConfig;
std::string topic;
std::string subscription;
int64_t ackTimeoutMs;
+ int64_t nAckRedeliverTimeoutMs;
};
#endif
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index c008cac..65bd792 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -30,6 +30,7 @@ const Pulsar = require('../index.js');
await expect(client.subscribe({
subscription: 'sub1',
ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 60000,
})).rejects.toThrow('Topic is required and must be specified as a string when creating consumer');
});
@@ -38,6 +39,7 @@ const Pulsar = require('../index.js');
topic: 0,
subscription: 'sub1',
ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 60000,
})).rejects.toThrow('Topic is required and must be specified as a string when creating consumer');
});
@@ -45,6 +47,7 @@ const Pulsar = require('../index.js');
await expect(client.subscribe({
topic: 'persistent://public/default/t1',
ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 60000,
})).rejects.toThrow('Subscription is required and must be specified as a string when creating consumer');
});
@@ -53,6 +56,7 @@ const Pulsar = require('../index.js');
topic: 'persistent://public/default/t1',
subscription: 0,
ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 60000,
})).rejects.toThrow('Subscription is required and must be specified as a string when creating consumer');
});
@@ -61,6 +65,7 @@ const Pulsar = require('../index.js');
topic: 'persistent://no-tenant/namespace/topic',
subscription: 'sub1',
ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 60000,
})).rejects.toThrow('Failed to create consumer: ConnectError');
});
@@ -69,8 +74,18 @@ const Pulsar = require('../index.js');
topic: 'persistent://public/no-namespace/topic',
subscription: 'sub1',
ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 60000,
})).rejects.toThrow('Failed to create consumer: ConnectError');
});
+
+ test('Not Positive NAckRedeliverTimeout', async () => {
+ await expect(client.subscribe({
+ topic: 'persistent://public/default/t1',
+ subscription: 'sub1',
+ ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: -12,
+ })).rejects.toThrow('NAck timeout should be greater than or equal to zero');
+ });
});
});
})();
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 477e57f..0a8ce6f 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -67,6 +67,53 @@ const Pulsar = require('../index.js');
await client.close();
});
+ test('negativeAcknowledge', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/produce-consume';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+ expect(producer).not.toBeNull();
+
+ const consumer = await client.subscribe({
+ topic,
+ subscription: 'sub1',
+ ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 1000,
+ });
+
+ expect(consumer).not.toBeNull();
+
+ const message = 'my-message';
+ producer.send({
+ data: Buffer.from(message),
+ });
+ await producer.flush();
+
+ const results = [];
+ const msg = await consumer.receive();
+ results.push(msg.getData().toString());
+ consumer.negativeAcknowledge(msg);
+
+ const msg2 = await consumer.receive();
+ results.push(msg2.getData().toString());
+ consumer.acknowledge(msg2);
+
+ await expect(consumer.receive(1000)).rejects.toThrow('Failed to received message TimeOut');
+
+ expect(results).toEqual([message, message]);
+
+ await producer.close();
+ await consumer.close();
+ await client.close();
+ });
+
test('acknowledgeCumulative', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',