You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/22 06:11:42 UTC

[GitHub] dubeejw closed pull request #244: Fix tests that fire trigger when message is consumed

dubeejw closed pull request #244: Fix tests that fire trigger when message is consumed
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/244
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index f88a164..688d4ff 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -19,14 +19,17 @@ package system.health
 
 import java.time.Clock
 import java.time.Instant
+import java.time.temporal.ChronoUnit
 
 import system.utils.KafkaUtils
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
+
 import org.junit.runner.RunWith
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
 import org.scalatest.junit.JUnitRunner
+
 import common.JsHelpers
 import common.TestHelpers
 import common.TestUtils
@@ -37,21 +40,24 @@ import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
+
 import spray.json.DefaultJsonProtocol._
 import spray.json.{JsObject, pimpAny}
+
 import com.jayway.restassured.RestAssured
-import whisk.utils.retry;
+
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class BasicHealthTest
-  extends FlatSpec
-  with Matchers
-  with WskActorSystem
-  with BeforeAndAfterAll
-  with TestHelpers
-  with WskTestHelpers
-  with Inside
-  with JsHelpers {
+    extends FlatSpec
+        with Matchers
+        with WskActorSystem
+        with BeforeAndAfterAll
+        with TestHelpers
+        with WskTestHelpers
+        with Inside
+        with JsHelpers {
 
   val topic = "test"
   val sessionTimeout = 10 seconds
@@ -156,11 +162,11 @@ class BasicHealthTest
         rule.create(name, trigger = triggerName, action = defaultActionName)
       }
 
-      retry({
-        val start = Instant.now(Clock.systemUTC())
-        // key to use for the produced message
-        val key = "TheKey"
+      val key = "TheKey"
+      val sleepTime = maxRetries * 1000
+      val start = Instant.now(Clock.systemUTC()).minus(5, ChronoUnit.MINUTES) // Allow for 5 minutes of clock skew
 
+      retry({
         println("Producing a message")
         withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
           "user" -> kafkaUtils.getAsJson("user"),
@@ -173,8 +179,15 @@ class BasicHealthTest
           _.response.success shouldBe true
         }
 
-        println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 100, Some(triggerName), since = Some(start), retries = maxRetries)
+        // Sleep for sleepTime ms to simulate pollFor(), but with no requests
+        println(s"Giving the consumer $sleepTime ms to consume the message")
+        Thread.sleep(sleepTime)
+
+        // Make sure we don't get a stale activations view
+        wsk.activation.list(Some(triggerName), limit = Some(10))
+
+        println(s"Getting last 10 activations for $triggerName since $start")
+        val activations = wsk.activation.ids(wsk.activation.list(Some(triggerName), limit = Some(10), since = Some(start)))
         assert(activations.length > 0)
 
         println("Validating content of activation(s)")
@@ -184,7 +197,9 @@ class BasicHealthTest
           if (activation.isRight && activation.right.get.fields.get("response").toString.contains(currentTime))
         } yield activation.right.get
 
-        assert(matchingActivations.length == 1)
+        // We may have gone through the retry loop already, so we can have more than one matching activation
+        assert(matchingActivations.length >= 1, "Consumer trigger was not fired, or activation got lost")
+        println(s"Found ${matchingActivations.length} triggers fired with expected message")
 
         val activation = matchingActivations.head
         activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 6341bbf..4bdbda1 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -16,19 +16,29 @@
  */
 package system.packages
 
+import java.time.Clock
+import java.time.Instant
+import java.util.Base64
+import java.nio.charset.StandardCharsets
+import java.time.{Clock, Instant}
+import java.time.temporal.ChronoUnit
+
 import system.utils.KafkaUtils
-import org.apache.kafka.clients.producer.ProducerRecord
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
+
+import org.apache.kafka.clients.producer.ProducerRecord
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.Inside
 import org.scalatest.junit.JUnitRunner
+
 import spray.json.DefaultJsonProtocol._
 import spray.json._
+
 import common.JsHelpers
 import common.TestUtils
 import common.TestHelpers
@@ -36,11 +46,8 @@ import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
-import ActionHelper._
-import java.util.Base64
-import java.nio.charset.StandardCharsets
-import java.time.{Clock, Instant}
 
+import ActionHelper._
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
@@ -153,6 +160,8 @@ class MessageHubFeedTests
 
       // key to use for the produced message
       val key = "TheKey"
+      val sleepTime = maxRetries * 1000
+      val start = Instant.now(Clock.systemUTC()).minus(5, ChronoUnit.MINUTES) // Allow for 5 minutes of clock skew
       val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
       val encodedKey = Base64.getEncoder.encodeToString(key.getBytes(StandardCharsets.UTF_8))
 
@@ -166,17 +175,27 @@ class MessageHubFeedTests
         _.response.success shouldBe true
       }
 
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+      // Sleep for sleepTime ms to simulate pollFor(), but with no requests
+      println(s"Giving the consumer $sleepTime ms to consume the message")
+      Thread.sleep(sleepTime)
+
+      // Make sure we don't get a stale activations view
+      wsk.activation.list(Some(triggerName), limit = Some(10))
+
+      println(s"Getting last 10 activations for $triggerName since $start")
+      val activations = wsk.activation.ids(wsk.activation.list(Some(triggerName), limit = Some(10), since = Some(start)))
       assert(activations.length > 0)
 
+      println("Validating content of activation(s)")
       val matchingActivations = for {
         id <- activations
         activation = wsk.activation.waitForActivation(id)
         if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
       } yield activation.right.get
 
-      assert(matchingActivations.length == 1)
+      // We may have gone through the retry loop already, so we can have more than one matching activation
+      assert(matchingActivations.length >= 1, "Consumer trigger was not fired, or activation got lost")
+      println(s"Found ${matchingActivations.length} triggers fired with expected message")
 
       val activation = matchingActivations.head
       activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 6de9417..0aacf09 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -17,6 +17,9 @@
 
 package system.packages
 
+import java.time.Clock
+import java.time.Instant
+
 import system.utils.KafkaUtils
 
 import scala.concurrent.duration.DurationInt
@@ -40,6 +43,7 @@ import spray.json.pimpAny
 
 import java.util.Base64
 import java.nio.charset.StandardCharsets
+import java.time.temporal.ChronoUnit
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubProduceTests
@@ -183,9 +187,12 @@ class MessageHubProduceTests
                 rule.create(name, trigger = triggerName, action = defaultActionName)
             }
 
