You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/03/02 19:46:54 UTC
[GitHub] csantanapr closed pull request #253: Resiliency Updates for MessageHub Tests
csantanapr closed pull request #253: Resiliency Updates for MessageHub Tests
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/253
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it is merged, the diff is reproduced below for the sake
of provenance:
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 02183d2..139ed21 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -141,9 +141,7 @@ class BasicHealthTest
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
// It takes a moment for the consumer to fully initialize.
@@ -280,9 +278,7 @@ class BasicHealthTest
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
val readRunResult = wsk.action.invoke(actionName, parameters = Map(
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 1fe18a1..e280e39 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -17,18 +17,21 @@
package system.packages
import system.utils.KafkaUtils
-import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
+
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.Inside
import org.scalatest.junit.JUnitRunner
+import org.apache.kafka.clients.producer.ProducerRecord
+
import spray.json.DefaultJsonProtocol._
import spray.json._
+
import common.JsHelpers
import common.TestUtils
import common.TestHelpers
@@ -36,11 +39,14 @@ import common.Wsk
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
+
import ActionHelper._
+
import java.util.Base64
import java.nio.charset.StandardCharsets
import java.time.{Clock, Instant}
+import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
class MessageHubFeedTests
@@ -166,28 +172,30 @@ class MessageHubFeedTests
_.response.success shouldBe true
}
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- assert(activations.length == 1)
+ retry({
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.length == 1)
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
- } yield activation.right.get
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
+ } yield activation.right.get
- assert(matchingActivations.length == 1)
+ assert(matchingActivations.length > 0)
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+ val activation = matchingActivations.head
+ activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime)
- assert(messages.length == 1)
+ // assert that there exists a message in the activation which has the expected keys and values
+ val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime)
+ assert(messages.length == 1)
- val message = messages.head
- message.getFieldPath("topic") shouldBe Some(topic.toJson)
- message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+ val message = messages.head
+ message.getFieldPath("topic") shouldBe Some(topic.toJson)
+ message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+ }, N = 3)
}
it should "not fire a single trigger with an oversized payload" in withAssetCleaner(wskprops) {
@@ -234,19 +242,21 @@ class MessageHubFeedTests
producer.send(secondMessage)
producer.close()
- // verify there are two trigger activations required to handle these messages
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
-
- println("Verifying activation content")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
- activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
- } yield activation.right.get
-
- assert(matchingActivations.length == 2)
+ retry({
+ // verify there are two trigger activations required to handle these messages
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
+
+ println("Verifying activation content")
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
+ activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
+ } yield activation.right.get
+
+ assert(matchingActivations.length == 2)
+ }, N = 3)
}
it should "not fire a trigger for a single oversized message" in withAssetCleaner(wskprops) {
@@ -289,18 +299,20 @@ class MessageHubFeedTests
producer.send(bigMessage)
producer.close()
- // verify there are no activations that match
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ retry({
+ // verify there are no activations that match
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- println("Verifying activation content")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
- } yield activation.right.get
+ println("Verifying activation content")
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
+ } yield activation.right.get
- assert(matchingActivations.length == 0)
+ assert(matchingActivations.length == 0)
+ }, N = 3)
}
it should "reject trigger update without passing in any updatable parameters" in withAssetCleaner(wskprops) {
@@ -333,8 +345,7 @@ class MessageHubFeedTests
))
withActivation(wsk.activation, run) {
- activation =>
- activation.response.success shouldBe false
+ _.response.success shouldBe false
}
}
@@ -370,8 +381,7 @@ class MessageHubFeedTests
))
withActivation(wsk.activation, run) {
- activation =>
- activation.response.success shouldBe false
+ _.response.success shouldBe false
}
}
@@ -430,8 +440,7 @@ class MessageHubFeedTests
))
withActivation(wsk.activation, updateRunResult) {
- activation =>
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
println("Giving the consumer a moment to get ready")
@@ -469,29 +478,31 @@ class MessageHubFeedTests
}
def checkForActivations(triggerName: String, since: Instant, topic: String, key: String, value: String) = {
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
- assert(activations.length == 1)
+ retry({
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
+ assert(activations.length == 1)
- println("Validating content of activation(s)")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value))
- } yield activation.right.get
+ println("Validating content of activation(s)")
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value))
+ } yield activation.right.get
- assert(matchingActivations.length == 1)
+ assert(matchingActivations.length > 0)
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+ val activation = matchingActivations.head
+ activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value)
- assert(messages.length == 1)
+ // assert that there exists a message in the activation which has the expected keys and values
+ val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value)
+ assert(messages.length == 1)
- val message = messages.head
- message.getFieldPath("topic") shouldBe Some(topic.toJson)
- message.getFieldPath("key") shouldBe Some(key.toJson)
+ val message = messages.head
+ message.getFieldPath("topic") shouldBe Some(topic.toJson)
+ message.getFieldPath("key") shouldBe Some(key.toJson)
+ }, N = 3)
}
def generateMessage(prefix: String, size: Int): String = {
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 3c2aac8..4d2b38c 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -35,12 +35,15 @@ import common.Wsk
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
+
import spray.json.DefaultJsonProtocol._
import spray.json.pimpAny
import java.util.Base64
import java.nio.charset.StandardCharsets
+import whisk.utils.retry
+
@RunWith(classOf[JUnitRunner])
class MessageHubProduceTests
extends FlatSpec
@@ -150,7 +153,6 @@ class MessageHubProduceTests
}
it should "Post a message with a binary value" in withAssetCleaner(wskprops) {
- // create trigger
val currentTime = s"${System.currentTimeMillis}"
(wp, assetHelper) =>
@@ -169,53 +171,53 @@ class MessageHubProduceTests
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
+ // It takes a moment for the consumer to fully initialize.
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(consumerInitTime)
+
val defaultActionName = s"helloKafka-${currentTime}"
assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
action.create(name, defaultAction)
}
- assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+
+ assetHelper.withCleaner(wsk.rule, s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
rule.create(name, trigger = triggerName, action = defaultActionName)
}
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
// produce message
val decodedMessage = "This will be base64 encoded"
val encodedMessage = Base64.getEncoder.encodeToString(decodedMessage.getBytes(StandardCharsets.UTF_8))
val base64ValueParams = validParameters + ("base64DecodeValue" -> true.toJson) + ("value" -> encodedMessage.toJson)
+ println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
- activation =>
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
- // verify trigger fired
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- assert(activations.length > 0)
+ retry({
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.nonEmpty)
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
- } yield activation.right.get
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
+ } yield activation.right.get
- assert(matchingActivations.length == 1)
+ assert(matchingActivations.length > 0)
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+ val activation = matchingActivations.head
+ activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
- assert(messages.length == 1)
+ // assert that there exists a message in the activation which has the expected keys and values
+ val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
+ assert(messages.length == 1)
+ }, N = 3)
}
it should "Post a message with a binary key" in withAssetCleaner(wskprops) {
@@ -238,52 +240,52 @@ class MessageHubProduceTests
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
+ // It takes a moment for the consumer to fully initialize.
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(consumerInitTime)
+
val defaultActionName = s"helloKafka-${currentTime}"
assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
action.create(name, defaultAction)
}
- assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+
+ assetHelper.withCleaner(wsk.rule, s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
rule.create(name, trigger = triggerName, action = defaultActionName)
}
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
// produce message
val decodedKey = "This will be base64 encoded"
val encodedKey = Base64.getEncoder.encodeToString(decodedKey.getBytes(StandardCharsets.UTF_8))
val base64ValueParams = validParameters + ("base64DecodeKey" -> true.toJson) + ("key" -> encodedKey.toJson)
+ println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
- activation =>
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
- // verify trigger fired
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- assert(activations.length > 0)
+ retry({
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.nonEmpty)
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
- } yield activation.right.get
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
+ } yield activation.right.get
- assert(matchingActivations.length == 1)
+ assert(matchingActivations.length > 0)
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+ val activation = matchingActivations.head
+ activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "key", value = decodedKey)
- assert(messages.length == 1)
+ // assert that there exists a message in the activation which has the expected keys and values
+ val messages = KafkaUtils.messagesInActivation(activation, field = "key", value = decodedKey)
+ assert(messages.length == 1)
+ }, N = 3)
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services