You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2017/10/17 18:48:33 UTC

[incubator-openwhisk-package-kafka] branch master updated: Add ability to get trigger configuration and status (#217)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fb980c  Add ability to get trigger configuration and status (#217)
3fb980c is described below

commit 3fb980c72c54b33add8bdf2fa6231e08287d7c56
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Tue Oct 17 13:48:32 2017 -0500

    Add ability to get trigger configuration and status (#217)
    
    * initial implementation of trigger management, get status and configuration
    
    * add test for get trigger config and status
    
    * add implementation from message hub feed action to kafka feed action
---
 action/kafkaFeed.js                                |  2 +
 action/kafkaFeedWeb.js                             | 29 ++++++++++
 action/lib/common.js                               | 29 +++++++++-
 action/messageHubFeed.js                           |  2 +
 action/messageHubFeedWeb.js                        | 29 ++++++++++
 .../system/packages/MessageHubFeedTests.scala      | 67 +++++++++++++++++++++-
 6 files changed, 153 insertions(+), 5 deletions(-)

diff --git a/action/kafkaFeed.js b/action/kafkaFeed.js
index 3cd42f7..bf55538 100644
--- a/action/kafkaFeed.js
+++ b/action/kafkaFeed.js
@@ -18,6 +18,8 @@ function main(params) {
 
     if (params.lifecycleEvent === 'CREATE') {
         return common.createTrigger(endpoint, massagedParams, webActionName);
+    } else if (params.lifecycleEvent === 'READ') {
+        return common.getTrigger(endpoint, massagedParams, webActionName);
     } else if (params.lifecycleEvent === 'DELETE') {
         return common.deleteTrigger(endpoint, massagedParams, webActionName);
     }
diff --git a/action/kafkaFeedWeb.js b/action/kafkaFeedWeb.js
index de39edb..d60ce2f 100644
--- a/action/kafkaFeedWeb.js
+++ b/action/kafkaFeedWeb.js
@@ -61,6 +61,35 @@ function main(params) {
 
                     resolve(common.webResponse(statusCode, body));
                 });
+        } else if (params.__ow_method === "get") {
+            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+
+            return common.verifyTriggerAuth(triggerURL)
+                .then(() => {
+                    db = new Database(params.DB_URL, params.DB_NAME);
+                    return db.getTrigger(params.triggerName);
+                })
+                .then((triggerDoc) => {
+                    var body = {
+                        config: {
+                            triggerName: triggerDoc.triggerName,
+                            topic: triggerDoc.topic,
+                            isJSONData: triggerDoc.isJSONData,
+                            isBinaryValue: triggerDoc.isBinaryValue,
+                            isBinaryKey: triggerDoc.isBinaryKey,
+                            isMessageHub: triggerDoc.isMessageHub,
+                            brokers: triggerDoc.brokers,
+                            kafka_admin_url: triggerDoc.kafka_admin_url,
+                            username: triggerDoc.username,
+                            password: triggerDoc.password
+                        },
+                        status: triggerDoc.status
+                    }
+                    resolve(common.webResponse(200, body, 'application/json'));
+                })
+                .catch(error => {
+                    resolve(common.webResponse(500, error.toString()));
+                });
         } else if (params.__ow_method === "delete") {
             const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
 
diff --git a/action/lib/common.js b/action/lib/common.js
index 46a7d32..638f8c2 100644
--- a/action/lib/common.js
+++ b/action/lib/common.js
@@ -130,6 +130,30 @@ function deleteTrigger(endpoint, params, actionName) {
         });
 }
 
