You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ho...@apache.org on 2017/09/21 19:42:05 UTC
[incubator-openwhisk-wskdeploy] branch master updated: Adding
integration test on message hub (#528)
This is an automated email from the ASF dual-hosted git repository.
houshengbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-wskdeploy.git
The following commit(s) were added to refs/heads/master by this push:
new ea0cfa6 Adding integration test on message hub (#528)
ea0cfa6 is described below
commit ea0cfa6c390c75dbbdd485f9246e196e8157100e
Author: Priti Desai <pd...@us.ibm.com>
AuthorDate: Thu Sep 21 12:42:03 2017 -0700
Adding integration test on message hub (#528)
* Adding integratin test on message hub
* updating test name
* adding newline at EOF
* fixing trigger name
* adding topic while creating message hub binding
---
tests/src/integration/message-hub/deployment.yaml | 15 ++++
tests/src/integration/message-hub/manifest.yaml | 30 ++++++++
.../integration/message-hub/message-hub_test.go | 61 ++++++++++++++++
tests/src/integration/message-hub/src/events.json | 41 +++++++++++
.../message-hub/src/process-messages.js | 53 ++++++++++++++
.../message-hub/src/receive-messages.js | 69 ++++++++++++++++++
.../data-processing-app/actions/mhget/mhget.js | 83 ----------------------
.../data-processing-app/actions/mhpost/index.js | 59 ---------------
.../actions/mhpost/package.json | 11 ---
tests/usecases/data-processing-app/deployment.yaml | 14 ----
tests/usecases/data-processing-app/manifest.yaml | 23 ------
11 files changed, 269 insertions(+), 190 deletions(-)
diff --git a/tests/src/integration/message-hub/deployment.yaml b/tests/src/integration/message-hub/deployment.yaml
new file mode 100644
index 0000000..5f6413a
--- /dev/null
+++ b/tests/src/integration/message-hub/deployment.yaml
@@ -0,0 +1,15 @@
+application:
+ name: DataProcessingApp
+ packages:
+ data-processing-with-messagehub:
+ actions:
+ process-messages-received-from-messagehub:
+ inputs:
+ messagehub_instance: wskdeployMessageHub
+ topic: $DESTINATION_TOPIC
+ triggers:
+ wskdeployMessageHubTrigger:
+ inputs:
+ isJSONData: true
+ topic: $SOURCE_TOPIC
+
diff --git a/tests/src/integration/message-hub/manifest.yaml b/tests/src/integration/message-hub/manifest.yaml
new file mode 100644
index 0000000..ddfcdb7
--- /dev/null
+++ b/tests/src/integration/message-hub/manifest.yaml
@@ -0,0 +1,30 @@
+packages:
+ data-processing-with-messagehub:
+ dependencies:
+ wskdeployMessageHub:
+ location: /whisk.system/messaging
+ inputs:
+ user: $MESSAGEHUB_USER
+ password: $MESSAGEHUB_PASSWORD
+ kafka_admin_url: $MESSAGEHUB_ADMIN_HOST
+ kafka_brokers_sasl: $KAFKA_BROKERS_SASL
+ topic: $SOURCE_TOPIC
+ actions:
+ receive-messages-from-messagehub:
+ function: src/receive-messages.js
+ runtime: nodejs:6
+ process-messages-received-from-messagehub:
+ function: src/process-messages.js
+ runtime: nodejs:6
+ sequences:
+ data-processing-sequence:
+ actions: receive-messages-from-messagehub, process-messages-received-from-messagehub
+ triggers:
+ wskdeployMessageHubTrigger:
+ feed: wskdeployMessageHub/messageHubFeed
+ rules:
+ data-processing-rule:
+ trigger: wskdeployMessageHubTrigger
+ action: data-processing-sequence
+
+
diff --git a/tests/src/integration/message-hub/message-hub_test.go b/tests/src/integration/message-hub/message-hub_test.go
new file mode 100644
index 0000000..d039356
--- /dev/null
+++ b/tests/src/integration/message-hub/message-hub_test.go
@@ -0,0 +1,61 @@
+// +build integration
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tests
+
+import (
+ "fmt"
+ "github.com/apache/incubator-openwhisk-wskdeploy/tests/src/integration/common"
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+)
+
+/* *
+ * Please configure following env. variables in order to run this integration test:
+ * BLUEMIX_APIHOST
+ * BLUEMIX_NAMESPACE
+ * BLUEMIX_AUTH
+ * MESSAGEHUB_USER
+ * MESSAGEHUB_PASSWORD
+ */
+func TestMessageHub(t *testing.T) {
+ os.Setenv("MESSAGEHUB_ADMIN_HOST", "https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443")
+ os.Setenv("KAFKA_BROKERS_SASL", "[kafka01-prod01.messagehub.services.us-south.bluemix.net:9093 kafka02-prod01.messagehub.services.us-south.bluemix.net:9093 kafka03-prod01.messagehub.services.us-south.bluemix.net:9093 kafka04-prod01.messagehub.services.us-south.bluemix.net:9093 kafka05-prod01.messagehub.services.us-south.bluemix.net:9093]")
+ os.Setenv("SOURCE_TOPIC", "in-topic")
+ os.Setenv("DESTINATION_TOPIC", "out-topic")
+
+ wskprops := common.GetWskpropsFromEnvVars(common.BLUEMIX_APIHOST, common.BLUEMIX_NAMESPACE, common.BLUEMIX_AUTH)
+ err := common.ValidateWskprops(wskprops)
+ if err != nil {
+ fmt.Println(err.Error())
+ fmt.Println("Wsk properties are not properly configured, so tests are skipped.")
+ } else {
+ wskdeploy := common.NewWskdeploy()
+ _, err := wskdeploy.DeployWithCredentials(manifestPath, deploymentPath, wskprops)
+ assert.Equal(t, nil, err, "Failed to deploy the manifest file.")
+ _, err = wskdeploy.UndeployWithCredentials(manifestPath, deploymentPath, wskprops)
+ assert.Equal(t, nil, err, "Failed to undeploy the manifest file.")
+ }
+}
+
+var (
+ manifestPath = os.Getenv("GOPATH") + "/src/github.com/apache/incubator-openwhisk-wskdeploy/tests/src/integration/message-hub/manifest.yaml"
+ deploymentPath = os.Getenv("GOPATH") + "/src/github.com/apache/incubator-openwhisk-wskdeploy/tests/src/integration/message-hub/deployment.yaml"
+)
diff --git a/tests/src/integration/message-hub/src/events.json b/tests/src/integration/message-hub/src/events.json
new file mode 100644
index 0000000..f2e87da
--- /dev/null
+++ b/tests/src/integration/message-hub/src/events.json
@@ -0,0 +1,41 @@
+{
+ "events": [{
+ "id": "1",
+ "eventType": "update",
+ "timestamp": "2017-09-01T11:11:11.111+02",
+ "payload": {
+ "category": 4,
+ "name": "Harvey",
+ "location": "Houston"
+ }
+ }, {
+ "id": "2",
+ "eventType": "update",
+ "timestamp": "2017-09-10T11:11:11.111+02",
+ "payload": {
+ "category": 3,
+ "name": "Irma",
+ "location": "Florida"
+ }
+ },
+ {
+ "id": "3",
+ "eventType": "update",
+ "timestamp": "2017-09-15T11:11:11.111+02",
+ "payload": {
+ "category": 1,
+ "name": "Jose",
+ "location": "New York"
+ }
+ }, {
+ "id": "4",
+ "eventType": "update",
+ "timestamp": "2017-09-20T11:11:11.111+02",
+ "payload": {
+ "category": 4,
+ "name": "Maria",
+ "location": "Puerto Rico"
+ }
+ }]
+}
+
diff --git a/tests/src/integration/message-hub/src/process-messages.js b/tests/src/integration/message-hub/src/process-messages.js
new file mode 100644
index 0000000..6fca77f
--- /dev/null
+++ b/tests/src/integration/message-hub/src/process-messages.js
@@ -0,0 +1,53 @@
+
+var openwhisk = require('openwhisk');
+
+/**
+ * Analyze incoming message and generate a summary as a response
+ */
+function transform(events) {
+ var average = 0;
+ for (var i = 0; i < events.length; i++) {
+ average += events[i].payload.category;
+ }
+ average = average / events.length;
+ var result = {
+ "agent": "OpenWhisk action",
+ "events_count": events.length,
+ "avg_category": average
+ };
+ return result;
+}
+
+/**
+ * Process incoming message from the receive-messages action earlier
+ * in the sequence and publish a new message to Message Hub.
+ */
+function main(params) {
+ console.log("DEBUG: Received message as input: " + JSON.stringify(params));
+
+ return new Promise(function(resolve, reject) {
+ if (!params.topic || !params.messagehub_instance || !params.events || !params.events[0]) {
+ reject("Error: Invalid arguments. Must include topic, events[], message hub service name.");
+ }
+
+ var transformedMessage = JSON.stringify(transform(params.events));
+ console.log("DEBUG: Message to be published: " + transformedMessage);
+
+ openwhisk().actions.invoke({
+ name: params.messagehub_instance + '/messageHubProduce',
+ blocking: true,
+ result: true,
+ params: {
+ value: transformedMessage,
+ topic: params.topic
+ }
+ }).then(result => {
+ resolve({
+ "result": "Success: Message was sent to Message Hub."
+ });
+ }).catch(error => {
+ reject(error);
+ });
+
+ });
+}
diff --git a/tests/src/integration/message-hub/src/receive-messages.js b/tests/src/integration/message-hub/src/receive-messages.js
new file mode 100644
index 0000000..7b9e9dd
--- /dev/null
+++ b/tests/src/integration/message-hub/src/receive-messages.js
@@ -0,0 +1,69 @@
+/*
+ This function is bound to a trigger and is initiated when the message arrives
+ via OpenWhisk feed connected to Message Hub. Note that many messages can come in
+ as a large batch. Example input:
+
+{
+ "messages": [{
+ "partition": 0,
+ "key": null,
+ "offset": 252,
+ "topic": "in-topic",
+ "value": {
+ "events": [{
+ "eventType": "update",
+ "id": "1",
+ "timestamp": "2017-09-01T11:11:11.111+02",
+ "payload": {
+ "category": 4,
+ "name": "Harvey",
+ "location": "Houston"
+ }
+ }, {
+ ...
+ }]
+ }
+ }, {
+ ...
+ }]
+}
+
+
+Expected output (merge all events from multiple 'messages' into one big 'events'):
+{
+ "events": [{
+ "eventType": "update",
+ "id": "1",
+ "timestamp": "2017-09-01T11:11:11.111+02",
+ "payload": {
+ "category": 4,
+ "name": "Harvey",
+ "location": "Houston"
+ }
+ }, {
+ ...
+ }]
+}
+**/
+
+function main(params) {
+ console.log("DEBUG: Received the following message as input: " + JSON.stringify(params));
+
+ return new Promise(function(resolve, reject) {
+ if (!params.messages || !params.messages[0] ||
+ !params.messages[0].value || !params.messages[0].value.events) {
+ reject("Invalid arguments. Must include 'messages' JSON array with 'value' field");
+ }
+ var msgs = params.messages;
+ var out = [];
+ for (var i = 0; i < msgs.length; i++) {
+ var msg = msgs[i];
+ for (var j = 0; j < msg.value.events.length; j++) {
+ out.push(msg.value.events[j]);
+ }
+ }
+ resolve({
+ "events": out
+ });
+ });
+}
diff --git a/tests/usecases/data-processing-app/actions/mhget/mhget.js b/tests/usecases/data-processing-app/actions/mhget/mhget.js
deleted file mode 100644
index 2d26852..0000000
--- a/tests/usecases/data-processing-app/actions/mhget/mhget.js
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more contributor
-// license agreements; and to You under the Apache License, Version 2.0.
-
-/*
- This function is bound to a trigger and is initiated when the message arrives
- via OpenWhisk feed connected to Message Hub. Note that many messages can come in
- as a large batch. Example input:
-{
- "messages": [{
- "partition": 0,
- "key": null,
- "offset": 252,
- "topic": "in-topic",
- "value": {
- "events": [{
- "eventType": "update",
- "timestamp": "2011-01-01T11:11:11.111+02",
- "payload": {
- "velocity": 11,
- "type": "taxi",
- "name": "taxi1",
- "location": {
- "latitude": 11.11,
- "longitude": -1.11
- }
- },
- "deviceType": "taxi",
- "orgId": "Org1",
- "deviceId": "taxi1"
- },
- { ... }
- ]
- }
- },
- { ... }
- ]
-}
-
-Expected output (merge all events from multiple 'messages' into one big 'events'):
-{
- "events": [{
- "eventType": "update",
- "timestamp": "2011-01-01T11:11:11.111+02",
- "payload": {
- "velocity": 11,
- "type": "taxi",
- "name": "taxi1",
- "location": {
- "latitude": 11.11,
- "longitude": -1.11
- }
- },
- "deviceType": "taxi",
- "orgId": "Org1",
- "deviceId": "taxi1"
- },
- { ... }
- ]
-
-}
-**/
-
-function main(params) {
- console.log("DEBUG: received the following message as input: " + JSON.stringify(params));
-
- return new Promise(function(resolve, reject) {
- if (!params.messages || !params.messages[0] ||
- !params.messages[0].value || !params.messages[0].value.events) {
- reject("Invalid arguments. Must include 'messages' JSON array with 'value' field");
- }
- var msgs = params.messages;
- var out = [];
- for (var i = 0; i < msgs.length; i++) {
- var msg = msgs[i];
- for (var j = 0; j < msg.value.events.length; j++) {
- out.push(msg.value.events[j]);
- }
- }
- resolve({
- "events": out
- });
- });
-}
diff --git a/tests/usecases/data-processing-app/actions/mhpost/index.js b/tests/usecases/data-processing-app/actions/mhpost/index.js
deleted file mode 100644
index 520c82d..0000000
--- a/tests/usecases/data-processing-app/actions/mhpost/index.js
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more contributor
-// license agreements; and to You under the Apache License, Version 2.0.
-
-/**
- * Analyze incoming message and generate a summary as a response
- */
-function transform(events) {
- var average = 0;
- for (var i = 0; i < events.length; i++) {
- average += events[i].payload.velocity;
- }
- average = average / events.length;
- var result = {
- "agent": "OpenWhisk action",
- "events_count": events.length,
- "avg_velocity": average
- };
- return result;
-}
-
-/**
- * Process incoming message and publish it to Message Hub or Kafka.
- * This code is used as the OpenWhisk Action implementation and linked to a trigger via a rule.
- */
-function mhpost(args) {
- console.log("DEBUG: Received message as input: " + JSON.stringify(args));
-
- return new Promise(function(resolve, reject) {
- if (!args.topic || !args.events || !args.events[0] || !args.kafka_rest_url || !args.api_key)
- reject("Error: Invalid arguments. Must include topic, events[], kafka_rest_url, api_key.");
-
- // construct CF-style VCAP services JSON
- var vcap_services = {
- "messagehub": [{
- "credentials": {
- "kafka_rest_url": args.kafka_rest_url,
- "api_key": args.api_key
- }
- }]
- };
-
- var MessageHub = require('message-hub-rest');
- var kafka = new MessageHub(vcap_services);
- var transformedMessage = transform(args.events);
- console.log("DEBUG: Message to be published: " + JSON.stringify(transformedMessage));
-
- kafka.produce(args.topic, transformedMessage)
- .then(function() {
- resolve({
- "result": "Success: Message was sent to IBM Message Hub."
- });
- })
- .fail(function(error) {
- reject(error);
- });
- });
-}
-
-exports.main = mhpost;
diff --git a/tests/usecases/data-processing-app/actions/mhpost/package.json b/tests/usecases/data-processing-app/actions/mhpost/package.json
deleted file mode 100644
index 0ed4e2e..0000000
--- a/tests/usecases/data-processing-app/actions/mhpost/package.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "name": "mypost",
- "version": "0.1.0",
- "description": "Action posting messages to a Message Hub topic",
- "author": "IBM",
- "license": "Apache-2.0",
- "dependencies": {
- "message-hub-rest": "1.2.0"
- },
- "main": "index.js"
-}
\ No newline at end of file
diff --git a/tests/usecases/data-processing-app/deployment.yaml b/tests/usecases/data-processing-app/deployment.yaml
deleted file mode 100644
index 073af2c..0000000
--- a/tests/usecases/data-processing-app/deployment.yaml
+++ /dev/null
@@ -1,14 +0,0 @@
-application:
- name: testapp
- namespace: _
- version: 1.0
- packages:
- name: kafka
- inputs:
- kafka_rest_url: https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443
- topic: out-topic
- triggers:
- kafka-trigger:
- inputs:
- isJSONData: true
- topic: in-topic
\ No newline at end of file
diff --git a/tests/usecases/data-processing-app/manifest.yaml b/tests/usecases/data-processing-app/manifest.yaml
deleted file mode 100644
index b6fd633..0000000
--- a/tests/usecases/data-processing-app/manifest.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-package:
- name: kafka
- version: 1.0
- license: Apache-2.0
- actions:
- mhpost-action:
- version: 1.0
- function: actions/mhpost
- runtime: nodejs:6
- mhget-action:
- version: 1.0
- function: actions/mhget/mhget.js
- runtime: nodejs:6
- triggers:
- kafka-trigger:
- feed: Bluemix_kafka-broker_Credentials-1/messageHubFeed
- sequences:
- kafka-sequence:
- actions: mhget-action, kafka/mhpost-action
- rules:
- kafka-inbound-rule:
- trigger: kafka-trigger
- action: kafka-sequence
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].