You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2019/06/12 02:43:58 UTC

[pulsar-client-node] 10/29: Support authentication

This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit 0dccd84811597258a559f137cf45593f7734f08f
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Mon Mar 25 16:21:16 2019 +0900

    Support authentication
---
 binding.gyp                               |  3 +-
 index.js => examples/consumer_tls_auth.js | 36 +++++++++---
 index.js => examples/producer_tls_auth.js | 38 ++++++++++---
 index.js                                  |  6 ++
 src/Authentication.cc                     | 95 +++++++++++++++++++++++++++++++
 src/{addon.cc => Authentication.h}        | 29 +++++-----
 index.js => src/AuthenticationAthenz.js   | 13 +++--
 index.js => src/AuthenticationTls.js      | 12 ++--
 index.js => src/AuthenticationToken.js    | 12 ++--
 src/Client.cc                             | 11 ++++
 src/addon.cc                              |  6 +-
 11 files changed, 213 insertions(+), 48 deletions(-)

diff --git a/binding.gyp b/binding.gyp
index 88a6c70..cfcf8b7 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -29,12 +29,13 @@
       "sources": [
         "src/addon.cc",
         "src/Message.cc",
+        "src/MessageId.cc",
+        "src/Authentication.cc",
         "src/Client.cc",
         "src/Producer.cc",
         "src/ProducerConfig.cc",
         "src/Consumer.cc",
         "src/ConsumerConfig.cc",
-        "src/MessageId.cc",
       ],
       "libraries": ["-lpulsar"],
     }
diff --git a/index.js b/examples/consumer_tls_auth.js
similarity index 51%
copy from index.js
copy to examples/consumer_tls_auth.js
index 03f3a08..e71b251 100644
--- a/index.js
+++ b/examples/consumer_tls_auth.js
@@ -17,12 +17,34 @@
  * under the License.
  */
 
-const PulsarBinding = require('bindings')('Pulsar');
+const Pulsar = require('../index.js');
 
-const Pulsar = {
-  Client: PulsarBinding.Client,
-  Message: PulsarBinding.Message,
-  MessageId: PulsarBinding.MessageId,
-};
+(async () => {
+  const auth = new Pulsar.AuthenticationTls({
+    certificatePath: '/path/to/client.crt',
+    privateKeyPath: '/path/to/client.key',
+  });
 
-module.exports = Pulsar;
+  // Create a client
+  const client = new Pulsar.Client({
+    serviceUrl: 'pulsar+ssl://localhost:6651',
+    authentication: auth,
+    tlsTrustCertsFilePath: '/path/to/server.crt',
+  });
+
+  // Create a consumer
+  const consumer = await client.subscribe({
+    topic: 'persistent://public/default/my-topic',
+    subscription: 'sub1',
+  });
+
+  // Receive messages
+  for (let i = 0; i < 10; i += 1) {
+    const msg = await consumer.receive();
+    console.log(msg.getData().toString());
+    consumer.acknowledge(msg);
+  }
+
+  await consumer.close();
+  await client.close();
+})();
diff --git a/index.js b/examples/producer_tls_auth.js
similarity index 50%
copy from index.js
copy to examples/producer_tls_auth.js
index 03f3a08..df4e33e 100644
--- a/index.js
+++ b/examples/producer_tls_auth.js
@@ -17,12 +17,36 @@
  * under the License.
  */
 
-const PulsarBinding = require('bindings')('Pulsar');
+const Pulsar = require('../index.js');
 
-const Pulsar = {
-  Client: PulsarBinding.Client,
-  Message: PulsarBinding.Message,
-  MessageId: PulsarBinding.MessageId,
-};
+(async () => {
+  const auth = new Pulsar.AuthenticationTls({
+    certificatePath: '/path/to/client.crt',
+    privateKeyPath: '/path/to/client.key',
+  });
 
-module.exports = Pulsar;
+  // Create a client
+  const client = new Pulsar.Client({
+    serviceUrl: 'pulsar+ssl://localhost:6651',
+    authentication: auth,
+    tlsTrustCertsFilePath: '/path/to/server.crt',
+  });
+
+  // Create a producer
+  const producer = await client.createProducer({
+    topic: 'persistent://public/default/my-topic',
+  });
+
+  // Send messages
+  for (let i = 0; i < 10; i += 1) {
+    const msg = `my-message-${i}`;
+    producer.send({
+      data: Buffer.from(msg),
+    });
+    console.log(`Sent message: ${msg}`);
+  }
+  await producer.flush();
+
+  await producer.close();
+  await client.close();
+})();
diff --git a/index.js b/index.js
index 03f3a08..97c83c6 100644
--- a/index.js
+++ b/index.js
@@ -18,11 +18,17 @@
  */
 
 const PulsarBinding = require('bindings')('Pulsar');
