You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2019/08/06 04:35:39 UTC

[pulsar-client-node] 24/45: impl reader

This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit 618729cc5a3ec00b7d2cb5c6a8e12dd142be203a
Author: yfuruta <yf...@yahoo-corp.jp>
AuthorDate: Tue Jun 11 13:47:12 2019 +0900

    impl reader
---
 binding.gyp                        |   2 +
 src/Client.h => examples/reader.js |  38 ++++----
 src/Client.cc                      |  14 ++-
 src/Client.h                       |   1 +
 src/Reader.cc                      | 185 +++++++++++++++++++++++++++++++++++++
 src/{Client.h => Reader.h}         |  22 +++--
 src/ReaderConfig.cc                |  70 ++++++++++++++
 src/{Client.h => ReaderConfig.h}   |  27 +++---
 src/addon.cc                       |   2 +
 tests/end_to_end.test.js           |  61 +++++++++++-
 tests/reader.test.js               |  72 +++++++++++++++
 11 files changed, 445 insertions(+), 49 deletions(-)

diff --git a/binding.gyp b/binding.gyp
index cfcf8b7..4b7b2f4 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -36,6 +36,8 @@
         "src/ProducerConfig.cc",
         "src/Consumer.cc",
         "src/ConsumerConfig.cc",
+        "src/Reader.cc",
+        "src/ReaderConfig.cc",
       ],
       "libraries": ["-lpulsar"],
     }
diff --git a/src/Client.h b/examples/reader.js
similarity index 59%
copy from src/Client.h
copy to examples/reader.js
index 63c66f2..5665a35 100644
--- a/src/Client.h
+++ b/examples/reader.js
@@ -17,25 +17,27 @@
  * under the License.
  */
 
-#ifndef CLIENT_H
-#define CLIENT_H
+const Pulsar = require('../index.js');
 
-#include <napi.h>
-#include <pulsar/c/client.h>
+(async () => {
+  // Create a client
+  const client = new Pulsar.Client({
+    serviceUrl: 'pulsar://localhost:6650',
+    operationTimeoutSeconds: 30,
+  });
 
-class Client : public Napi::ObjectWrap<Client> {
- public:
-  static Napi::Object Init(Napi::Env env, Napi::Object exports);
-  Client(const Napi::CallbackInfo &info);
-  ~Client();
+  // Create a reader
+  const reader = await client.createReader({
+    topic: 'persistent://public/default/my-topic',
+    startMessageId: Pulsar.MessageId.earliest(),
+  });
 
- private:
-  static Napi::FunctionReference constructor;
-  pulsar_client_t *cClient;
+  // read messages
+  for (let i = 0; i < 10; i += 1) {
+    const msg = await reader.readNext();
+    console.log(msg.getData().toString());
+  }
 
-  Napi::Value CreateProducer(const Napi::CallbackInfo &info);
-  Napi::Value Subscribe(const Napi::CallbackInfo &info);
-  Napi::Value Close(const Napi::CallbackInfo &info);
-};
-
-#endif
+  await reader.close();
+  await client.close();
+})();
diff --git a/src/Client.cc b/src/Client.cc
index 123fc0f..a0c5bad 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -20,6 +20,7 @@
 #include "Client.h"
 #include "Consumer.h"
 #include "Producer.h"
+#include "Reader.h"
 #include "Authentication.h"
 #include <pulsar/c/client.h>
 #include <pulsar/c/client_configuration.h>
