You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2019/08/19 16:44:47 UTC

[openwhisk-package-kafka] branch master updated: Test resiliency updates (#341)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e4aa15  Test resiliency updates (#341)
7e4aa15 is described below

commit 7e4aa15e9f7a98d329ab63cd53b9a6e3067adb9c
Author: James Dubee <jw...@us.ibm.com>
AuthorDate: Mon Aug 19 12:44:42 2019 -0400

    Test resiliency updates (#341)
---
 .../system/packages/MessageHubFeedTests.scala      | 13 +++++++++--
 tests/src/test/scala/system/utils/KafkaUtils.scala | 25 +++++++++++-----------
 2 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index a40c75c..0c05d63 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -281,7 +281,7 @@ class MessageHubFeedTests
       val admin_url = getAsJson("kafka_admin_url")
       val brokers = getAsJson("brokers")
 
-      createTrigger(assetHelper, triggerName, parameters = Map(
+      val uuid = createTrigger(assetHelper, triggerName, parameters = Map(
         "user" -> username,
         "password" -> password,
         "api_key" -> getAsJson("api_key"),
@@ -312,6 +312,8 @@ class MessageHubFeedTests
           }
       }
 
+      consumerExists(uuid)
+
       val updateRunResult = wsk.action.invoke(actionName, parameters = Map(
         "triggerName" -> triggerName.toJson,
         "lifecycleEvent" -> "UPDATE".toJson,
@@ -326,6 +328,8 @@ class MessageHubFeedTests
           activation.response.success shouldBe true
       }
 
+      consumerExists(uuid)
+
       val run = wsk.action.invoke(actionName, parameters = Map(
         "triggerName" -> triggerName.toJson,
         "lifecycleEvent" -> "READ".toJson,
@@ -344,6 +348,8 @@ class MessageHubFeedTests
               config should contain("isJSONData" -> false.toJson)
           }
       }
+
+      consumerExists(uuid)
   }
 
   it should "fire a trigger when a message is posted to message hub before and after update" in withAssetCleaner(wskprops) {
@@ -353,7 +359,7 @@ class MessageHubFeedTests
       val key = "TheKey"
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
 
-      createTrigger(assetHelper, triggerName, parameters = Map(
+      val uuid = createTrigger(assetHelper, triggerName, parameters = Map(
         "user" -> getAsJson("user"),
         "password" -> getAsJson("password"),
         "api_key" -> getAsJson("api_key"),
@@ -405,6 +411,9 @@ class MessageHubFeedTests
 
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
+
+      consumerExists(uuid)
+
       produceMessage(topic, key, verificationName2)
       retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index 2674746..a08a2b3 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -34,10 +34,10 @@ import system.packages.ActionHelper._
 import org.apache.openwhisk.utils.JsHelpers
 
 import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
 import common.TestHelpers
 import common.TestUtils
 import common.WskTestHelpers
+import common.ActivationResult
 import org.apache.openwhisk.utils.retry
 import org.apache.kafka.clients.producer.ProducerRecord
 
@@ -66,7 +66,7 @@ trait KafkaUtils extends TestHelpers with WskTestHelpers {
         new RestAssuredConfig().sslConfig(config)
     }
 
-    def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
+    def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]): String = {
         println(s"Creating trigger $name")
 
         val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
@@ -74,18 +74,19 @@ trait KafkaUtils extends TestHelpers with WskTestHelpers {
                 trigger.create(name, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
         }
 
-        withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-            activation =>
-                // should be successful
-                activation.response.success shouldBe true
+        val activation = wsk.parseJsonString(feedCreationResult.stdout.substring(0, feedCreationResult.stdout.indexOf("ok: created trigger"))).convertTo[ActivationResult]
 
-                // It takes a moment for the consumer to fully initialize.
-                println("Giving the consumer a moment to get ready")
-                Thread.sleep(KafkaUtils.consumerInitTime)
+        // should be successful
+        activation.response.success shouldBe true
 
-                val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
-                consumerExists(uuid)
-        }
+        // It takes a moment for the consumer to fully initialize.
+        println("Giving the consumer a moment to get ready")
+        Thread.sleep(KafkaUtils.consumerInitTime)
+
+        val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+        consumerExists(uuid)
+
+        uuid
     }