+const AuthenticationTls = require('./src/AuthenticationTls.js');
+const AuthenticationAthenz = require('./src/AuthenticationAthenz.js');
+const AuthenticationToken = require('./src/AuthenticationToken.js');
 
 const Pulsar = {
   Client: PulsarBinding.Client,
   Message: PulsarBinding.Message,
   MessageId: PulsarBinding.MessageId,
+  AuthenticationTls,
+  AuthenticationAthenz,
+  AuthenticationToken,
 };
 
 module.exports = Pulsar;
diff --git a/src/Authentication.cc b/src/Authentication.cc
new file mode 100644
index 0000000..226fd0a
--- /dev/null
+++ b/src/Authentication.cc
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Authentication.h"
+
+static const std::string PARAM_TLS_CERT = "certificatePath";
+static const std::string PARAM_TLS_KEY = "privateKeyPath";
+static const std::string PARAM_TOKEN = "token";
+
+Napi::FunctionReference Authentication::constructor;
+
+Napi::Object Authentication::Init(Napi::Env env, Napi::Object exports) {
+  Napi::HandleScope scope(env);
+
+  Napi::Function func = DefineClass(env, "Authentication", {});
+
+  constructor = Napi::Persistent(func);
+  constructor.SuppressDestruct();
+
+  exports.Set("Authentication", func);
+  return exports;
+}
+
+Authentication::Authentication(const Napi::CallbackInfo &info)
+    : Napi::ObjectWrap<Authentication>(info), cAuthentication(nullptr) {
+  Napi::Env env = info.Env();
+  Napi::HandleScope scope(env);
+
+  if (info.Length() < 1 || !info[0].IsString() || info[0].ToString().Utf8Value().empty()) {
+    Napi::Error::New(env, "Authentication method is not specified").ThrowAsJavaScriptException();
+    return;
+  }
+
+  std::string authMethod = info[0].ToString().Utf8Value();
+
+  if (authMethod == "tls" || authMethod == "token") {
+    if (info.Length() < 2 || !info[1].IsObject()) {
+      Napi::Error::New(env, "Authentication parameter must be a object").ThrowAsJavaScriptException();
+      return;
+    }
+
+    Napi::Object obj = info[1].ToObject();
+
+    if (authMethod == "tls") {
+      if (!obj.Has(PARAM_TLS_CERT) || !obj.Get(PARAM_TLS_CERT).IsString() || !obj.Has(PARAM_TLS_KEY) ||
+          !obj.Get(PARAM_TLS_KEY).IsString()) {
+        Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException();
+        return;
+      }
+      this->cAuthentication =
+          pulsar_authentication_tls_create(obj.Get(PARAM_TLS_CERT).ToString().Utf8Value().c_str(),
+                                           obj.Get(PARAM_TLS_KEY).ToString().Utf8Value().c_str());
+    } else if (authMethod == "token") {
+      if (!obj.Has(PARAM_TOKEN) || !obj.Get(PARAM_TOKEN).IsString()) {
+        Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException();
+        return;
+      }
+      this->cAuthentication =
+          pulsar_authentication_token_create(obj.Get(PARAM_TOKEN).ToString().Utf8Value().c_str());
+    }
+  } else if (authMethod == "athenz") {
+    if (info.Length() < 2 || !info[1].IsString()) {
+      Napi::Error::New(env, "Authentication parameter must be a JSON string").ThrowAsJavaScriptException();
+      return;
+    }
+    this->cAuthentication = pulsar_authentication_athenz_create(info[1].ToString().Utf8Value().c_str());
+  } else {
+    Napi::Error::New(env, "Unsupported authentication method").ThrowAsJavaScriptException();
+    return;
+  }
+}
+
+Authentication::~Authentication() {
+  if (this->cAuthentication != nullptr) {
+    pulsar_authentication_free(this->cAuthentication);
+  }
+}
+
+pulsar_authentication_t *Authentication::GetCAuthentication() { return this->cAuthentication; }
diff --git a/src/addon.cc b/src/Authentication.h
similarity index 65%
copy from src/addon.cc
copy to src/Authentication.h
index 9e75d3f..3666bd8 100644
--- a/src/addon.cc
+++ b/src/Authentication.h
@@ -17,19 +17,22 @@
  * under the License.
  */
 
-#include "Client.h"
-#include "Consumer.h"
-#include "Message.h"
-#include "MessageId.h"
-#include "Producer.h"
+#ifndef AUTH_H
+#define AUTH_H
+
 #include <napi.h>
+#include <pulsar/c/authentication.h>
+
+class Authentication : public Napi::ObjectWrap<Authentication> {
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
+  Authentication(const Napi::CallbackInfo &info);
+  ~Authentication();
+  pulsar_authentication_t *GetCAuthentication();
 
-Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
-  Message::Init(env, exports);
-  MessageId::Init(env, exports);
-  Producer::Init(env, exports);
-  Consumer::Init(env, exports);
-  return Client::Init(env, exports);
-}
+ private:
+  static Napi::FunctionReference constructor;
+  pulsar_authentication_t *cAuthentication;
+};
 
