You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/22 06:11:42 UTC
[GitHub] dubeejw closed pull request #244: Fix tests that fire trigger when message is consumed
dubeejw closed pull request #244: Fix tests that fire trigger when message is consumed
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/244
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index f88a164..688d4ff 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -19,14 +19,17 @@ package system.health
import java.time.Clock
import java.time.Instant
+import java.time.temporal.ChronoUnit
import system.utils.KafkaUtils
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
+
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
import org.scalatest.junit.JUnitRunner
+
import common.JsHelpers
import common.TestHelpers
import common.TestUtils
@@ -37,21 +40,24 @@ import common.Wsk
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
+
import spray.json.DefaultJsonProtocol._
import spray.json.{JsObject, pimpAny}
+
import com.jayway.restassured.RestAssured
-import whisk.utils.retry;
+
+import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
class BasicHealthTest
- extends FlatSpec
- with Matchers
- with WskActorSystem
- with BeforeAndAfterAll
- with TestHelpers
- with WskTestHelpers
- with Inside
- with JsHelpers {
+ extends FlatSpec
+ with Matchers
+ with WskActorSystem
+ with BeforeAndAfterAll
+ with TestHelpers
+ with WskTestHelpers
+ with Inside
+ with JsHelpers {
val topic = "test"
val sessionTimeout = 10 seconds
@@ -156,11 +162,11 @@ class BasicHealthTest
rule.create(name, trigger = triggerName, action = defaultActionName)
}
- retry({
- val start = Instant.now(Clock.systemUTC())
- // key to use for the produced message
- val key = "TheKey"
+ val key = "TheKey"
+ val sleepTime = maxRetries * 1000
+ val start = Instant.now(Clock.systemUTC()).minus(5, ChronoUnit.MINUTES) // Allow for 5 minutes of clock skew
+ retry({
println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
"user" -> kafkaUtils.getAsJson("user"),
@@ -173,8 +179,15 @@ class BasicHealthTest
_.response.success shouldBe true
}
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), since = Some(start), retries = maxRetries)
+ // Sleep for sleepTime ms to simulate pollFor(), but with no requests
+ println(s"Giving the consumer $sleepTime ms to consume the message")
+ Thread.sleep(sleepTime)
+
+ // Make sure we don't get a stale activations view
+ wsk.activation.list(Some(triggerName), limit = Some(10))
+
+ println(s"Getting last 10 activations for $triggerName since $start")
+ val activations = wsk.activation.ids(wsk.activation.list(Some(triggerName), limit = Some(10), since = Some(start)))
assert(activations.length > 0)
println("Validating content of activation(s)")
@@ -184,7 +197,9 @@ class BasicHealthTest
if (activation.isRight && activation.right.get.fields.get("response").toString.contains(currentTime))
} yield activation.right.get
- assert(matchingActivations.length == 1)
+ // We may have gone through the retry loop already, so we can have more than one matching activation
+ assert(matchingActivations.length >= 1, "Consumer trigger was not fired, or activation got lost")
+ println(s"Found ${matchingActivations.length} triggers fired with expected message")
val activation = matchingActivations.head
activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 6341bbf..4bdbda1 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -16,19 +16,29 @@
*/
package system.packages
+import java.time.Clock
+import java.time.Instant
+import java.util.Base64
+import java.nio.charset.StandardCharsets
+import java.time.{Clock, Instant}
+import java.time.temporal.ChronoUnit
+
import system.utils.KafkaUtils
-import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
+
+import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.Inside
import org.scalatest.junit.JUnitRunner
+
import spray.json.DefaultJsonProtocol._
import spray.json._
+
import common.JsHelpers
import common.TestUtils
import common.TestHelpers
@@ -36,11 +46,8 @@ import common.Wsk
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
-import ActionHelper._
-import java.util.Base64
-import java.nio.charset.StandardCharsets
-import java.time.{Clock, Instant}
+import ActionHelper._
@RunWith(classOf[JUnitRunner])
class MessageHubFeedTests
@@ -153,6 +160,8 @@ class MessageHubFeedTests
// key to use for the produced message
val key = "TheKey"
+ val sleepTime = maxRetries * 1000
+ val start = Instant.now(Clock.systemUTC()).minus(5, ChronoUnit.MINUTES) // Allow for 5 minutes of clock skew
val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
val encodedKey = Base64.getEncoder.encodeToString(key.getBytes(StandardCharsets.UTF_8))
@@ -166,17 +175,27 @@ class MessageHubFeedTests
_.response.success shouldBe true
}
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+ // Sleep for sleepTime ms to simulate pollFor(), but with no requests
+ println(s"Giving the consumer $sleepTime ms to consume the message")
+ Thread.sleep(sleepTime)
+
+ // Make sure we don't get a stale activations view
+ wsk.activation.list(Some(triggerName), limit = Some(10))
+
+ println(s"Getting last 10 activations for $triggerName since $start")
+ val activations = wsk.activation.ids(wsk.activation.list(Some(triggerName), limit = Some(10), since = Some(start)))
assert(activations.length > 0)
+ println("Validating content of activation(s)")
val matchingActivations = for {
id <- activations
activation = wsk.activation.waitForActivation(id)
if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
} yield activation.right.get
- assert(matchingActivations.length == 1)
+ // We may have gone through the retry loop already, so we can have more than one matching activation
+ assert(matchingActivations.length >= 1, "Consumer trigger was not fired, or activation got lost")
+ println(s"Found ${matchingActivations.length} triggers fired with expected message")
val activation = matchingActivations.head
activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 6de9417..0aacf09 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -17,6 +17,9 @@
package system.packages
+import java.time.Clock
+import java.time.Instant
+
import system.utils.KafkaUtils
import scala.concurrent.duration.DurationInt
@@ -40,6 +43,7 @@ import spray.json.pimpAny
import java.util.Base64
import java.nio.charset.StandardCharsets
+import java.time.temporal.ChronoUnit
@RunWith(classOf[JUnitRunner])
class MessageHubProduceTests
@@ -183,9 +187,12 @@ class MessageHubProduceTests
rule.create(name, trigger = triggerName, action = defaultActionName)
}
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
+ val sleepTime = maxRetries * 1000
+ val start = Instant.now(Clock.systemUTC()).minus(5, ChronoUnit.MINUTES) // Allow for 5 minutes of clock skew
+
+ // Sleep for sleepTime ms to simulate pollFor(), but with no requests
+ println(s"Giving the consumer $sleepTime ms to consume the message")
+ Thread.sleep(sleepTime)
// produce message
val decodedMessage = "This will be base64 encoded"
@@ -197,18 +204,26 @@ class MessageHubProduceTests
activation.response.success shouldBe true
}
- // verify trigger fired
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+ println("Giving the consumer a moment to consume the message")
+ Thread.sleep(consumerInitTime)
+
+ // Make sure we don't get a stale activations view
+ wsk.activation.list(Some(triggerName), limit = Some(10))
+
+ println(s"Getting last 10 activations for $triggerName since $start")
+ val activations = wsk.activation.ids(wsk.activation.list(Some(triggerName), limit = Some(10), since = Some(start)))
assert(activations.length > 0)
+ println("Validating content of activation(s)")
val matchingActivations = for {
id <- activations
activation = wsk.activation.waitForActivation(id)
if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
} yield activation.right.get
- assert(matchingActivations.length == 1)
+ // We may have gone through the retry loop already, so we can have more than one matching activation
+ assert(matchingActivations.length >= 1, "Consumer trigger was not fired, or activation got lost")
+ println(s"Found ${matchingActivations.length} triggers fired with expected message")
val activation = matchingActivations.head
activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
@@ -216,6 +231,7 @@ class MessageHubProduceTests
// assert that there exists a message in the activation which has the expected keys and values
val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
assert(messages.length == 1)
+
}
it should "Post a message with a binary key" in withAssetCleaner(wskprops) {
@@ -252,6 +268,9 @@ class MessageHubProduceTests
rule.create(name, trigger = triggerName, action = defaultActionName)
}
+ val sleepTime = maxRetries * 1000
+ val start = Instant.now(Clock.systemUTC()).minus(5, ChronoUnit.MINUTES) // Allow for 5 minutes of clock skew
+
// It takes a moment for the consumer to fully initialize.
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
@@ -266,18 +285,27 @@ class MessageHubProduceTests
activation.response.success shouldBe true
}
- // verify trigger fired
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+ // Sleep for sleepTime ms to simulate pollFor(), but with no requests
+ println(s"Giving the consumer $sleepTime ms to consume the message")
+ Thread.sleep(sleepTime)
+
+ // Make sure we don't get a stale activations view
+ wsk.activation.list(Some(triggerName), limit = Some(10))
+
+ println(s"Getting last 10 activations for $triggerName since $start")
+ val activations = wsk.activation.ids(wsk.activation.list(Some(triggerName), limit = Some(10), since = Some(start)))
assert(activations.length > 0)
+ println("Validating content of activation(s)")
val matchingActivations = for {
id <- activations
activation = wsk.activation.waitForActivation(id)
if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
} yield activation.right.get
- assert(matchingActivations.length == 1)
+ // We may have gone through the retry loop already, so we can have more than one matching activation
+ assert(matchingActivations.length >= 1, "Consumer trigger was not fired, or activation got lost")
+ println(s"Found ${matchingActivations.length} triggers fired with expected message")
val activation = matchingActivations.head
activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services