You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/02/22 17:16:39 UTC

[incubator-openwhisk-package-kafka] branch master updated: Improvements to BasicHealthTest (#249)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new c306261  Improvements to BasicHealthTest (#249)
c306261 is described below

commit c3062612f65d510ce8a6c53615b083d4278b5c50
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Thu Feb 22 11:16:36 2018 -0600

    Improvements to BasicHealthTest (#249)
    
    
    * move message producing out of the retry block so the message is produced only once
    
    * use a unique rule name for each test run
    
    * account for multiple workers, and thus multiple health URLs, when checking for the consumer UUID
---
 .../test/scala/system/health/BasicHealthTest.scala | 158 +++++++++------------
 1 file changed, 68 insertions(+), 90 deletions(-)

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index f88a164..52b176d 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -17,9 +17,6 @@
 
 package system.health
 
-import java.time.Clock
-import java.time.Instant
-
 import system.utils.KafkaUtils
 
 import scala.concurrent.duration.DurationInt
@@ -30,15 +27,12 @@ import org.scalatest.junit.JUnitRunner
 import common.JsHelpers
 import common.TestHelpers
 import common.TestUtils
-import common.TestUtils.DONTCARE_EXIT
-import common.TestUtils.NOT_FOUND
-import common.TestUtils.SUCCESS_EXIT
 import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
+import spray.json._
 import spray.json.DefaultJsonProtocol._
-import spray.json.{JsObject, pimpAny}
 import com.jayway.restassured.RestAssured
 import whisk.utils.retry;
 
@@ -62,7 +56,7 @@ class BasicHealthTest
   val messagingPackage = "/whisk.system/messaging"
   val messageHubFeed = "messageHubFeed"
   val messageHubProduce = "messageHubProduce"
-  val actionName = s"${messagingPackage}/${messageHubFeed}"
+  val actionName = s"$messagingPackage/$messageHubFeed"
 
   val consumerInitTime = 10000 // ms
 
@@ -74,7 +68,7 @@ class BasicHealthTest
 
   it should "create a new trigger" in withAssetCleaner(wskprops) {
     val triggerName = s"newTrigger-${System.currentTimeMillis}"
-    println(s"Creating trigger ${triggerName}")
+    println(s"Creating trigger $triggerName")
 
     (wp, assetHelper) =>
       val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
@@ -93,12 +87,30 @@ class BasicHealthTest
         activation =>
           // should be successful
           activation.response.success shouldBe true
-          val uuid = activation.response.result.get.fields.get("uuid").get.toString
+          val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+
+          // get /health endpoint(s) and ensure it contains the new uuid
+          val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
+          healthUrls shouldNot be(empty)
 
-          // get /health endpoint and ensure it contains the new uuid
           retry({
-            val response = RestAssured.given().get(System.getProperty("health_url"))
-            assert(response.statusCode() == 200 && response.asString().contains(uuid))
+            val uuids = healthUrls.flatMap(u => {
+              val response = RestAssured.given().get(u)
+              response.statusCode() should be(200)
+              response.asString()
+                .parseJson
+                .asJsObject
+                .getFields("consumers")
+                .head
+                .convertTo[JsArray]
+                .elements
+                .flatMap(c => {
+                  c.asJsObject.fields.keySet
+                })
+            }).toList
+
+            uuids should contain(uuid)
+
           }, N = 3, waitBeforeRetry = Some(1.second))
       }
   }
@@ -106,96 +118,62 @@ class BasicHealthTest
   it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
     val currentTime = s"${System.currentTimeMillis}"
 
-    val baseTriggerName = "/_/BasicHealthTestTrigger"
+    (wp, assetHelper) =>
+      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+      println(s"Creating trigger $triggerName")
 
-    val triggerName = System.getProperty("trigger.suffix") match {
-      case suffix if suffix != "" && suffix != null => s"${baseTriggerName}-${suffix}"
-      case _ => s"${baseTriggerName}-${currentTime}"
-    }
+      val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+        (trigger, _) =>
+          trigger.create(triggerName, feed = Some(actionName), parameters = Map(
+            "user" -> kafkaUtils.getAsJson("user"),
+            "password" -> kafkaUtils.getAsJson("password"),
+            "api_key" -> kafkaUtils.getAsJson("api_key"),
+            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+            "topic" -> topic.toJson
+          ))
+      }
 
