You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ho...@apache.org on 2017/09/21 19:42:05 UTC

[incubator-openwhisk-wskdeploy] branch master updated: Adding integration test on message hub (#528)

This is an automated email from the ASF dual-hosted git repository.

houshengbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-wskdeploy.git


The following commit(s) were added to refs/heads/master by this push:
     new ea0cfa6  Adding integration test on message hub (#528)
ea0cfa6 is described below

commit ea0cfa6c390c75dbbdd485f9246e196e8157100e
Author: Priti Desai <pd...@us.ibm.com>
AuthorDate: Thu Sep 21 12:42:03 2017 -0700

    Adding integration test on message hub (#528)
    
    * Adding integration test on message hub
    
    * updating test name
    
    * adding newline at EOF
    
    * fixing trigger name
    
    * adding topic while creating message hub binding
---
 tests/src/integration/message-hub/deployment.yaml  | 15 ++++
 tests/src/integration/message-hub/manifest.yaml    | 30 ++++++++
 .../integration/message-hub/message-hub_test.go    | 61 ++++++++++++++++
 tests/src/integration/message-hub/src/events.json  | 41 +++++++++++
 .../message-hub/src/process-messages.js            | 53 ++++++++++++++
 .../message-hub/src/receive-messages.js            | 69 ++++++++++++++++++
 .../data-processing-app/actions/mhget/mhget.js     | 83 ----------------------
 .../data-processing-app/actions/mhpost/index.js    | 59 ---------------
 .../actions/mhpost/package.json                    | 11 ---
 tests/usecases/data-processing-app/deployment.yaml | 14 ----
 tests/usecases/data-processing-app/manifest.yaml   | 23 ------
 11 files changed, 269 insertions(+), 190 deletions(-)

diff --git a/tests/src/integration/message-hub/deployment.yaml b/tests/src/integration/message-hub/deployment.yaml
new file mode 100644
index 0000000..5f6413a
--- /dev/null
+++ b/tests/src/integration/message-hub/deployment.yaml
@@ -0,0 +1,15 @@
+application:
+    name: DataProcessingApp
+    packages:
+        data-processing-with-messagehub:
+            actions:
+                process-messages-received-from-messagehub:
+                    inputs:
+                        messagehub_instance: wskdeployMessageHub
+                        topic:  $DESTINATION_TOPIC
+            triggers:
+                wskdeployMessageHubTrigger:
+                    inputs:
+                        isJSONData: true
+                        topic: $SOURCE_TOPIC
+
diff --git a/tests/src/integration/message-hub/manifest.yaml b/tests/src/integration/message-hub/manifest.yaml
new file mode 100644
index 0000000..ddfcdb7
--- /dev/null
+++ b/tests/src/integration/message-hub/manifest.yaml
@@ -0,0 +1,30 @@
+packages:
+    data-processing-with-messagehub:
+        dependencies:
+            wskdeployMessageHub:
+                location: /whisk.system/messaging
+                inputs:
+                    user: $MESSAGEHUB_USER
+                    password: $MESSAGEHUB_PASSWORD
+                    kafka_admin_url: $MESSAGEHUB_ADMIN_HOST
+                    kafka_brokers_sasl: $KAFKA_BROKERS_SASL
+                    topic: $SOURCE_TOPIC
+        actions:
+            receive-messages-from-messagehub:
+                function: src/receive-messages.js
+                runtime: nodejs:6
+            process-messages-received-from-messagehub:
+                function: src/process-messages.js
+                runtime: nodejs:6
+        sequences:
+            data-processing-sequence:
+                actions: receive-messages-from-messagehub, process-messages-received-from-messagehub
+        triggers:
+            wskdeployMessageHubTrigger:
+                feed: wskdeployMessageHub/messageHubFeed
+        rules:
+            data-processing-rule:
+                trigger: wskdeployMessageHubTrigger
+                action: data-processing-sequence
+
+
diff --git a/tests/src/integration/message-hub/message-hub_test.go b/tests/src/integration/message-hub/message-hub_test.go
new file mode 100644
index 0000000..d039356
--- /dev/null
+++ b/tests/src/integration/message-hub/message-hub_test.go
@@ -0,0 +1,61 @@
+// +build integration
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tests
+
+import (
+	"fmt"
+	"github.com/apache/incubator-openwhisk-wskdeploy/tests/src/integration/common"
+	"github.com/stretchr/testify/assert"
+	"os"
+	"testing"
+)
+
+// TestMessageHub deploys the message-hub manifest and deployment files and
+// then undeploys them, asserting that neither step returns an error. The test
+// is skipped when the wsk properties built from the environment are invalid.
+//
+// Please configure following env. variables in order to run this integration test:
+//	BLUEMIX_APIHOST, BLUEMIX_NAMESPACE, BLUEMIX_AUTH,
+//	MESSAGEHUB_USER, MESSAGEHUB_PASSWORD
+// The Message Hub endpoints and topic names are exported by the test itself.
+func TestMessageHub(t *testing.T) {
+	os.Setenv("MESSAGEHUB_ADMIN_HOST", "https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443")
+	os.Setenv("KAFKA_BROKERS_SASL", "[kafka01-prod01.messagehub.services.us-south.bluemix.net:9093 kafka02-prod01.messagehub.services.us-south.bluemix.net:9093 kafka03-prod01.messagehub.services.us-south.bluemix.net:9093 kafka04-prod01.messagehub.services.us-south.bluemix.net:9093 kafka05-prod01.messagehub.services.us-south.bluemix.net:9093]")
+	os.Setenv("SOURCE_TOPIC", "in-topic")
+	os.Setenv("DESTINATION_TOPIC", "out-topic")
+
+	wskprops := common.GetWskpropsFromEnvVars(common.BLUEMIX_APIHOST, common.BLUEMIX_NAMESPACE, common.BLUEMIX_AUTH)
+	if err := common.ValidateWskprops(wskprops); err != nil {
+		fmt.Println(err.Error())
+		// Report the run as skipped instead of letting it count as a pass.
+		t.Skip("Wsk properties are not properly configured, so tests are skipped.")
+	}
+
+	wskdeploy := common.NewWskdeploy()
+	_, err := wskdeploy.DeployWithCredentials(manifestPath, deploymentPath, wskprops)
+	assert.NoError(t, err, "Failed to deploy the manifest file.")
+	_, err = wskdeploy.UndeployWithCredentials(manifestPath, deploymentPath, wskprops)
+	assert.NoError(t, err, "Failed to undeploy the manifest file.")
+}
+
+var ( // Test fixture paths; $GOPATH is read once, when the package is initialised.
+	manifestPath   = os.Getenv("GOPATH") + "/src/github.com/apache/incubator-openwhisk-wskdeploy/tests/src/integration/message-hub/manifest.yaml"
+	deploymentPath = os.Getenv("GOPATH") + "/src/github.com/apache/incubator-openwhisk-wskdeploy/tests/src/integration/message-hub/deployment.yaml"
+)
diff --git a/tests/src/integration/message-hub/src/events.json b/tests/src/integration/message-hub/src/events.json
new file mode 100644
index 0000000..f2e87da
--- /dev/null
+++ b/tests/src/integration/message-hub/src/events.json
@@ -0,0 +1,41 @@
+{
+  "events": [{
+    "id": "1",
+    "eventType": "update",
+    "timestamp": "2017-09-01T11:11:11.111+02",
+    "payload": {
+      "category": 4,
+      "name": "Harvey",
+      "location": "Houston"
+    }
+  }, {
+    "id": "2",
+    "eventType": "update",
+    "timestamp": "2017-09-10T11:11:11.111+02",
+    "payload": {
+      "category": 3,
+      "name": "Irma",
+      "location": "Florida"
+    }
+  },
+    {
+      "id": "3",
+      "eventType": "update",
+      "timestamp": "2017-09-15T11:11:11.111+02",
+      "payload": {
+        "category": 1,
+        "name": "Jose",
+        "location": "New York"
+      }
+    }, {
+    "id": "4",
+    "eventType": "update",
+    "timestamp": "2017-09-20T11:11:11.111+02",
+    "payload": {
+      "category": 4,
+      "name": "Maria",
+      "location": "Puerto Rico"
+    }
+  }]
+}
+
diff --git a/tests/src/integration/message-hub/src/process-messages.js b/tests/src/integration/message-hub/src/process-messages.js
new file mode 100644
index 0000000..6fca77f
--- /dev/null
+++ b/tests/src/integration/message-hub/src/process-messages.js
@@ -0,0 +1,53 @@
+
+var openwhisk = require('openwhisk');
+
+/**
+ * Analyze incoming message and generate a summary as a response
+ */
+function transform(events) {
+    var average = 0;
+    for (var i = 0; i < events.length; i++) {
+        average += events[i].payload.category;
+    }
+    average = average / events.length;
+    var result = {
+        "agent": "OpenWhisk action",
+        "events_count": events.length,
+        "avg_category": average
+    };
+    return result;
+}
+
+/**
+ * Process incoming message from the receive-messages action earlier
+ * in the sequence and publish a new message to Message Hub.
+ */
+function main(params) {
+    console.log("DEBUG: Received message as input: " + JSON.stringify(params));
+
+    return new Promise(function(resolve, reject) {
+        if (!params.topic || !params.messagehub_instance || !params.events || !params.events[0]) {
+            reject("Error: Invalid arguments. Must include topic, events[], message hub service name.");
+        }
+
+        var transformedMessage = JSON.stringify(transform(params.events));
+        console.log("DEBUG: Message to be published: " + transformedMessage);
+
+        openwhisk().actions.invoke({
+            name: params.messagehub_instance + '/messageHubProduce',
+            blocking: true,
+            result: true,
+            params: {
+                value: transformedMessage,
+                topic: params.topic
+            }
+        }).then(result => {
+            resolve({
+                "result": "Success: Message was sent to Message Hub."
+            });
+        }).catch(error => {
+            reject(error);
+        });
+
+    });
+}
diff --git a/tests/src/integration/message-hub/src/receive-messages.js b/tests/src/integration/message-hub/src/receive-messages.js
new file mode 100644
index 0000000..7b9e9dd
--- /dev/null
+++ b/tests/src/integration/message-hub/src/receive-messages.js
@@ -0,0 +1,69 @@
+/*
+ This function is bound to a trigger and is initiated when the message arrives
+ via OpenWhisk feed connected to Message Hub. Note that many messages can come in
+ as a large batch. Example input:
+
+{
+  "messages": [{
+    "partition": 0,
+    "key": null,
+    "offset": 252,
+    "topic": "in-topic",
+    "value": {
+      "events": [{
+        "eventType": "update",
+        "id": "1",
+        "timestamp": "2017-09-01T11:11:11.111+02",
+        "payload": {
+          "category": 4,
+          "name": "Harvey",
+          "location": "Houston"
+        }
+      }, {
+        ...
+      }]
+    }
+  }, {
+    ...
+  }]
+}
+
+
+Expected output (merge all events from multiple 'messages' into one big 'events'):
+{
+  "events": [{
+        "eventType": "update",
+        "id": "1",
+        "timestamp": "2017-09-01T11:11:11.111+02",
+        "payload": {
+          "category": 4,
+          "name": "Harvey",
+          "location": "Houston"
+        }
+  }, {
+    ...
+  }]
+}
+**/
+
+function main(params) {
+    console.log("DEBUG: Received the following message as input: " + JSON.stringify(params));
+
+    return new Promise(function(resolve, reject) {
+        if (!params.messages || !params.messages[0] ||
+            !params.messages[0].value || !params.messages[0].value.events) {
+            reject("Invalid arguments. Must include 'messages' JSON array with 'value' field");
+        }
+        var msgs = params.messages;
+        var out = [];
+        for (var i = 0; i < msgs.length; i++) {
+            var msg = msgs[i];
+            for (var j = 0; j < msg.value.events.length; j++) {
+                out.push(msg.value.events[j]);
+            }
+        }
+        resolve({
+            "events": out
+        });
+    });
+}
diff --git a/tests/usecases/data-processing-app/actions/mhget/mhget.js b/tests/usecases/data-processing-app/actions/mhget/mhget.js
deleted file mode 100644
index 2d26852..0000000
--- a/tests/usecases/data-processing-app/actions/mhget/mhget.js
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more contributor
-// license agreements; and to You under the Apache License, Version 2.0.
-
-/*
- This function is bound to a trigger and is initiated when the message arrives
- via OpenWhisk feed connected to Message Hub. Note that many messages can come in
- as a large batch. Example input:
-{
-    "messages": [{
-        "partition": 0,
-        "key": null,
-        "offset": 252,
-        "topic": "in-topic",
-        "value": {
-            "events": [{
-                "eventType": "update",
-                "timestamp": "2011-01-01T11:11:11.111+02",
-                "payload": {
-                    "velocity": 11,
-                    "type": "taxi",
-                    "name": "taxi1",
-                    "location": {
-                        "latitude": 11.11,
-                        "longitude": -1.11
-                    }
-                },
-                "deviceType": "taxi",
-                "orgId": "Org1",
-                "deviceId": "taxi1"
-            },
-            { ... }
-            ]
-        }
-    },
-    { ... }
-    ]
-}
-
-Expected output (merge all events from multiple 'messages' into one big 'events'):
-{
-    "events": [{
-        "eventType": "update",
-        "timestamp": "2011-01-01T11:11:11.111+02",
-        "payload": {
-            "velocity": 11,
-            "type": "taxi",
-            "name": "taxi1",
-            "location": {
-                "latitude": 11.11,
-                "longitude": -1.11
-            }
-        },
-        "deviceType": "taxi",
-        "orgId": "Org1",
-        "deviceId": "taxi1"
-    },
-    { ... }
-    ]
-
-}
-**/
-
-function main(params) {
-    console.log("DEBUG: received the following message as input: " + JSON.stringify(params));
-
-    return new Promise(function(resolve, reject) {
-        if (!params.messages || !params.messages[0] ||
-            !params.messages[0].value || !params.messages[0].value.events) {
-            reject("Invalid arguments. Must include 'messages' JSON array with 'value' field");
-        }
-        var msgs = params.messages;
-        var out = [];
-        for (var i = 0; i < msgs.length; i++) {
-            var msg = msgs[i];
-            for (var j = 0; j < msg.value.events.length; j++) {
-                out.push(msg.value.events[j]);
-            }
-        }
-        resolve({
-            "events": out
-        });
-    });
-}
diff --git a/tests/usecases/data-processing-app/actions/mhpost/index.js b/tests/usecases/data-processing-app/actions/mhpost/index.js
deleted file mode 100644
index 520c82d..0000000
--- a/tests/usecases/data-processing-app/actions/mhpost/index.js
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more contributor
-// license agreements; and to You under the Apache License, Version 2.0.
-
-/**
- * Analyze incoming message and generate a summary as a response
- */
-function transform(events) {
-  var average = 0;
-  for (var i = 0; i < events.length; i++) {
-    average += events[i].payload.velocity;
-  }
-  average = average / events.length;
-  var result = {
-    "agent": "OpenWhisk action",
-    "events_count": events.length,
-    "avg_velocity": average
-  };
-  return result;
-}
-
-/**
- * Process incoming message and publish it to Message Hub or Kafka.
- * This code is used as the OpenWhisk Action implementation and linked to a trigger via a rule.
- */
-function mhpost(args) {
-  console.log("DEBUG: Received message as input: " + JSON.stringify(args));
-
-  return new Promise(function(resolve, reject) {
-    if (!args.topic || !args.events || !args.events[0] || !args.kafka_rest_url || !args.api_key)
-      reject("Error: Invalid arguments. Must include topic, events[], kafka_rest_url, api_key.");
-
-    // construct CF-style VCAP services JSON
-    var vcap_services = {
-      "messagehub": [{
-        "credentials": {
-          "kafka_rest_url": args.kafka_rest_url,
-          "api_key": args.api_key
-        }
-      }]
-    };
-
-    var MessageHub = require('message-hub-rest');
-    var kafka = new MessageHub(vcap_services);
-    var transformedMessage = transform(args.events);
-    console.log("DEBUG: Message to be published: " + JSON.stringify(transformedMessage));
-
-    kafka.produce(args.topic, transformedMessage)
-      .then(function() {
-        resolve({
-          "result": "Success: Message was sent to IBM Message Hub."
-        });
-      })
-      .fail(function(error) {
-        reject(error);
-      });
-  });
-}
-
-exports.main = mhpost;
diff --git a/tests/usecases/data-processing-app/actions/mhpost/package.json b/tests/usecases/data-processing-app/actions/mhpost/package.json
deleted file mode 100644
index 0ed4e2e..0000000
--- a/tests/usecases/data-processing-app/actions/mhpost/package.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-  "name": "mypost",
-  "version": "0.1.0",
-  "description": "Action posting messages to a Message Hub topic",
-  "author": "IBM",
-  "license": "Apache-2.0",
-  "dependencies": {
-    "message-hub-rest": "1.2.0"
-  },
-  "main": "index.js"
-}
\ No newline at end of file
diff --git a/tests/usecases/data-processing-app/deployment.yaml b/tests/usecases/data-processing-app/deployment.yaml
deleted file mode 100644
index 073af2c..0000000
--- a/tests/usecases/data-processing-app/deployment.yaml
+++ /dev/null
@@ -1,14 +0,0 @@
-application:
-  name: testapp
-  namespace: _
-  version: 1.0
-  packages:
-    name: kafka
-    inputs:
-      kafka_rest_url: https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443
-      topic: out-topic
-    triggers:
-      kafka-trigger:
-        inputs:
-          isJSONData: true
-          topic: in-topic
\ No newline at end of file
diff --git a/tests/usecases/data-processing-app/manifest.yaml b/tests/usecases/data-processing-app/manifest.yaml
deleted file mode 100644
index b6fd633..0000000
--- a/tests/usecases/data-processing-app/manifest.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-package:
-  name: kafka
-  version: 1.0
-  license: Apache-2.0
-  actions:
-    mhpost-action:
-      version: 1.0
-      function: actions/mhpost
-      runtime: nodejs:6
-    mhget-action:
-      version: 1.0
-      function: actions/mhget/mhget.js
-      runtime: nodejs:6
-  triggers:
-    kafka-trigger:
-      feed: Bluemix_kafka-broker_Credentials-1/messageHubFeed
-  sequences:
-    kafka-sequence:
-      actions: mhget-action, kafka/mhpost-action
-  rules:
-    kafka-inbound-rule:
-      trigger: kafka-trigger
-      action: kafka-sequence

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].