-            // It takes a moment for the consumer to fully initialize.
-            println("Giving the consumer a moment to get ready")
-            Thread.sleep(consumerInitTime)
+            val sleepTime = maxRetries * 1000
+            val start = Instant.now(Clock.systemUTC()).minus(5, ChronoUnit.MINUTES) // Allow for 5 minutes of clock skew
+
+            // Sleep for sleepTime ms to simulate pollFor(), but with no requests
+            println(s"Giving the consumer $sleepTime ms to consume the message")
+            Thread.sleep(sleepTime)
 
             // produce message
             val decodedMessage = "This will be base64 encoded"
@@ -197,18 +204,26 @@ class MessageHubProduceTests
                     activation.response.success shouldBe true
             }
 
-            // verify trigger fired
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+            println("Giving the consumer a moment to consume the message")
+            Thread.sleep(consumerInitTime)
+
+            // Make sure we don't get a stale activations view
+            wsk.activation.list(Some(triggerName), limit = Some(10))
+
+            println(s"Getting last 10 activations for $triggerName since $start")
+            val activations = wsk.activation.ids(wsk.activation.list(Some(triggerName), limit = Some(10), since = Some(start)))
             assert(activations.length > 0)
 
+            println("Validating content of activation(s)")
             val matchingActivations = for {
                 id <- activations
                 activation = wsk.activation.waitForActivation(id)
                 if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
             } yield activation.right.get
 
-            assert(matchingActivations.length == 1)
+            // We may have gone through the retry loop already, so we can have more than one matching activation
+            assert(matchingActivations.length >= 1, "Consumer trigger was not fired, or activation got lost")
+            println(s"Found ${matchingActivations.length} triggers fired with expected message")
 
             val activation = matchingActivations.head
             activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
@@ -216,6 +231,7 @@ class MessageHubProduceTests
             // assert that there exists a message in the activation which has the expected keys and values
             val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
             assert(messages.length == 1)
+
     }
 
     it should "Post a message with a binary key" in withAssetCleaner(wskprops) {
@@ -252,6 +268,9 @@ class MessageHubProduceTests
                 rule.create(name, trigger = triggerName, action = defaultActionName)
             }
 
+            val sleepTime = maxRetries * 1000
+            val start = Instant.now(Clock.systemUTC()).minus(5, ChronoUnit.MINUTES) // Allow for 5 minutes of clock skew
+
             // It takes a moment for the consumer to fully initialize.
             println("Giving the consumer a moment to get ready")
             Thread.sleep(consumerInitTime)
@@ -266,18 +285,27 @@ class MessageHubProduceTests
                     activation.response.success shouldBe true
             }
 
-            // verify trigger fired
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+            // Sleep for sleepTime ms to simulate pollFor(), but with no requests
+            println(s"Giving the consumer $sleepTime ms to consume the message")
+            Thread.sleep(sleepTime)
+
+            // Make sure we don't get a stale activations view
+            wsk.activation.list(Some(triggerName), limit = Some(10))
+
+            println(s"Getting last 10 activations for $triggerName since $start")
+            val activations = wsk.activation.ids(wsk.activation.list(Some(triggerName), limit = Some(10), since = Some(start)))
             assert(activations.length > 0)
 
+            println("Validating content of activation(s)")
             val matchingActivations = for {
                 id <- activations
                 activation = wsk.activation.waitForActivation(id)
                 if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
             } yield activation.right.get
 
-            assert(matchingActivations.length == 1)
+            // We may have gone through the retry loop already, so we can have more than one matching activation
+            assert(matchingActivations.length >= 1, "Consumer trigger was not fired, or activation got lost")
+            println(s"Found ${matchingActivations.length} triggers fired with expected message")
 
             val activation = matchingActivations.head
             activation.getFieldPath("response", "success") shouldBe Some(true.toJson)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services