You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2019/08/06 04:35:39 UTC
[pulsar-client-node] 24/45: impl reader
This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit 618729cc5a3ec00b7d2cb5c6a8e12dd142be203a
Author: yfuruta <yf...@yahoo-corp.jp>
AuthorDate: Tue Jun 11 13:47:12 2019 +0900
impl reader
---
binding.gyp | 2 +
src/Client.h => examples/reader.js | 38 ++++----
src/Client.cc | 14 ++-
src/Client.h | 1 +
src/Reader.cc | 185 +++++++++++++++++++++++++++++++++++++
src/{Client.h => Reader.h} | 22 +++--
src/ReaderConfig.cc | 70 ++++++++++++++
src/{Client.h => ReaderConfig.h} | 27 +++---
src/addon.cc | 2 +
tests/end_to_end.test.js | 61 +++++++++++-
tests/reader.test.js | 72 +++++++++++++++
11 files changed, 445 insertions(+), 49 deletions(-)
diff --git a/binding.gyp b/binding.gyp
index cfcf8b7..4b7b2f4 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -36,6 +36,8 @@
"src/ProducerConfig.cc",
"src/Consumer.cc",
"src/ConsumerConfig.cc",
+ "src/Reader.cc",
+ "src/ReaderConfig.cc",
],
"libraries": ["-lpulsar"],
}
diff --git a/src/Client.h b/examples/reader.js
similarity index 59%
copy from src/Client.h
copy to examples/reader.js
index 63c66f2..5665a35 100644
--- a/src/Client.h
+++ b/examples/reader.js
@@ -17,25 +17,27 @@
* under the License.
*/
-#ifndef CLIENT_H
-#define CLIENT_H
+const Pulsar = require('../index.js');
-#include <napi.h>
-#include <pulsar/c/client.h>
+(async () => {
+ // Create a client
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
-class Client : public Napi::ObjectWrap<Client> {
- public:
- static Napi::Object Init(Napi::Env env, Napi::Object exports);
- Client(const Napi::CallbackInfo &info);
- ~Client();
+ // Create a reader
+ const reader = await client.createReader({
+ topic: 'persistent://public/default/my-topic',
+ startMessageId: Pulsar.MessageId.earliest(),
+ });
- private:
- static Napi::FunctionReference constructor;
- pulsar_client_t *cClient;
+ // read messages
+ for (let i = 0; i < 10; i += 1) {
+ const msg = await reader.readNext();
+ console.log(msg.getData().toString());
+ }
- Napi::Value CreateProducer(const Napi::CallbackInfo &info);
- Napi::Value Subscribe(const Napi::CallbackInfo &info);
- Napi::Value Close(const Napi::CallbackInfo &info);
-};
-
-#endif
+ await reader.close();
+ await client.close();
+})();
diff --git a/src/Client.cc b/src/Client.cc
index 123fc0f..a0c5bad 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -20,6 +20,7 @@
#include "Client.h"
#include "Consumer.h"
#include "Producer.h"
+#include "Reader.h"
#include "Authentication.h"
#include <pulsar/c/client.h>
#include <pulsar/c/client_configuration.h>
@@ -43,10 +44,11 @@ Napi::FunctionReference Client::constructor;
Napi::Object Client::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);
- Napi::Function func =
- DefineClass(env, "Client",
- {InstanceMethod("createProducer", &Client::CreateProducer),
- InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("close", &Client::Close)});
+ Napi::Function func = DefineClass(
+ env, "Client",
+ {InstanceMethod("createProducer", &Client::CreateProducer),
+ InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("createReader", &Client::CreateReader),
+ InstanceMethod("close", &Client::Close)});
constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
@@ -151,6 +153,10 @@ Napi::Value Client::Subscribe(const Napi::CallbackInfo &info) {
return Consumer::NewInstance(info, this->cClient);
}
+Napi::Value Client::CreateReader(const Napi::CallbackInfo &info) {
+ return Reader::NewInstance(info, this->cClient);
+}
+
class ClientCloseWorker : public Napi::AsyncWorker {
public:
ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient)
diff --git a/src/Client.h b/src/Client.h
index 63c66f2..0def389 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -35,6 +35,7 @@ class Client : public Napi::ObjectWrap<Client> {
Napi::Value CreateProducer(const Napi::CallbackInfo &info);
Napi::Value Subscribe(const Napi::CallbackInfo &info);
+ Napi::Value CreateReader(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
};
diff --git a/src/Reader.cc b/src/Reader.cc
new file mode 100644
index 0000000..8c2b859
--- /dev/null
+++ b/src/Reader.cc
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Message.h"
+#include "Reader.h"
+#include "ReaderConfig.h"
+#include <pulsar/c/result.h>
+#include <pulsar/c/reader.h>
+
+Napi::FunctionReference Reader::constructor;
+
+void Reader::Init(Napi::Env env, Napi::Object exports) {
+ Napi::HandleScope scope(env);
+
+ Napi::Function func = DefineClass(env, "Reader",
+ {
+ InstanceMethod("readNext", &Reader::ReadNext),
+ InstanceMethod("hasNext", &Reader::HasNext),
+ InstanceMethod("close", &Reader::Close),
+ });
+
+ constructor = Napi::Persistent(func);
+ constructor.SuppressDestruct();
+}
+
+void Reader::SetCReader(pulsar_reader_t *cReader) { this->cReader = cReader; }
+
+Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info) {}
+
+class ReaderNewInstanceWorker : public Napi::AsyncWorker {
+ public:
+ ReaderNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
+ ReaderConfig *readerConfig)
+ : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+ deferred(deferred),
+ cClient(cClient),
+ readerConfig(readerConfig) {}
+ ~ReaderNewInstanceWorker() {}
+ void Execute() {
+ const std::string &topic = this->readerConfig->GetTopic();
+ if (topic.empty()) {
+ SetError(std::string("Topic is required and must be specified as a string when creating reader"));
+ return;
+ }
+ if (this->readerConfig->GetCStartMessageId() == nullptr) {
+ SetError(std::string(
+ "StartMessageId is required and must be specified as a MessageId object when creating reader"));
+ return;
+ }
+
+ pulsar_result result =
+ pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
+ this->readerConfig->GetCReaderConfig(), &(this->cReader));
+ delete this->readerConfig;
+ if (result != pulsar_result_Ok) {
+ SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
+ return;
+ }
+ }
+ void OnOK() {
+ Napi::Object obj = Reader::constructor.New({});
+ Reader *reader = Reader::Unwrap(obj);
+ reader->SetCReader(this->cReader);
+ this->deferred.Resolve(obj);
+ }
+ void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
+
+ private:
+ Napi::Promise::Deferred deferred;
+ pulsar_client_t *cClient;
+ ReaderConfig *readerConfig;
+ pulsar_reader_t *cReader;
+};
+
+Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
+ Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+ Napi::Object config = info[0].As<Napi::Object>();
+ ReaderConfig *readerConfig = new ReaderConfig(config);
+ ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig);
+ wk->Queue();
+ return deferred.Promise();
+}
+
+class ReaderReadNextWorker : public Napi::AsyncWorker {
+ public:
+ ReaderReadNextWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader,
+ int64_t timeout = -1)
+ : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+ deferred(deferred),
+ cReader(cReader),
+ timeout(timeout) {}
+ ~ReaderReadNextWorker() {}
+ void Execute() {
+ pulsar_result result;
+ if (timeout > 0) {
+ result = pulsar_reader_read_next_with_timeout(this->cReader, &(this->cMessage), timeout);
+ } else {
+ result = pulsar_reader_read_next(this->cReader, &(this->cMessage));
+ }
+ if (result != pulsar_result_Ok) {
+ SetError(std::string("Failed to received message ") + pulsar_result_str(result));
+ }
+ }
+ void OnOK() {
+ Napi::Object obj = Message::NewInstance({}, this->cMessage);
+ this->deferred.Resolve(obj);
+ }
+ void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
+
+ private:
+ Napi::Promise::Deferred deferred;
+ pulsar_reader_t *cReader;
+ pulsar_message_t *cMessage;
+ int64_t timeout;
+};
+
+Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) {
+ Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+ if (info[0].IsUndefined()) {
+ ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader);
+ wk->Queue();
+ } else {
+ Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
+ ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader, timeout.Int64Value());
+ wk->Queue();
+ }
+ return deferred.Promise();
+}
+
+Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) {
+ int value = 0;
+ pulsar_result result = pulsar_reader_has_message_available(this->cReader, &value);
+ if (result != pulsar_result_Ok || value != 1) {
+ return Napi::Boolean::New(info.Env(), false);
+ } else {
+ return Napi::Boolean::New(info.Env(), true);
+ }
+}
+
+class ReaderCloseWorker : public Napi::AsyncWorker {
+ public:
+ ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader)
+ : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+ deferred(deferred),
+ cReader(cReader) {}
+ ~ReaderCloseWorker() {}
+ void Execute() {
+ pulsar_result result = pulsar_reader_close(this->cReader);
+ if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+ }
+ void OnOK() { this->deferred.Resolve(Env().Null()); }
+ void OnError(const Napi::Error &e) {
+ this->deferred.Reject(
+ Napi::Error::New(Env(), std::string("Failed to close reader: ") + e.Message()).Value());
+ }
+
+ private:
+ Napi::Promise::Deferred deferred;
+ pulsar_reader_t *cReader;
+};
+
+Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
+ Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+ ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader);
+ wk->Queue();
+ return deferred.Promise();
+}
+
+Reader::~Reader() { pulsar_reader_free(this->cReader); }
diff --git a/src/Client.h b/src/Reader.h
similarity index 67%
copy from src/Client.h
copy to src/Reader.h
index 63c66f2..787b732 100644
--- a/src/Client.h
+++ b/src/Reader.h
@@ -17,24 +17,26 @@
* under the License.
*/
-#ifndef CLIENT_H
-#define CLIENT_H
+#ifndef READER_H
+#define READER_H
#include <napi.h>
#include <pulsar/c/client.h>
-class Client : public Napi::ObjectWrap<Client> {
+class Reader : public Napi::ObjectWrap<Reader> {
public:
- static Napi::Object Init(Napi::Env env, Napi::Object exports);
- Client(const Napi::CallbackInfo &info);
- ~Client();
+ static void Init(Napi::Env env, Napi::Object exports);
+ static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
+ static Napi::FunctionReference constructor;
+ Reader(const Napi::CallbackInfo &info);
+ ~Reader();
+ void SetCReader(pulsar_reader_t *cReader);
private:
- static Napi::FunctionReference constructor;
- pulsar_client_t *cClient;
+ pulsar_reader_t *cReader;
- Napi::Value CreateProducer(const Napi::CallbackInfo &info);
- Napi::Value Subscribe(const Napi::CallbackInfo &info);
+ Napi::Value ReadNext(const Napi::CallbackInfo &info);
+ Napi::Value HasNext(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
};
diff --git a/src/ReaderConfig.cc b/src/ReaderConfig.cc
new file mode 100644
index 0000000..7e15807
--- /dev/null
+++ b/src/ReaderConfig.cc
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ReaderConfig.h"
+#include "MessageId.h"
+#include <map>
+
+static const std::string CFG_TOPIC = "topic";
+static const std::string CFG_START_MESSAGE_ID = "startMessageId";
+static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
+static const std::string CFG_READER_NAME = "readerName";
+static const std::string CFG_SUBSCRIPTION_ROLE_PREFIX = "subscriptionRolePrefix";
+
+ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStartMessageId(NULL) {
+ this->cReaderConfig = pulsar_reader_configuration_create();
+
+ if (readerConfig.Has(CFG_TOPIC) && readerConfig.Get(CFG_TOPIC).IsString()) {
+ this->topic = readerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
+ }
+ if (readerConfig.Has(CFG_START_MESSAGE_ID) && readerConfig.Get(CFG_START_MESSAGE_ID).IsObject()) {
+ Napi::Object objMessageId = readerConfig.Get(CFG_START_MESSAGE_ID).ToObject();
+ MessageId *msgId = MessageId::Unwrap(objMessageId);
+ this->cStartMessageId = msgId->GetCMessageId();
+ }
+
+ if (readerConfig.Has(CFG_RECV_QUEUE) && readerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
+ int32_t receiverQueueSize = readerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value();
+ if (receiverQueueSize >= 0) {
+ pulsar_reader_configuration_set_receiver_queue_size(this->cReaderConfig, receiverQueueSize);
+ }
+ }
+
+ if (readerConfig.Has(CFG_READER_NAME) && readerConfig.Get(CFG_READER_NAME).IsString()) {
+ std::string readerName = readerConfig.Get(CFG_READER_NAME).ToString().Utf8Value();
+ if (!readerName.empty())
+ pulsar_reader_configuration_set_reader_name(this->cReaderConfig, readerName.c_str());
+ }
+
+ if (readerConfig.Has(CFG_SUBSCRIPTION_ROLE_PREFIX) &&
+ readerConfig.Get(CFG_SUBSCRIPTION_ROLE_PREFIX).IsString()) {
+ std::string subscriptionRolePrefix =
+ readerConfig.Get(CFG_SUBSCRIPTION_ROLE_PREFIX).ToString().Utf8Value();
+ if (!subscriptionRolePrefix.empty())
+ pulsar_reader_configuration_set_reader_name(this->cReaderConfig, subscriptionRolePrefix.c_str());
+ }
+}
+
+ReaderConfig::~ReaderConfig() { pulsar_reader_configuration_free(this->cReaderConfig); }
+
+pulsar_reader_configuration_t *ReaderConfig::GetCReaderConfig() { return this->cReaderConfig; }
+
+std::string ReaderConfig::GetTopic() { return this->topic; }
+
+pulsar_message_id_t *ReaderConfig::GetCStartMessageId() { return this->cStartMessageId; }
diff --git a/src/Client.h b/src/ReaderConfig.h
similarity index 64%
copy from src/Client.h
copy to src/ReaderConfig.h
index 63c66f2..1983459 100644
--- a/src/Client.h
+++ b/src/ReaderConfig.h
@@ -17,25 +17,26 @@
* under the License.
*/
-#ifndef CLIENT_H
-#define CLIENT_H
+#ifndef READER_CONFIG_H
+#define READER_CONFIG_H
#include <napi.h>
-#include <pulsar/c/client.h>
+#include <pulsar/c/reader.h>
+#include <pulsar/c/reader_configuration.h>
+#include <pulsar/c/message_id.h>
-class Client : public Napi::ObjectWrap<Client> {
+class ReaderConfig {
public:
- static Napi::Object Init(Napi::Env env, Napi::Object exports);
- Client(const Napi::CallbackInfo &info);
- ~Client();
+ ReaderConfig(const Napi::Object &readerConfig);
+ ~ReaderConfig();
+ pulsar_reader_configuration_t *GetCReaderConfig();
+ pulsar_message_id_t *GetCStartMessageId();
+ std::string GetTopic();
private:
- static Napi::FunctionReference constructor;
- pulsar_client_t *cClient;
-
- Napi::Value CreateProducer(const Napi::CallbackInfo &info);
- Napi::Value Subscribe(const Napi::CallbackInfo &info);
- Napi::Value Close(const Napi::CallbackInfo &info);
+ pulsar_reader_configuration_t *cReaderConfig;
+ pulsar_message_id_t *cStartMessageId;
+ std::string topic;
};
#endif
diff --git a/src/addon.cc b/src/addon.cc
index 050ae12..fa26ae0 100644
--- a/src/addon.cc
+++ b/src/addon.cc
@@ -23,6 +23,7 @@
#include "Producer.h"
#include "Consumer.h"
#include "Client.h"
+#include "Reader.h"
#include <napi.h>
Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
@@ -31,6 +32,7 @@ Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
Authentication::Init(env, exports);
Producer::Init(env, exports);
Consumer::Init(env, exports);
+ Reader::Init(env, exports);
return Client::Init(env, exports);
}
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index ae24d86..9f15cb7 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -23,11 +23,12 @@ const Pulsar = require('../index.js');
(() => {
describe('End To End', () => {
- const client = new Pulsar.Client({
- serviceUrl: 'pulsar://localhost:6650',
- operationTimeoutSeconds: 30,
- });
test('Produce/Consume', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
const producer = await client.createProducer({
topic: 'persistent://public/default/test-end-to-end',
sendTimeoutMs: 30000,
@@ -63,9 +64,15 @@ const Pulsar = require('../index.js');
await producer.close();
await consumer.close();
+ await client.close();
});
test('acknowledgeCumulative', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
const producer = await client.createProducer({
topic: 'persistent://public/default/acknowledgeCumulative',
sendTimeoutMs: 30000,
@@ -103,5 +110,51 @@ const Pulsar = require('../index.js');
await consumer.close();
await client.close();
});
+
+ test('Produce/Read', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+ expect(client).not.toBeNull();
+
+ const producer = await client.createProducer({
+ topic: 'persistent://public/default/test-end-to-end',
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+ expect(producer).not.toBeNull();
+
+ const reader = await client.createReader({
+ topic: 'persistent://public/default/test-end-to-end',
+ startMessageId: Pulsar.MessageId.latest(),
+ });
+ expect(reader).not.toBeNull();
+
+ const messages = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ messages.push(msg);
+ }
+ await producer.flush();
+
+ expect(reader.hasNext()).toBe(true);
+
+ const results = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = await reader.readNext();
+ results.push(msg.getData().toString());
+ }
+ expect(lodash.difference(messages, results)).toEqual([]);
+
+ expect(reader.hasNext()).toBe(false);
+
+ await producer.close();
+ await reader.close();
+ await client.close();
+ });
});
})();
diff --git a/tests/reader.test.js b/tests/reader.test.js
new file mode 100644
index 0000000..9c87212
--- /dev/null
+++ b/tests/reader.test.js
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const Pulsar = require('../index.js');
+
+(() => {
+ describe('Reader', () => {
+ describe('Create', () => {
+ test('No Topic', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+ await expect(client.createReader({
+ startMessageId: Pulsar.MessageId.earliest(),
+ })).rejects.toThrow('Topic is required and must be specified as a string when creating reader');
+ await client.close();
+ });
+
+ test('No Topic', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+ await expect(client.createReader({
+ topic: 0,
+ startMessageId: Pulsar.MessageId.earliest(),
+ })).rejects.toThrow('Topic is required and must be specified as a string when creating reader');
+ await client.close();
+ });
+
+ test('No StartMessageId', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+ await expect(client.createReader({
+ topic: 'persistent://public/default/topic',
+ })).rejects.toThrow('StartMessageId is required and must be specified as a MessageId object when creating reader');
+ await client.close();
+ });
+
+ test('Not StartMessageId as MessageId', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+ await expect(client.createReader({
+ topic: 'persistent://public/default/topic',
+ startMessageId: 'not MessageId',
+ })).rejects.toThrow('StartMessageId is required and must be specified as a MessageId object when creating reader');
+ await client.close();
+ });
+ });
+ });
+})();