@@ -43,10 +44,11 @@ Napi::FunctionReference Client::constructor;
 Napi::Object Client::Init(Napi::Env env, Napi::Object exports) {
   Napi::HandleScope scope(env);
 
-  Napi::Function func =
-      DefineClass(env, "Client",
-                  {InstanceMethod("createProducer", &Client::CreateProducer),
-                   InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("close", &Client::Close)});
+  Napi::Function func = DefineClass(
+      env, "Client",
+      {InstanceMethod("createProducer", &Client::CreateProducer),
+       InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("createReader", &Client::CreateReader),
+       InstanceMethod("close", &Client::Close)});
 
   constructor = Napi::Persistent(func);
   constructor.SuppressDestruct();
@@ -151,6 +153,10 @@ Napi::Value Client::Subscribe(const Napi::CallbackInfo &info) {
   return Consumer::NewInstance(info, this->cClient);
 }
 
+Napi::Value Client::CreateReader(const Napi::CallbackInfo &info) {
+  return Reader::NewInstance(info, this->cClient);
+}
+
 class ClientCloseWorker : public Napi::AsyncWorker {
  public:
   ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient)
diff --git a/src/Client.h b/src/Client.h
index 63c66f2..0def389 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -35,6 +35,7 @@ class Client : public Napi::ObjectWrap<Client> {
 
   Napi::Value CreateProducer(const Napi::CallbackInfo &info);
   Napi::Value Subscribe(const Napi::CallbackInfo &info);
+  Napi::Value CreateReader(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
 };
 
diff --git a/src/Reader.cc b/src/Reader.cc
new file mode 100644
index 0000000..8c2b859
--- /dev/null
+++ b/src/Reader.cc
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Message.h"
+#include "Reader.h"
+#include "ReaderConfig.h"
+#include <pulsar/c/result.h>
+#include <pulsar/c/reader.h>
+
+Napi::FunctionReference Reader::constructor;
+
+void Reader::Init(Napi::Env env, Napi::Object exports) {
+  Napi::HandleScope scope(env);
+
+  Napi::Function func = DefineClass(env, "Reader",
+                                    {
+                                        InstanceMethod("readNext", &Reader::ReadNext),
+                                        InstanceMethod("hasNext", &Reader::HasNext),
+                                        InstanceMethod("close", &Reader::Close),
+                                    });
+
+  constructor = Napi::Persistent(func);
+  constructor.SuppressDestruct();
+}
+
+void Reader::SetCReader(pulsar_reader_t *cReader) { this->cReader = cReader; }
+
+Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info) {}
+
+class ReaderNewInstanceWorker : public Napi::AsyncWorker {
+ public:
+  ReaderNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
+                          ReaderConfig *readerConfig)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cClient(cClient),
+        readerConfig(readerConfig) {}
+  ~ReaderNewInstanceWorker() {}
+  void Execute() {
+    const std::string &topic = this->readerConfig->GetTopic();
+    if (topic.empty()) {
+      SetError(std::string("Topic is required and must be specified as a string when creating reader"));
+      return;
+    }
+    if (this->readerConfig->GetCStartMessageId() == nullptr) {
+      SetError(std::string(
+          "StartMessageId is required and must be specified as a MessageId object when creating reader"));
+      return;
+    }
+
+    pulsar_result result =
+        pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
+                                    this->readerConfig->GetCReaderConfig(), &(this->cReader));
+    delete this->readerConfig;
+    if (result != pulsar_result_Ok) {
+      SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
+      return;
+    }
+  }
+  void OnOK() {
+    Napi::Object obj = Reader::constructor.New({});
+    Reader *reader = Reader::Unwrap(obj);
+    reader->SetCReader(this->cReader);
+    this->deferred.Resolve(obj);
+  }
+  void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_client_t *cClient;
+  ReaderConfig *readerConfig;
+  pulsar_reader_t *cReader;
+};
+
+Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  Napi::Object config = info[0].As<Napi::Object>();
+  ReaderConfig *readerConfig = new ReaderConfig(config);
+  ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig);
+  wk->Queue();
+  return deferred.Promise();
+}
+
+class ReaderReadNextWorker : public Napi::AsyncWorker {
+ public:
+  ReaderReadNextWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader,
+                       int64_t timeout = -1)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cReader(cReader),
+        timeout(timeout) {}
+  ~ReaderReadNextWorker() {}
+  void Execute() {
+    pulsar_result result;
+    if (timeout > 0) {
+      result = pulsar_reader_read_next_with_timeout(this->cReader, &(this->cMessage), timeout);
+    } else {
+      result = pulsar_reader_read_next(this->cReader, &(this->cMessage));
+    }
+    if (result != pulsar_result_Ok) {
+      SetError(std::string("Failed to received message ") + pulsar_result_str(result));
+    }
+  }
+  void OnOK() {
+    Napi::Object obj = Message::NewInstance({}, this->cMessage);
+    this->deferred.Resolve(obj);
+  }
+  void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_reader_t *cReader;
+  pulsar_message_t *cMessage;
+  int64_t timeout;
+};
+
+Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) {
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  if (info[0].IsUndefined()) {
+    ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader);
+    wk->Queue();
+  } else {
+    Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
+    ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader, timeout.Int64Value());
+    wk->Queue();
+  }
+  return deferred.Promise();
+}
+
+Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) {
+  int value = 0;
+  pulsar_result result = pulsar_reader_has_message_available(this->cReader, &value);
+  if (result != pulsar_result_Ok || value != 1) {
+    return Napi::Boolean::New(info.Env(), false);
+  } else {
+    return Napi::Boolean::New(info.Env(), true);
+  }
+}
+
+class ReaderCloseWorker : public Napi::AsyncWorker {
+ public:
+  ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cReader(cReader) {}
+  ~ReaderCloseWorker() {}
+  void Execute() {
+    pulsar_result result = pulsar_reader_close(this->cReader);
+    if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+  }
+  void OnOK() { this->deferred.Resolve(Env().Null()); }
+  void OnError(const Napi::Error &e) {
+    this->deferred.Reject(
+        Napi::Error::New(Env(), std::string("Failed to close reader: ") + e.Message()).Value());
+  }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_reader_t *cReader;
+};
+
+Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader);
+  wk->Queue();
+  return deferred.Promise();
+}
+
+Reader::~Reader() { pulsar_reader_free(this->cReader); }
diff --git a/src/Client.h b/src/Reader.h
similarity index 67%
copy from src/Client.h
copy to src/Reader.h
index 63c66f2..787b732 100644
--- a/src/Client.h
+++ b/src/Reader.h
@@ -17,24 +17,26 @@
  * under the License.
  */
 
