You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/09/18 19:39:14 UTC
[incubator-openwhisk-package-kafka] branch master updated: Message
Hub IAM Integration (#284)
This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 2dc039b Message Hub IAM Integration (#284)
2dc039b is described below
commit 2dc039bff25f662748abf2ef99efa409c29721fa
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Tue Sep 18 14:39:09 2018 -0500
Message Hub IAM Integration (#284)
* add authHandler for managing iam token
* ensure iam url is added to validated params in web action
* validated params
* updates to iam integration
* package node_modules in web zip
* remove logging import from auth handler
---
.gitignore | 3 ++
action/kafkaFeedWeb.js | 19 ++++---
action/lib/common.js | 11 ++--
action/messageHubFeed.js | 8 +++
action/messageHubFeedWeb.js | 34 +++++++++---
action/messageHubFeedWeb_package.json | 5 +-
installCatalog.sh | 3 +-
provider/authHandler.py | 99 +++++++++++++++++++++++++++++++++++
provider/consumer.py | 10 +++-
9 files changed, 170 insertions(+), 22 deletions(-)
diff --git a/.gitignore b/.gitignore
index bc22306..15822ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,9 @@ launchConfigurations/
action/*.zip
action/package.json
tests/build
+action/node_modules/
+action/package-lock.json
+package-lock.json
.idea/
out/
\ No newline at end of file
diff --git a/action/kafkaFeedWeb.js b/action/kafkaFeedWeb.js
index 7d1350b..2c8a386 100644
--- a/action/kafkaFeedWeb.js
+++ b/action/kafkaFeedWeb.js
@@ -30,7 +30,7 @@ function main(params) {
// do these in parallel!
return Promise.all([
db.ensureTriggerIsUnique(validatedParams.triggerName),
- common.verifyTriggerAuth(validatedParams.triggerURL)
+ verifyTriggerAuth(validatedParams.triggerURL, params.authKey)
]);
})
.then(() => {
@@ -63,9 +63,9 @@ function main(params) {
resolve(common.webResponse(statusCode, body));
});
} else if (params.__ow_method === "get") {
- const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+ const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
- return common.verifyTriggerAuth(triggerURL)
+ return verifyTriggerAuth(triggerURL, params.authKey)
.then(() => {
db = new Database(params.DB_URL, params.DB_NAME);
return db.getTrigger(params.triggerName);
@@ -93,9 +93,9 @@ function main(params) {
resolve(common.webResponse(500, error.toString()));
});
} else if (params.__ow_method === "put") {
- const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+ const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
- return common.verifyTriggerAuth(triggerURL)
+ return verifyTriggerAuth(triggerURL, params.authKey)
.then(() => {
db = new Database(params.DB_URL, params.DB_NAME);
return db.getTrigger(params.triggerName);
@@ -123,9 +123,9 @@ function main(params) {
resolve(common.webResponse(statusCode, body));
});
} else if (params.__ow_method === "delete") {
- const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+ const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
- return common.verifyTriggerAuth(triggerURL)
+ return verifyTriggerAuth(triggerURL, params.authKey)
.then(() => {
db = new Database(params.DB_URL, params.DB_NAME);
return db.deleteTrigger(params.triggerName);
@@ -178,4 +178,9 @@ function validateParameters(rawParams) {
return promise;
}
+function verifyTriggerAuth(triggerURL, apiKey) {
+    const [user, pass] = apiKey.split(':'); // only the first two ':'-separated parts of the key are used
+    return common.verifyTriggerAuth(triggerURL, { user, pass }); // credentials travel in the request's auth option, not in the URL
+}
+
exports.main = main;
diff --git a/action/lib/common.js b/action/lib/common.js
index a24f9b5..5220c90 100644
--- a/action/lib/common.js
+++ b/action/lib/common.js
@@ -9,19 +9,19 @@ function triggerComponents(triggerName) {
};
}
-function getTriggerURL(authKey, endpoint, triggerName) {
+function getTriggerURL(endpoint, triggerName) {
var massagedAPIHost = endpoint.replace(/https?:\/\/(.*)/, "$1");
var components = triggerComponents(triggerName);
var namespace = components.namespace;
var trigger = components.triggerName;
- var url = `https://${authKey}@${massagedAPIHost}/api/v1/namespaces/${encodeURIComponent(namespace)}/triggers/${encodeURIComponent(trigger)}`;
+ var url = `https://${massagedAPIHost}/api/v1/namespaces/${encodeURIComponent(namespace)}/triggers/${encodeURIComponent(trigger)}`;
return url;
}
-function verifyTriggerAuth(triggerURL) {
+function verifyTriggerAuth(triggerURL, auth) {
var options = {
method: 'GET',
url: triggerURL,
@@ -29,7 +29,8 @@ function verifyTriggerAuth(triggerURL) {
headers: {
'Content-Type': 'application/json',
'User-Agent': 'whisk'
- }
+ },
+ auth: auth
};
return request(options)
@@ -205,7 +206,7 @@ function performCommonParameterValidation(rawParams) {
// now that everything else is valid, let's add these
validatedParams.isBinaryKey = getBooleanFromArgs(rawParams, 'isBinaryKey');
validatedParams.authKey = rawParams.authKey;
- validatedParams.triggerURL = getTriggerURL(validatedParams.authKey, rawParams.endpoint, rawParams.triggerName);
+ validatedParams.triggerURL = getTriggerURL(rawParams.endpoint, rawParams.triggerName);
const uuid = require('uuid');
validatedParams.uuid = uuid.v4();
diff --git a/action/messageHubFeed.js b/action/messageHubFeed.js
index 66b3124..4b4638e 100644
--- a/action/messageHubFeed.js
+++ b/action/messageHubFeed.js
@@ -18,6 +18,14 @@ function main(params) {
var massagedParams = common.massageParamsForWeb(params);
massagedParams.triggerName = common.getTriggerFQN(params.triggerName);
+ var iamKey = process.env.__OW_IAM_NAMESPACE_API_KEY;
+ massagedParams.authKey = iamKey || process.env.__OW_API_KEY;
+ massagedParams.isIamKey = iamKey != undefined;
+
+ if (massagedParams.isIamKey) {
+ massagedParams.iamUrl = process.env.__OW_IAM_API_URL;
+ }
+
if (params.lifecycleEvent === 'CREATE') {
return common.createTrigger(endpoint, massagedParams, webActionName);
} else if (params.lifecycleEvent === 'READ') {
diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
index 0ad270b..1f64515 100644
--- a/action/messageHubFeedWeb.js
+++ b/action/messageHubFeedWeb.js
@@ -1,5 +1,6 @@
const common = require('./lib/common');
const Database = require('./lib/Database');
+const itm = require('@ibm-functions/iam-token-manager');
var moment = require('moment');
/**
@@ -32,7 +33,7 @@ function main(params) {
// do these in parallel!
return Promise.all([
db.ensureTriggerIsUnique(validatedParams.triggerName),
- common.verifyTriggerAuth(validatedParams.triggerURL),
+ verifyTriggerAuth(validatedParams.triggerURL, params.authKey, params.isIamKey, params.iamUrl),
checkMessageHubCredentials(validatedParams)
]);
})
@@ -66,9 +67,9 @@ function main(params) {
resolve(common.webResponse(statusCode, body));
});
} else if (params.__ow_method === "get") {
- const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+ const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
- return common.verifyTriggerAuth(triggerURL)
+ return verifyTriggerAuth(triggerURL, params.authKey, params.isIamKey, params.iamUrl)
.then(() => {
db = new Database(params.DB_URL, params.DB_NAME);
return db.getTrigger(params.triggerName);
@@ -99,9 +100,9 @@ function main(params) {
resolve(common.webResponse(500, error.toString()));
});
} else if (params.__ow_method === "put") {
- const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+ const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
- return common.verifyTriggerAuth(triggerURL)
+ return verifyTriggerAuth(triggerURL, params.authKey, params.isIamKey, params.iamUrl)
.then(() => {
db = new Database(params.DB_URL, params.DB_NAME);
return db.getTrigger(params.triggerName);
@@ -129,9 +130,9 @@ function main(params) {
resolve(common.webResponse(statusCode, body));
});
} else if (params.__ow_method === "delete") {
- const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+ const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
- return common.verifyTriggerAuth(triggerURL)
+ return verifyTriggerAuth(triggerURL, params.authKey, params.isIamKey, params.iamUrl)
.then(() => {
db = new Database(params.DB_URL, params.DB_NAME);
return db.deleteTrigger(params.triggerName);
@@ -162,6 +163,16 @@ function validateParameters(rawParams) {
return;
} else {
validatedParams = commonValidationResult.validatedParams;
+
+ if (rawParams.isIamKey != undefined) {
+ validatedParams.isIamKey = rawParams.isIamKey;
+ } else {
+ validatedParams.isIamKey = false
+ }
+
+ if (rawParams.iamUrl) {
+ validatedParams.iamUrl = rawParams.iamUrl;
+ }
}
// kafka_brokers_sasl
@@ -243,4 +254,13 @@ function checkMessageHubCredentials(params) {
});
}
+function verifyTriggerAuth(triggerURL, apiKey, isIamKey, iamUrl) {
+    if (!isIamKey) {
+        // non-IAM keys are split on ':' and passed on as user/pass credentials
+        const [user, pass] = apiKey.split(':');
+        return common.verifyTriggerAuth(triggerURL, { user, pass });
+    }
+    return new itm({ iamApikey: apiKey, iamUrl }).getToken().then((token) => common.verifyTriggerAuth(triggerURL, { bearer: token })); // IAM keys: obtain a token first, then pass it on as a bearer credential
+}
+
exports.main = main;
diff --git a/action/messageHubFeedWeb_package.json b/action/messageHubFeedWeb_package.json
index 7735d19..00b159d 100644
--- a/action/messageHubFeedWeb_package.json
+++ b/action/messageHubFeedWeb_package.json
@@ -1,5 +1,8 @@
{
"name": "messageHubFeedWeb",
"version": "1.0.0",
- "main": "messageHubFeedWeb.js"
+ "main": "messageHubFeedWeb.js",
+ "dependencies": {
+ "@ibm-functions/iam-token-manager": "^1.0.0"
+ }
}
diff --git a/installCatalog.sh b/installCatalog.sh
index 3976dc5..855e6be 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -93,7 +93,8 @@ then
fi
cp -f messageHubFeedWeb_package.json package.json
-zip -r messageHubFeedWeb.zip lib package.json messageHubFeedWeb.js
+npm install
+zip -r messageHubFeedWeb.zip lib package.json messageHubFeedWeb.js node_modules
cd $OLD_PATH
diff --git a/provider/authHandler.py b/provider/authHandler.py
new file mode 100644
index 0000000..0325161
--- /dev/null
+++ b/provider/authHandler.py
@@ -0,0 +1,99 @@
+"""IAMAuth class.
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+"""
+
+import requests
+import time
+
+from requests.auth import AuthBase
+
+
+class IAMAuth(AuthBase):
+    """requests auth hook: sets a Bearer Authorization header, requesting or refreshing the token as needed."""
+    def __init__(self, authKey, endpoint):
+        self.authKey = authKey
+        self.endpoint = endpoint
+        self.tokenInfo = {}  # last token response (parsed JSON); empty until the first request
+
+    def __call__(self, r):
+        r.headers['Authorization'] = 'Bearer {}'.format(self.__getToken())
+        return r
+
+    # Request a new token if none is cached or the refresh window has lapsed; refresh one that is close to expiry; otherwise reuse it.
+    def __getToken(self):
+        if 'expires_in' not in self.tokenInfo or self.__isRefreshTokenExpired():
+            self.tokenInfo = self.__requestToken()
+            return self.tokenInfo['access_token']
+        elif self.__isTokenExpired():
+            self.tokenInfo = self.__refreshToken()
+            return self.tokenInfo['access_token']
+        else:
+            return self.tokenInfo['access_token']
+
+    # Post an apikey grant built from self.authKey and return the parsed response.
+    def __requestToken(self):
+        headers = {
+            'Content-type': 'application/x-www-form-urlencoded',
+            'Authorization': 'Basic Yng6Yng='  # base64 of "bx:bx"
+        }
+        payload = {
+            'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
+            'apikey': self.authKey
+        }
+
+        return self.__sendRequest(payload, headers)
+
+    def __refreshToken(self):  # post a refresh_token grant using the cached refresh_token
+        headers = {
+            'Content-type': 'application/x-www-form-urlencoded',
+            'Authorization': 'Basic Yng6Yng='  # base64 of "bx:bx"
+        }
+        payload = {
+            'grant_type': 'refresh_token',
+            'refresh_token': self.tokenInfo['refresh_token']
+        }
+
+        return self.__sendRequest(payload, headers)
+
+    # True once the current time is past `expiration` minus 20% of `expires_in`, or when either field is missing.
+    def __isTokenExpired(self):
+        if 'expires_in' not in self.tokenInfo or 'expiration' not in self.tokenInfo:
+            return True
+
+        fractionOfTtl = 0.8
+        timeToLive = self.tokenInfo['expires_in']
+        expireTime = self.tokenInfo['expiration']
+        currentTime = int(time.time())
+        refreshTime = expireTime - (timeToLive * (1.0 - fractionOfTtl))
+
+        return refreshTime < currentTime
+
+    def __isRefreshTokenExpired(self):  # True when more than seven days have passed since `expiration`
+        if 'expiration' not in self.tokenInfo:
+            return True  # no expiry recorded, so force a fresh token request
+
+        sevenDays = 7 * 24 * 3600
+        currentTime = int(time.time())
+        newTokenTime = self.tokenInfo['expiration'] + sevenDays
+
+        return newTokenTime < currentTime
+
+    def __sendRequest(self, payload, headers):
+        response = requests.post(self.endpoint, data=payload, headers=headers)  # NOTE(review): no timeout and no status check here; confirm that is intended
+        return response.json()
diff --git a/provider/consumer.py b/provider/consumer.py
index 4bf62cf..1b6bf6f 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -32,6 +32,8 @@ from datetime import datetime
from datetimeutils import secondsSince
from multiprocessing import Process, Manager
from urlparse import urlparse
+from authHandler import IAMAuth
+from requests.auth import HTTPBasicAuth
local_dev = os.getenv('LOCAL_DEV', 'False')
payload_limit = int(os.getenv('PAYLOAD_LIMIT', 900000))
@@ -145,6 +147,12 @@ class ConsumerProcess (Process):
self.username = params["username"]
self.password = params["password"]
+ if 'isIamKey' in params and params['isIamKey'] == True:
+ self.authHandler = IAMAuth(params['authKey'], params['iamUrl'])
+ else:
+ auth = params['authKey'].split(':')
+ self.authHandler = HTTPBasicAuth(auth[0], auth[1])
+
# handle the case where there may be existing triggers that do not
# have the isJSONData field set
if "isJSONData" in params:
@@ -362,7 +370,7 @@ class ConsumerProcess (Process):
while retry:
try:
- response = requests.post(self.triggerURL, json=payload, timeout=10.0, verify=check_ssl)
+ response = requests.post(self.triggerURL, json=payload, auth=self.authHandler, timeout=10.0, verify=check_ssl)
status_code = response.status_code
logging.info("[{}] Repsonse status code {}".format(self.trigger, status_code))