+function getTrigger(endpoint, params, actionName) {
+    var options = {
+        method: 'GET',
+        url: getWebActionURL(endpoint, actionName),
+        rejectUnauthorized: false,
+        json: true,
+        body: params,
+        headers: {
+            'Content-Type': 'application/json',
+            'Accept': 'application/json',
+            'User-Agent': 'whisk'
+        }
+    };
+
+    return request(options)
+        .then(response => {
+            return response;
+        })
+        .catch(error => {
+            console.log(`Error fetching trigger: ${JSON.stringify(error, null, 2)}`);
+            return Promise.reject(error.response.body);
+        });
+}
+
 // perform parameter validation that is common to both feed actions
 function performCommonParameterValidation(rawParams) {
     var validatedParams = {};
@@ -166,11 +190,11 @@ function performCommonParameterValidation(rawParams) {
     return { validatedParams: validatedParams };
 }
 
-function webResponse(code, body) {
+function webResponse(code, body, contentType = 'text/plain') {
     return {
         statusCode: code,
         headers: {
-            'Content-Type': 'text/plain'
+            'Content-Type': contentType
         },
         body: body
     };
@@ -179,6 +203,7 @@ function webResponse(code, body) {
 module.exports = {
     'createTrigger': createTrigger,
     'deleteTrigger': deleteTrigger,
+    'getTrigger': getTrigger,
     'getBooleanFromArgs': getBooleanFromArgs,
     'getTriggerFQN': getTriggerFQN,
     'getTriggerURL': getTriggerURL,
diff --git a/action/messageHubFeed.js b/action/messageHubFeed.js
index 818520a..3c2ae7c 100644
--- a/action/messageHubFeed.js
+++ b/action/messageHubFeed.js
@@ -20,6 +20,8 @@ function main(params) {
 
     if (params.lifecycleEvent === 'CREATE') {
         return common.createTrigger(endpoint, massagedParams, webActionName);
+    } else if (params.lifecycleEvent === 'READ') {
+        return common.getTrigger(endpoint, massagedParams, webActionName);
     } else if (params.lifecycleEvent === 'DELETE') {
         return common.deleteTrigger(endpoint, massagedParams, webActionName);
     }
diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
index 042096a..b6c5523 100644
--- a/action/messageHubFeedWeb.js
+++ b/action/messageHubFeedWeb.js
@@ -64,6 +64,35 @@ function main(params) {
 
                     resolve(common.webResponse(statusCode, body));
                 });
+        } else if (params.__ow_method === "get") {
+            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+
+            return common.verifyTriggerAuth(triggerURL)
+                .then(() => {
+                    db = new Database(params.DB_URL, params.DB_NAME);
+                    return db.getTrigger(params.triggerName);
+                })
+                .then((triggerDoc) => {
+                    var body = {
+                        config: {
+                            triggerName: triggerDoc.triggerName,
+                            topic: triggerDoc.topic,
+                            isJSONData: triggerDoc.isJSONData,
+                            isBinaryValue: triggerDoc.isBinaryValue,
+                            isBinaryKey: triggerDoc.isBinaryKey,
+                            isMessageHub: triggerDoc.isMessageHub,
+                            brokers: triggerDoc.brokers,
+                            kafka_admin_url: triggerDoc.kafka_admin_url,
+                            username: triggerDoc.username,
+                            password: triggerDoc.password
+                        },
+                        status: triggerDoc.status
+                    }
+                    resolve(common.webResponse(200, body, 'application/json'));
+                })
+                .catch(error => {
+                    resolve(common.webResponse(500, error.toString()));
+                });
         } else if (params.__ow_method === "delete") {
             const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
 
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index eddc077..279772b 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -27,12 +27,11 @@ import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
+import org.scalatest.Inside
 import org.scalatest.junit.JUnitRunner
 
 import spray.json.DefaultJsonProtocol._
-import spray.json.JsObject
-import spray.json.JsString
-import spray.json.pimpAny
+import spray.json._
 
 import common.JsHelpers
 import common.TestHelpers
@@ -49,6 +48,7 @@ import java.nio.charset.StandardCharsets
 class MessageHubFeedTests
   extends FlatSpec
   with Matchers
+  with Inside
   with WskActorSystem
   with BeforeAndAfterAll
   with TestHelpers
@@ -274,6 +274,67 @@ class MessageHubFeedTests
       assert(matchingActivations.length == 0)
   }
 
+  it should "return correct status and configuration" in withAssetCleaner(wskprops) {
+    val currentTime = s"${System.currentTimeMillis}"
+
+    (wp, assetHelper) =>
+      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+      println(s"Creating trigger ${triggerName}")
+
+      val username = kafkaUtils.getAsJson("user")
+      val password = kafkaUtils.getAsJson("password")
+      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
+      val brokers = kafkaUtils.getAsJson("brokers")
+
+      createTrigger(assetHelper, triggerName, parameters = Map(
+        "user" -> username,
+        "password" -> password,
+        "api_key" -> kafkaUtils.getAsJson("api_key"),
+        "kafka_admin_url" -> admin_url,
+        "kafka_brokers_sasl" -> brokers,
+        "topic" -> topic.toJson,
+        "isBinaryKey" -> false.toJson,
+        "isBinaryValue" -> false.toJson
+      ))
+
+      val run = wsk.action.invoke(actionName, parameters = Map(
+        "triggerName" -> triggerName.toJson,
+        "lifecycleEvent" -> "READ".toJson,
+        "authKey" -> wp.authKey.toJson
+      ))
+
+      withActivation(wsk.activation, run) {
+        activation =>
+          activation.response.success shouldBe true
+
+          inside (activation.response.result) {
+            case Some(result) =>
+              val config = result.getFields("config").head.asInstanceOf[JsObject].fields
+              val status = result.getFields("status").head.asInstanceOf[JsObject].fields
+
+              config should contain("brokers" -> brokers)
+              config should contain("isBinaryKey" -> false.toJson)
+              config should contain("isBinaryValue" -> false.toJson)
+              config should contain("isJSONData" -> false.toJson)
+              config should contain("isMessageHub" -> true.toJson)
+              config should contain("kafka_admin_url" -> admin_url)
+              config should contain("password" -> password)
+              config should contain("topic" -> topic.toJson)
+              config should contain("username" -> username)
+              config("triggerName").convertTo[String].split("/").last should equal (triggerName.split("/").last)
+              config should not {
+                contain key "authKey"
+                contain key "triggerURL"
+                contain key "uuid"
+                contain key "worker"
+              }
+              status should contain("active" -> true.toJson)
+              status should contain key "dateChanged"
+              status should not(contain key "reason")
+          }
+      }
+  }
+
   def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
     val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
       (trigger, _) =>

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].