You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/02/22 17:16:39 UTC
[incubator-openwhisk-package-kafka] branch master updated:
Improvements to BasicHealthTest (#249)
This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new c306261 Improvements to BasicHealthTest (#249)
c306261 is described below
commit c3062612f65d510ce8a6c53615b083d4278b5c50
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Thu Feb 22 11:16:36 2018 -0600
Improvements to BasicHealthTest (#249)
* remove message producing from retry
* unique rule name
* account for multiple workers and thus multiple health urls when checking for consumer uuid
---
.../test/scala/system/health/BasicHealthTest.scala | 158 +++++++++------------
1 file changed, 68 insertions(+), 90 deletions(-)
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index f88a164..52b176d 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -17,9 +17,6 @@
package system.health
-import java.time.Clock
-import java.time.Instant
-
import system.utils.KafkaUtils
import scala.concurrent.duration.DurationInt
@@ -30,15 +27,12 @@ import org.scalatest.junit.JUnitRunner
import common.JsHelpers
import common.TestHelpers
import common.TestUtils
-import common.TestUtils.DONTCARE_EXIT
-import common.TestUtils.NOT_FOUND
-import common.TestUtils.SUCCESS_EXIT
import common.Wsk
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
+import spray.json._
import spray.json.DefaultJsonProtocol._
-import spray.json.{JsObject, pimpAny}
import com.jayway.restassured.RestAssured
import whisk.utils.retry;
@@ -62,7 +56,7 @@ class BasicHealthTest
val messagingPackage = "/whisk.system/messaging"
val messageHubFeed = "messageHubFeed"
val messageHubProduce = "messageHubProduce"
- val actionName = s"${messagingPackage}/${messageHubFeed}"
+ val actionName = s"$messagingPackage/$messageHubFeed"
val consumerInitTime = 10000 // ms
@@ -74,7 +68,7 @@ class BasicHealthTest
it should "create a new trigger" in withAssetCleaner(wskprops) {
val triggerName = s"newTrigger-${System.currentTimeMillis}"
- println(s"Creating trigger ${triggerName}")
+ println(s"Creating trigger $triggerName")
(wp, assetHelper) =>
val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
@@ -93,12 +87,30 @@ class BasicHealthTest
activation =>
// should be successful
activation.response.success shouldBe true
- val uuid = activation.response.result.get.fields.get("uuid").get.toString
+ val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+
+ // get /health endpoint(s) and ensure it contains the new uuid
+ val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
+ healthUrls shouldNot be(empty)
- // get /health endpoint and ensure it contains the new uuid
retry({
- val response = RestAssured.given().get(System.getProperty("health_url"))
- assert(response.statusCode() == 200 && response.asString().contains(uuid))
+ val uuids = healthUrls.flatMap(u => {
+ val response = RestAssured.given().get(u)
+ response.statusCode() should be(200)
+ response.asString()
+ .parseJson
+ .asJsObject
+ .getFields("consumers")
+ .head
+ .convertTo[JsArray]
+ .elements
+ .flatMap(c => {
+ c.asJsObject.fields.keySet
+ })
+ }).toList
+
+ uuids should contain(uuid)
+
}, N = 3, waitBeforeRetry = Some(1.second))
}
}
@@ -106,96 +118,62 @@ class BasicHealthTest
it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
val currentTime = s"${System.currentTimeMillis}"
- val baseTriggerName = "/_/BasicHealthTestTrigger"
+ (wp, assetHelper) =>
+ val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+ println(s"Creating trigger $triggerName")
- val triggerName = System.getProperty("trigger.suffix") match {
- case suffix if suffix != "" && suffix != null => s"${baseTriggerName}-${suffix}"
- case _ => s"${baseTriggerName}-${currentTime}"
- }
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+ (trigger, _) =>
+ trigger.create(triggerName, feed = Some(actionName), parameters = Map(
+ "user" -> kafkaUtils.getAsJson("user"),
+ "password" -> kafkaUtils.getAsJson("password"),
+ "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "topic" -> topic.toJson
+ ))
+ }
- (wp, assetHelper) =>
- val result = wsk.trigger.get(triggerName, expectedExitCode = DONTCARE_EXIT)
-
- if (result.exitCode == NOT_FOUND) {
- // trigger does not yet exist, create it
- println(s"Creating trigger ${triggerName}")
-
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson
- ))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- }
-
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
- } else {
- result.exitCode shouldBe (SUCCESS_EXIT)
- println(s"Trigger already exists, reusing it: $triggerName")
+ withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+ activation =>
+ // should be successful
+ activation.response.success shouldBe true
}
+ // It takes a moment for the consumer to fully initialize.
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(consumerInitTime)
+
val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
- val defaultActionName = s"helloKafka-${currentTime}"
+ val defaultActionName = s"helloKafka-$currentTime"
assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
action.create(name, defaultAction)
}
- assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+
+ assetHelper.withCleaner(wsk.rule, s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
rule.create(name, trigger = triggerName, action = defaultActionName)
}
- retry({
- val start = Instant.now(Clock.systemUTC())
- // key to use for the produced message
- val key = "TheKey"
-
- println("Producing a message")
- withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson,
- "key" -> key.toJson,
- "value" -> currentTime.toJson
- ))) {
- _.response.success shouldBe true
- }
+ // key to use for the produced message
+ val key = "TheKey"
+
+ println("Producing a message")
+ withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
+ "user" -> kafkaUtils.getAsJson("user"),
+ "password" -> kafkaUtils.getAsJson("password"),
+ "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "topic" -> topic.toJson,
+ "key" -> key.toJson,
+ "value" -> currentTime.toJson
+ ))) {
+ _.response.success shouldBe true
+ }
+ retry({
println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), since = Some(start), retries = maxRetries)
- assert(activations.length > 0)
-
- println("Validating content of activation(s)")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(currentTime))
- } yield activation.right.get
-
- assert(matchingActivations.length == 1)
-
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = currentTime)
- assert(messages.length == 1)
-
- val message = messages.head
- message.getFieldPath("topic") shouldBe Some(topic.toJson)
- message.getFieldPath("key") shouldBe Some(key.toJson)
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.nonEmpty)
}, N = 3)
}
@@ -204,7 +182,7 @@ class BasicHealthTest
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
+ println(s"Creating trigger $triggerName")
val username = kafkaUtils.getAsJson("user")
val password = kafkaUtils.getAsJson("password")
--
To stop receiving notification emails like this one, please contact
dubeejw@apache.org.