-#ifndef CLIENT_H
-#define CLIENT_H
+#ifndef READER_H
+#define READER_H
 
 #include <napi.h>
 #include <pulsar/c/client.h>
 
-class Client : public Napi::ObjectWrap<Client> {
+class Reader : public Napi::ObjectWrap<Reader> {
  public:
-  static Napi::Object Init(Napi::Env env, Napi::Object exports);
-  Client(const Napi::CallbackInfo &info);
-  ~Client();
+  static void Init(Napi::Env env, Napi::Object exports);
+  static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
+  static Napi::FunctionReference constructor;
+  Reader(const Napi::CallbackInfo &info);
+  ~Reader();
+  void SetCReader(pulsar_reader_t *cReader);
 
  private:
-  static Napi::FunctionReference constructor;
-  pulsar_client_t *cClient;
+  pulsar_reader_t *cReader;
 
-  Napi::Value CreateProducer(const Napi::CallbackInfo &info);
-  Napi::Value Subscribe(const Napi::CallbackInfo &info);
+  Napi::Value ReadNext(const Napi::CallbackInfo &info);
+  Napi::Value HasNext(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
 };
 
diff --git a/src/ReaderConfig.cc b/src/ReaderConfig.cc
new file mode 100644
index 0000000..7e15807
--- /dev/null
+++ b/src/ReaderConfig.cc
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ReaderConfig.h"
+#include "MessageId.h"
+#include <map>
+
+static const std::string CFG_TOPIC = "topic";
+static const std::string CFG_START_MESSAGE_ID = "startMessageId";
+static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
+static const std::string CFG_READER_NAME = "readerName";
+static const std::string CFG_SUBSCRIPTION_ROLE_PREFIX = "subscriptionRolePrefix";
+
+ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStartMessageId(NULL) {
+  this->cReaderConfig = pulsar_reader_configuration_create();
+
+  if (readerConfig.Has(CFG_TOPIC) && readerConfig.Get(CFG_TOPIC).IsString()) {
+    this->topic = readerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
+  }
+  if (readerConfig.Has(CFG_START_MESSAGE_ID) && readerConfig.Get(CFG_START_MESSAGE_ID).IsObject()) {
+    Napi::Object objMessageId = readerConfig.Get(CFG_START_MESSAGE_ID).ToObject();
+    MessageId *msgId = MessageId::Unwrap(objMessageId);
+    this->cStartMessageId = msgId->GetCMessageId();
+  }
+
+  if (readerConfig.Has(CFG_RECV_QUEUE) && readerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
+    int32_t receiverQueueSize = readerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value();
+    if (receiverQueueSize >= 0) {
+      pulsar_reader_configuration_set_receiver_queue_size(this->cReaderConfig, receiverQueueSize);
+    }
+  }
+
+  if (readerConfig.Has(CFG_READER_NAME) && readerConfig.Get(CFG_READER_NAME).IsString()) {
+    std::string readerName = readerConfig.Get(CFG_READER_NAME).ToString().Utf8Value();
+    if (!readerName.empty())
+      pulsar_reader_configuration_set_reader_name(this->cReaderConfig, readerName.c_str());
+  }
+
+  if (readerConfig.Has(CFG_SUBSCRIPTION_ROLE_PREFIX) &&
+      readerConfig.Get(CFG_SUBSCRIPTION_ROLE_PREFIX).IsString()) {
+    std::string subscriptionRolePrefix =
+        readerConfig.Get(CFG_SUBSCRIPTION_ROLE_PREFIX).ToString().Utf8Value();
+    if (!subscriptionRolePrefix.empty())
+      pulsar_reader_configuration_set_reader_name(this->cReaderConfig, subscriptionRolePrefix.c_str());
+  }
+}
+
+ReaderConfig::~ReaderConfig() { pulsar_reader_configuration_free(this->cReaderConfig); }
+
+pulsar_reader_configuration_t *ReaderConfig::GetCReaderConfig() { return this->cReaderConfig; }
+
+std::string ReaderConfig::GetTopic() { return this->topic; }
+
+pulsar_message_id_t *ReaderConfig::GetCStartMessageId() { return this->cStartMessageId; }
diff --git a/src/Client.h b/src/ReaderConfig.h
similarity index 64%
copy from src/Client.h
copy to src/ReaderConfig.h
index 63c66f2..1983459 100644
--- a/src/Client.h
+++ b/src/ReaderConfig.h
@@ -17,25 +17,26 @@
  * under the License.
  */
 
-#ifndef CLIENT_H
-#define CLIENT_H
+#ifndef READER_CONFIG_H
+#define READER_CONFIG_H
 
 #include <napi.h>
-#include <pulsar/c/client.h>
+#include <pulsar/c/reader.h>
+#include <pulsar/c/reader_configuration.h>
+#include <pulsar/c/message_id.h>
 
-class Client : public Napi::ObjectWrap<Client> {
+class ReaderConfig {
  public:
-  static Napi::Object Init(Napi::Env env, Napi::Object exports);
-  Client(const Napi::CallbackInfo &info);
-  ~Client();
+  ReaderConfig(const Napi::Object &readerConfig);
+  ~ReaderConfig();
+  pulsar_reader_configuration_t *GetCReaderConfig();
+  pulsar_message_id_t *GetCStartMessageId();
+  std::string GetTopic();
 
  private:
-  static Napi::FunctionReference constructor;
-  pulsar_client_t *cClient;
-
-  Napi::Value CreateProducer(const Napi::CallbackInfo &info);
-  Napi::Value Subscribe(const Napi::CallbackInfo &info);
-  Napi::Value Close(const Napi::CallbackInfo &info);
+  pulsar_reader_configuration_t *cReaderConfig;
+  pulsar_message_id_t *cStartMessageId;
+  std::string topic;
 };
 
 #endif
diff --git a/src/addon.cc b/src/addon.cc
index 050ae12..fa26ae0 100644
--- a/src/addon.cc
+++ b/src/addon.cc
@@ -23,6 +23,7 @@
 #include "Producer.h"
 #include "Consumer.h"
 #include "Client.h"
+#include "Reader.h"
 #include <napi.h>
 
 Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
@@ -31,6 +32,7 @@ Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
   Authentication::Init(env, exports);
   Producer::Init(env, exports);
   Consumer::Init(env, exports);
+  Reader::Init(env, exports);
   return Client::Init(env, exports);
 }
 
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index ae24d86..9f15cb7 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -23,11 +23,12 @@ const Pulsar = require('../index.js');
 
 (() => {
   describe('End To End', () => {
-    const client = new Pulsar.Client({
-      serviceUrl: 'pulsar://localhost:6650',
-      operationTimeoutSeconds: 30,
-    });
     test('Produce/Consume', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
       const producer = await client.createProducer({
         topic: 'persistent://public/default/test-end-to-end',
         sendTimeoutMs: 30000,
@@ -63,9 +64,15 @@ const Pulsar = require('../index.js');
 
       await producer.close();
       await consumer.close();
+      await client.close();
     });
 
     test('acknowledgeCumulative', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
       const producer = await client.createProducer({
         topic: 'persistent://public/default/acknowledgeCumulative',
         sendTimeoutMs: 30000,
@@ -103,5 +110,51 @@ const Pulsar = require('../index.js');
       await consumer.close();
       await client.close();
     });
+
+    test('Produce/Read', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+      expect(client).not.toBeNull();
+
+      const producer = await client.createProducer({
+        topic: 'persistent://public/default/test-end-to-end',
+        sendTimeoutMs: 30000,
+        batchingEnabled: true,
+      });
+      expect(producer).not.toBeNull();
+
+      const reader = await client.createReader({
+        topic: 'persistent://public/default/test-end-to-end',
+        startMessageId: Pulsar.MessageId.latest(),
+      });
+      expect(reader).not.toBeNull();
+
+      const messages = [];
+      for (let i = 0; i < 10; i += 1) {
+        const msg = `my-message-${i}`;
+        producer.send({
+          data: Buffer.from(msg),
+        });
+        messages.push(msg);
+      }
+      await producer.flush();
+
+      expect(reader.hasNext()).toBe(true);
+
+      const results = [];
+      for (let i = 0; i < 10; i += 1) {
+        const msg = await reader.readNext();
+        results.push(msg.getData().toString());
+      }
+      expect(lodash.difference(messages, results)).toEqual([]);
+
+      expect(reader.hasNext()).toBe(false);
+
+      await producer.close();
+      await reader.close();
+      await client.close();
+    });
   });
 })();
diff --git a/tests/reader.test.js b/tests/reader.test.js
new file mode 100644
index 0000000..9c87212
--- /dev/null
+++ b/tests/reader.test.js
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const Pulsar = require('../index.js');
+
+(() => {
+  describe('Reader', () => {
+    describe('Create', () => {
+      test('No Topic', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+        await expect(client.createReader({
+          startMessageId: Pulsar.MessageId.earliest(),
+        })).rejects.toThrow('Topic is required and must be specified as a string when creating reader');
+        await client.close();
+      });
+
+      test('No Topic', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+        await expect(client.createReader({
+          topic: 0,
+          startMessageId: Pulsar.MessageId.earliest(),
+        })).rejects.toThrow('Topic is required and must be specified as a string when creating reader');
+        await client.close();
+      });
+
+      test('No StartMessageId', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+        await expect(client.createReader({
+          topic: 'persistent://public/default/topic',
+        })).rejects.toThrow('StartMessageId is required and must be specified as a MessageId object when creating reader');
+        await client.close();
+      });
+
+      test('Not StartMessageId as MessageId', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+        await expect(client.createReader({
+          topic: 'persistent://public/default/topic',
+          startMessageId: 'not MessageId',
+        })).rejects.toThrow('StartMessageId is required and must be specified as a MessageId object when creating reader');
+        await client.close();
+      });
+    });
+  });
+})();