You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/10/03 01:21:40 UTC
[incubator-openwhisk-package-kafka] branch master updated: update test to delete artifacts. reformat with scalariform (#216)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 8530bf5 update test to delete artifacts. reformat with scalariform (#216)
8530bf5 is described below
commit 8530bf5d2ecd974df91d82923382af5646410c36
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Mon Oct 2 20:21:38 2017 -0500
update test to delete artifacts. reformat with scalariform (#216)
---
.../test/scala/system/health/BasicHealthTest.scala | 253 +++++++++++----------
1 file changed, 129 insertions(+), 124 deletions(-)
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index fe9d9bf..7443527 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -47,141 +47,146 @@ import com.jayway.restassured.RestAssured
import whisk.utils.retry;
-
@RunWith(classOf[JUnitRunner])
class BasicHealthTest
- extends FlatSpec
- with Matchers
- with WskActorSystem
- with BeforeAndAfterAll
- with TestHelpers
- with WskTestHelpers
- with JsHelpers {
-
- val topic = "test"
- val sessionTimeout = 10 seconds
+ extends FlatSpec
+ with Matchers
+ with WskActorSystem
+ with BeforeAndAfterAll
+ with TestHelpers
+ with WskTestHelpers
+ with JsHelpers {
+
+ val topic = "test"
+ val sessionTimeout = 10 seconds
+
+ implicit val wskprops = WskProps()
+ val wsk = new Wsk()
+
+ val messagingPackage = "/whisk.system/messaging"
+ val messageHubFeed = "messageHubFeed"
+ val messageHubProduce = "messageHubProduce"
+
+ val consumerInitTime = 10000 // ms
+
+ val kafkaUtils = new KafkaUtils
+
+ behavior of "Message Hub feed"
+
+ it should "create a new trigger" in withAssetCleaner(wskprops) {
+ val triggerName = s"newTrigger-${System.currentTimeMillis}"
+ println(s"Creating trigger ${triggerName}")
+
+ (wp, assetHelper) =>
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+ (trigger, _) =>
+ trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
+ "user" -> kafkaUtils.getAsJson("user"),
+ "password" -> kafkaUtils.getAsJson("password"),
+ "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "topic" -> topic.toJson
+ ))
+ }
+
+ withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+ activation =>
+ // should be successful
+ activation.response.success shouldBe true
+ val uuid = activation.response.result.get.fields.get("uuid").get.toString
+
+ // get /health endpoint and ensure it contains the new uuid
+ retry({
+ val response = RestAssured.given().get(System.getProperty("health_url"))
+ assert(response.statusCode() == 200 && response.asString().contains(uuid))
+ }, N = 3, waitBeforeRetry = Some(1.second))
+ }
+ }
+
+ it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
+ val currentTime = s"${System.currentTimeMillis}"
+
+ val baseTriggerName = "/_/BasicHealthTestTrigger"
+
+ val triggerName = System.getProperty("trigger.suffix") match {
+ case suffix if suffix != "" && suffix != null => s"${baseTriggerName}-${suffix}"
+ case _ => s"${baseTriggerName}-${currentTime}"
+ }
- implicit val wskprops = WskProps()
- val wsk = new Wsk()
+ (wp, assetHelper) =>
+ val result = wsk.trigger.get(triggerName, expectedExitCode = DONTCARE_EXIT)
- val messagingPackage = "/whisk.system/messaging"
- val messageHubFeed = "messageHubFeed"
- val messageHubProduce = "messageHubProduce"
+ if (result.exitCode == NOT_FOUND) {
+ // trigger does not yet exist, create it
+ println(s"Creating trigger ${triggerName}")
- val consumerInitTime = 10000 // ms
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+ (trigger, _) =>
+ trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
+ "user" -> kafkaUtils.getAsJson("user"),
+ "password" -> kafkaUtils.getAsJson("password"),
+ "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "topic" -> topic.toJson
+ ))
+ }
- val kafkaUtils = new KafkaUtils
+ withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+ activation =>
+ // should be successful
+ activation.response.success shouldBe true
+ }
- behavior of "Message Hub feed"
+ // It takes a moment for the consumer to fully initialize.
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(consumerInitTime)
+ } else {
+ result.exitCode shouldBe (SUCCESS_EXIT)
+ println(s"Trigger already exists, reusing it: $triggerName")
+ }
+
+ retry({
+ val start = Instant.now(Clock.systemUTC())
+ // key to use for the produced message
+ val key = "TheKey"
+
+ println("Producing a message")
+ withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
+ "user" -> kafkaUtils.getAsJson("user"),
+ "password" -> kafkaUtils.getAsJson("password"),
+ "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "topic" -> topic.toJson,
+ "key" -> key.toJson,
+ "value" -> currentTime.toJson
+ ))) {
+ _.response.success shouldBe true
+ }
- it should "create a new trigger" in withAssetCleaner(wskprops) {
- val triggerName = s"newTrigger-${System.currentTimeMillis}"
- println(s"Creating trigger ${triggerName}")
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 100, Some(triggerName), since = Some(start), retries = 30)
+ assert(activations.length > 0)
- (wp, assetHelper) =>
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- val uuid = activation.response.result.get.fields.get("uuid").get.toString
-
- // get /health endpoint and ensure it contains the new uuid
- retry({
- val response = RestAssured.given().get(System.getProperty("health_url"))
- assert(response.statusCode() == 200 && response.asString().contains(uuid))
- }, N = 3, waitBeforeRetry = Some(1.second))
- }
- }
+ println("Validating content of activation(s)")
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && activation.right.get.fields.get("response").toString.contains(currentTime))
+ } yield activation.right.get
- it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
- val currentTime = s"${System.currentTimeMillis}"
+ assert(matchingActivations.length == 1)
- val baseTriggerName = "/_/BasicHealthTestTrigger"
+ val activation = matchingActivations.head
+ activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
- val triggerName = System.getProperty("trigger.suffix") match {
- case suffix if suffix != "" && suffix != null => s"${baseTriggerName}-${suffix}"
- case _ => s"${baseTriggerName}-${currentTime}"
- }
+ // assert that there exists a message in the activation which has the expected keys and values
+ val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = currentTime)
+ assert(messages.length == 1)
- (wp, assetHelper) =>
- val result = wsk.trigger.get(triggerName, expectedExitCode = DONTCARE_EXIT)
-
- if(result.exitCode == NOT_FOUND) {
- // trigger does not yet exist, create it
- println(s"Creating trigger ${triggerName}")
-
- val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson))
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- }
-
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
- } else {
- result.exitCode shouldBe(SUCCESS_EXIT)
- println(s"Trigger already exists, reusing it: $triggerName")
- }
-
- retry({
- val start = Instant.now(Clock.systemUTC())
- // key to use for the produced message
- val key = "TheKey"
-
- println("Producing a message")
- withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson,
- "key" -> key.toJson,
- "value" -> currentTime.toJson))) {
- _.response.success shouldBe true
- }
-
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), since = Some(start), retries = 30)
- assert(activations.length > 0)
-
- println("Validating content of activation(s)")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(currentTime))
- } yield activation.right.get
-
- assert(matchingActivations.length == 1)
-
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field="value", value=currentTime)
- assert(messages.length == 1)
-
- val message = messages.head
- message.getFieldPath("topic") shouldBe Some(topic.toJson)
- message.getFieldPath("key") shouldBe Some(key.toJson)
- }, N = 3)
- }
+ val message = messages.head
+ message.getFieldPath("topic") shouldBe Some(topic.toJson)
+ message.getFieldPath("key") shouldBe Some(key.toJson)
+ }, N = 3)
+ }
}
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.