-    (wp, assetHelper) =>
-      val result = wsk.trigger.get(triggerName, expectedExitCode = DONTCARE_EXIT)
-
-      if (result.exitCode == NOT_FOUND) {
-        // trigger does not yet exist, create it
-        println(s"Creating trigger ${triggerName}")
-
-        val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
-          (trigger, _) =>
-            trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-              "user" -> kafkaUtils.getAsJson("user"),
-              "password" -> kafkaUtils.getAsJson("password"),
-              "api_key" -> kafkaUtils.getAsJson("api_key"),
-              "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-              "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-              "topic" -> topic.toJson
-            ))
-        }
-
-        withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-          activation =>
-            // should be successful
-            activation.response.success shouldBe true
-        }
-
-        // It takes a moment for the consumer to fully initialize.
-        println("Giving the consumer a moment to get ready")
-        Thread.sleep(consumerInitTime)
-      } else {
-        result.exitCode shouldBe (SUCCESS_EXIT)
-        println(s"Trigger already exists, reusing it: $triggerName")
+      withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+        activation =>
+          // should be successful
+          activation.response.success shouldBe true
       }
 
+      // It takes a moment for the consumer to fully initialize.
+      println("Giving the consumer a moment to get ready")
+      Thread.sleep(consumerInitTime)
+
       val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
-      val defaultActionName = s"helloKafka-${currentTime}"
+      val defaultActionName = s"helloKafka-$currentTime"
 
       assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
         action.create(name, defaultAction)
       }
-      assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+
+      assetHelper.withCleaner(wsk.rule, s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
         rule.create(name, trigger = triggerName, action = defaultActionName)
       }
 
-      retry({
-        val start = Instant.now(Clock.systemUTC())
-        // key to use for the produced message
-        val key = "TheKey"
-
-        println("Producing a message")
-        withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-          "user" -> kafkaUtils.getAsJson("user"),
-          "password" -> kafkaUtils.getAsJson("password"),
-          "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-          "topic" -> topic.toJson,
-          "key" -> key.toJson,
-          "value" -> currentTime.toJson
-        ))) {
-          _.response.success shouldBe true
-        }
+      // key to use for the produced message
+      val key = "TheKey"
+
+      println("Producing a message")
+      withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
+        "user" -> kafkaUtils.getAsJson("user"),
+        "password" -> kafkaUtils.getAsJson("password"),
+        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "topic" -> topic.toJson,
+        "key" -> key.toJson,
+        "value" -> currentTime.toJson
+      ))) {
+        _.response.success shouldBe true
+      }
 
+      retry({
         println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 100, Some(triggerName), since = Some(start), retries = maxRetries)
-        assert(activations.length > 0)
-
-        println("Validating content of activation(s)")
-        val matchingActivations = for {
-          id <- activations
-          activation = wsk.activation.waitForActivation(id)
-          if (activation.isRight && activation.right.get.fields.get("response").toString.contains(currentTime))
-        } yield activation.right.get
-
-        assert(matchingActivations.length == 1)
-
-        val activation = matchingActivations.head
-        activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
-        // assert that there exists a message in the activation which has the expected keys and values
-        val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = currentTime)
-        assert(messages.length == 1)
-
-        val message = messages.head
-        message.getFieldPath("topic") shouldBe Some(topic.toJson)
-        message.getFieldPath("key") shouldBe Some(key.toJson)
+        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+        assert(activations.nonEmpty)
       }, N = 3)
   }
 
@@ -204,7 +182,7 @@ class BasicHealthTest
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger ${triggerName}")
+      println(s"Creating trigger $triggerName")
 
       val username = kafkaUtils.getAsJson("user")
       val password = kafkaUtils.getAsJson("password")

-- 
To stop receiving notification emails like this one, please contact
dubeejw@apache.org.