You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2017/10/17 18:48:33 UTC
[incubator-openwhisk-package-kafka] branch master updated: Add
ability to get trigger configuration and status (#217)
This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 3fb980c Add ability to get trigger configuration and status (#217)
3fb980c is described below
commit 3fb980c72c54b33add8bdf2fa6231e08287d7c56
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Tue Oct 17 13:48:32 2017 -0500
Add ability to get trigger configuration and status (#217)
* initial implementation of trigger management, get status and configuration
* add test for get trigger config and status
* add implementation from message hub feed action to kafka feed action
---
action/kafkaFeed.js | 2 +
action/kafkaFeedWeb.js | 29 ++++++++++
action/lib/common.js | 29 +++++++++-
action/messageHubFeed.js | 2 +
action/messageHubFeedWeb.js | 29 ++++++++++
.../system/packages/MessageHubFeedTests.scala | 67 +++++++++++++++++++++-
6 files changed, 153 insertions(+), 5 deletions(-)
diff --git a/action/kafkaFeed.js b/action/kafkaFeed.js
index 3cd42f7..bf55538 100644
--- a/action/kafkaFeed.js
+++ b/action/kafkaFeed.js
@@ -18,6 +18,8 @@ function main(params) {
if (params.lifecycleEvent === 'CREATE') {
return common.createTrigger(endpoint, massagedParams, webActionName);
+ } else if (params.lifecycleEvent === 'READ') {
+ return common.getTrigger(endpoint, massagedParams, webActionName);
} else if (params.lifecycleEvent === 'DELETE') {
return common.deleteTrigger(endpoint, massagedParams, webActionName);
}
diff --git a/action/kafkaFeedWeb.js b/action/kafkaFeedWeb.js
index de39edb..d60ce2f 100644
--- a/action/kafkaFeedWeb.js
+++ b/action/kafkaFeedWeb.js
@@ -61,6 +61,35 @@ function main(params) {
resolve(common.webResponse(statusCode, body));
});
+ } else if (params.__ow_method === "get") {
+ const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+
+ return common.verifyTriggerAuth(triggerURL)
+ .then(() => {
+ db = new Database(params.DB_URL, params.DB_NAME);
+ return db.getTrigger(params.triggerName);
+ })
+ .then((triggerDoc) => {
+ var body = {
+ config: {
+ triggerName: triggerDoc.triggerName,
+ topic: triggerDoc.topic,
+ isJSONData: triggerDoc.isJSONData,
+ isBinaryValue: triggerDoc.isBinaryValue,
+ isBinaryKey: triggerDoc.isBinaryKey,
+ isMessageHub: triggerDoc.isMessageHub,
+ brokers: triggerDoc.brokers,
+ kafka_admin_url: triggerDoc.kafka_admin_url,
+ username: triggerDoc.username,
+ password: triggerDoc.password
+ },
+ status: triggerDoc.status
+ }
+ resolve(common.webResponse(200, body, 'application/json'));
+ })
+ .catch(error => {
+ resolve(common.webResponse(500, error.toString()));
+ });
} else if (params.__ow_method === "delete") {
const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
diff --git a/action/lib/common.js b/action/lib/common.js
index 46a7d32..638f8c2 100644
--- a/action/lib/common.js
+++ b/action/lib/common.js
@@ -130,6 +130,30 @@ function deleteTrigger(endpoint, params, actionName) {
});
}
+function getTrigger(endpoint, params, actionName) {
+ var options = {
+ method: 'GET',
+ url: getWebActionURL(endpoint, actionName),
+ rejectUnauthorized: false,
+ json: true,
+ body: params,
+ headers: {
+ 'Content-Type': 'application/json',
+ 'Accept': 'application/json',
+ 'User-Agent': 'whisk'
+ }
+ };
+
+ return request(options)
+ .then(response => {
+ return response;
+ })
+ .catch(error => {
+ console.log(`Error fetching trigger: ${JSON.stringify(error, null, 2)}`);
+ return Promise.reject(error.response.body);
+ });
+}
+
// perform parameter validation that is common to both feed actions
function performCommonParameterValidation(rawParams) {
var validatedParams = {};
@@ -166,11 +190,11 @@ function performCommonParameterValidation(rawParams) {
return { validatedParams: validatedParams };
}
-function webResponse(code, body) {
+function webResponse(code, body, contentType = 'text/plain') {
return {
statusCode: code,
headers: {
- 'Content-Type': 'text/plain'
+ 'Content-Type': contentType
},
body: body
};
@@ -179,6 +203,7 @@ function webResponse(code, body) {
module.exports = {
'createTrigger': createTrigger,
'deleteTrigger': deleteTrigger,
+ 'getTrigger': getTrigger,
'getBooleanFromArgs': getBooleanFromArgs,
'getTriggerFQN': getTriggerFQN,
'getTriggerURL': getTriggerURL,
diff --git a/action/messageHubFeed.js b/action/messageHubFeed.js
index 818520a..3c2ae7c 100644
--- a/action/messageHubFeed.js
+++ b/action/messageHubFeed.js
@@ -20,6 +20,8 @@ function main(params) {
if (params.lifecycleEvent === 'CREATE') {
return common.createTrigger(endpoint, massagedParams, webActionName);
+ } else if (params.lifecycleEvent === 'READ') {
+ return common.getTrigger(endpoint, massagedParams, webActionName);
} else if (params.lifecycleEvent === 'DELETE') {
return common.deleteTrigger(endpoint, massagedParams, webActionName);
}
diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
index 042096a..b6c5523 100644
--- a/action/messageHubFeedWeb.js
+++ b/action/messageHubFeedWeb.js
@@ -64,6 +64,35 @@ function main(params) {
resolve(common.webResponse(statusCode, body));
});
+ } else if (params.__ow_method === "get") {
+ const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+
+ return common.verifyTriggerAuth(triggerURL)
+ .then(() => {
+ db = new Database(params.DB_URL, params.DB_NAME);
+ return db.getTrigger(params.triggerName);
+ })
+ .then((triggerDoc) => {
+ var body = {
+ config: {
+ triggerName: triggerDoc.triggerName,
+ topic: triggerDoc.topic,
+ isJSONData: triggerDoc.isJSONData,
+ isBinaryValue: triggerDoc.isBinaryValue,
+ isBinaryKey: triggerDoc.isBinaryKey,
+ isMessageHub: triggerDoc.isMessageHub,
+ brokers: triggerDoc.brokers,
+ kafka_admin_url: triggerDoc.kafka_admin_url,
+ username: triggerDoc.username,
+ password: triggerDoc.password
+ },
+ status: triggerDoc.status
+ }
+ resolve(common.webResponse(200, body, 'application/json'));
+ })
+ .catch(error => {
+ resolve(common.webResponse(500, error.toString()));
+ });
} else if (params.__ow_method === "delete") {
const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index eddc077..279772b 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -27,12 +27,11 @@ import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
+import org.scalatest.Inside
import org.scalatest.junit.JUnitRunner
import spray.json.DefaultJsonProtocol._
-import spray.json.JsObject
-import spray.json.JsString
-import spray.json.pimpAny
+import spray.json._
import common.JsHelpers
import common.TestHelpers
@@ -49,6 +48,7 @@ import java.nio.charset.StandardCharsets
class MessageHubFeedTests
extends FlatSpec
with Matchers
+ with Inside
with WskActorSystem
with BeforeAndAfterAll
with TestHelpers
@@ -274,6 +274,67 @@ class MessageHubFeedTests
assert(matchingActivations.length == 0)
}
+  // Creates a trigger through the feed, invokes the feed action with lifecycleEvent READ and
+  // checks that the returned document contains the expected configuration and status while
+  // leaving out the fields listed under "must not be exposed" below.
+  it should "return correct status and configuration" in withAssetCleaner(wskprops) {
+    val currentTime = s"${System.currentTimeMillis}"
+
+    (wp, assetHelper) =>
+      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+      println(s"Creating trigger ${triggerName}")
+
+      val username = kafkaUtils.getAsJson("user")
+      val password = kafkaUtils.getAsJson("password")
+      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
+      val brokers = kafkaUtils.getAsJson("brokers")
+
+      createTrigger(assetHelper, triggerName, parameters = Map(
+        "user" -> username,
+        "password" -> password,
+        "api_key" -> kafkaUtils.getAsJson("api_key"),
+        "kafka_admin_url" -> admin_url,
+        "kafka_brokers_sasl" -> brokers,
+        "topic" -> topic.toJson,
+        "isBinaryKey" -> false.toJson,
+        "isBinaryValue" -> false.toJson
+      ))
+
+      val run = wsk.action.invoke(actionName, parameters = Map(
+        "triggerName" -> triggerName.toJson,
+        "lifecycleEvent" -> "READ".toJson,
+        "authKey" -> wp.authKey.toJson
+      ))
+
+      withActivation(wsk.activation, run) {
+        activation =>
+          activation.response.success shouldBe true
+
+          inside (activation.response.result) {
+            case Some(result) =>
+              // asJsObject fails with a descriptive error if the field is not a JSON object,
+              // instead of the ClassCastException an unchecked downcast would raise.
+              val config = result.fields("config").asJsObject.fields
+              val status = result.fields("status").asJsObject.fields
+
+              config should contain("brokers" -> brokers)
+              config should contain("isBinaryKey" -> false.toJson)
+              config should contain("isBinaryValue" -> false.toJson)
+              config should contain("isJSONData" -> false.toJson)
+              config should contain("isMessageHub" -> true.toJson)
+              config should contain("kafka_admin_url" -> admin_url)
+              config should contain("password" -> password)
+              config should contain("topic" -> topic.toJson)
+              config should contain("username" -> username)
+              // Only the last path segment is compared, so the namespace prefix is ignored.
+              config("triggerName").convertTo[String].split("/").last should equal (triggerName.split("/").last)
+
+              // Fields that must not be exposed. Each key needs its own assertion: grouping
+              // the matchers in one `should not { ... }` block only applies the last one,
+              // because a block evaluates to its final expression.
+              config should not(contain key "authKey")
+              config should not(contain key "triggerURL")
+              config should not(contain key "uuid")
+              config should not(contain key "worker")
+
+              status should contain("active" -> true.toJson)
+              status should contain key "dateChanged"
+              status should not(contain key "reason")
+          }
+      }
+  }
+
def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
(trigger, _) =>
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].