-NODE_API_MODULE(NODE_GYP_MODULE_NAME, InitAll)
+#endif
diff --git a/index.js b/src/AuthenticationAthenz.js
similarity index 77%
copy from index.js
copy to src/AuthenticationAthenz.js
index 03f3a08..edbce3b 100644
--- a/index.js
+++ b/src/AuthenticationAthenz.js
@@ -19,10 +19,11 @@
 
 const PulsarBinding = require('bindings')('Pulsar');
 
-const Pulsar = {
-  Client: PulsarBinding.Client,
-  Message: PulsarBinding.Message,
-  MessageId: PulsarBinding.MessageId,
-};
+class AuthenticationAthenz {
+  constructor(params) {
+    const paramsStr = (typeof params === 'object') ? JSON.stringify(params) : params;
+    this.binding = new PulsarBinding.Authentication('athenz', paramsStr);
+  }
+}
 
-module.exports = Pulsar;
+module.exports = AuthenticationAthenz;
diff --git a/index.js b/src/AuthenticationTls.js
similarity index 84%
copy from index.js
copy to src/AuthenticationTls.js
index 03f3a08..f00b579 100644
--- a/index.js
+++ b/src/AuthenticationTls.js
@@ -19,10 +19,10 @@
 
 const PulsarBinding = require('bindings')('Pulsar');
 
-const Pulsar = {
-  Client: PulsarBinding.Client,
-  Message: PulsarBinding.Message,
-  MessageId: PulsarBinding.MessageId,
-};
+class AuthenticationTls {
+  constructor(params) {
+    this.binding = new PulsarBinding.Authentication('tls', params);
+  }
+}
 
-module.exports = Pulsar;
+module.exports = AuthenticationTls;
diff --git a/index.js b/src/AuthenticationToken.js
similarity index 83%
copy from index.js
copy to src/AuthenticationToken.js
index 03f3a08..e40c892 100644
--- a/index.js
+++ b/src/AuthenticationToken.js
@@ -19,10 +19,10 @@
 
 const PulsarBinding = require('bindings')('Pulsar');
 
-const Pulsar = {
-  Client: PulsarBinding.Client,
-  Message: PulsarBinding.Message,
-  MessageId: PulsarBinding.MessageId,
-};
+class AuthenticationToken {
+  constructor(params) {
+    this.binding = new PulsarBinding.Authentication('token', params);
+  }
+}
 
-module.exports = Pulsar;
+module.exports = AuthenticationToken;
diff --git a/src/Client.cc b/src/Client.cc
index 16fb13d..123fc0f 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -20,11 +20,14 @@
 #include "Client.h"
 #include "Consumer.h"
 #include "Producer.h"
+#include "Authentication.h"
 #include <pulsar/c/client.h>
 #include <pulsar/c/client_configuration.h>
 #include <pulsar/c/result.h>
 
 static const std::string CFG_SERVICE_URL = "serviceUrl";
+static const std::string CFG_AUTH = "authentication";
+static const std::string CFG_AUTH_PROP = "binding";
 static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds";
 static const std::string CFG_IO_THREADS = "ioThreads";
 static const std::string CFG_LISTENER_THREADS = "messageListenerThreads";
@@ -68,6 +71,14 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
 
   pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create();
 
+  if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
+    Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
+    if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
+      Authentication *auth = Authentication::Unwrap(obj.Get(CFG_AUTH_PROP).ToObject());
+      pulsar_client_configuration_set_auth(cClientConfig, auth->GetCAuthentication());
+    }
+  }
+
   if (clientConfig.Has(CFG_OP_TIMEOUT) && clientConfig.Get(CFG_OP_TIMEOUT).IsNumber()) {
     int32_t operationTimeoutSeconds = clientConfig.Get(CFG_OP_TIMEOUT).ToNumber().Int32Value();
     if (operationTimeoutSeconds > 0) {
diff --git a/src/addon.cc b/src/addon.cc
index 9e75d3f..050ae12 100644
--- a/src/addon.cc
+++ b/src/addon.cc
@@ -17,16 +17,18 @@
  * under the License.
  */
 
-#include "Client.h"
-#include "Consumer.h"
 #include "Message.h"
 #include "MessageId.h"
+#include "Authentication.h"
 #include "Producer.h"
+#include "Consumer.h"
+#include "Client.h"
 #include <napi.h>
 
 Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
   Message::Init(env, exports);
   MessageId::Init(env, exports);
+  Authentication::Init(env, exports);
   Producer::Init(env, exports);
   Consumer::Init(env, exports);
   return Client::Init(env, exports);