You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/11/07 15:22:48 UTC

[GitHub] jasonpet closed pull request #292: Ensure test consumers exists before producing messages

jasonpet closed pull request #292: Ensure test consumers exists before producing messages
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/292
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not otherwise
show its diff once it has been merged, so the full diff is supplied below:

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 2b99938..949bce6 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -19,7 +19,6 @@ package system.health
 
 import java.util.concurrent.{TimeUnit, TimeoutException}
 
-import com.jayway.restassured.RestAssured
 import common.TestUtils.NOT_FOUND
 import common._
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -43,7 +42,8 @@ class BasicHealthTest
   with TestHelpers
   with WskTestHelpers
   with Inside
-  with JsHelpers {
+  with JsHelpers
+  with KafkaUtils {
 
   val topic = "test"
   val sessionTimeout = 10 seconds
@@ -55,125 +55,24 @@ class BasicHealthTest
   val messageHubFeed = "messageHubFeed"
   val messageHubProduce = "messageHubProduce"
   val actionName = s"$messagingPackage/$messageHubFeed"
-
-  val consumerInitTime = 10000 // ms
-
-  val kafkaUtils = new KafkaUtils
-
   val maxRetries = System.getProperty("max.retries", "60").toInt
 
   behavior of "Message Hub feed"
 
-  it should "create a new trigger" in withAssetCleaner(wskprops) {
-    val triggerName = s"newTrigger-${System.currentTimeMillis}"
-    println(s"Creating trigger $triggerName")
-
-    (wp, assetHelper) =>
-      val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
-        (trigger, _) =>
-          trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-            "user" -> kafkaUtils.getAsJson("user"),
-            "password" -> kafkaUtils.getAsJson("password"),
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
-            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-            "topic" -> topic.toJson
-          ))
-      }
-
-      withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
-
-          // It takes a moment for the consumer to fully initialize.
-          println("Giving the consumer a moment to get ready")
-          Thread.sleep(consumerInitTime)
-
-          val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
-
-          println("Checking health endpoint(s) for existence of consumer uuid")
-          // get /health endpoint(s) and ensure it contains the new uuid
-          val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
-          healthUrls shouldNot be(empty)
-
-          retry({
-            val uuids = healthUrls.flatMap(u => {
-              val response = RestAssured.given().get(u)
-              response.statusCode() should be(200)
-              response.asString()
-                .parseJson
-                .asJsObject
-                .getFields("consumers")
-                .head
-                .convertTo[JsArray]
-                .elements
-                .flatMap(c => {
-                  c.asJsObject.fields.keySet
-                })
-            }).toList
-
-            uuids should contain(uuid)
-
-          }, N = 10, waitBeforeRetry = Some(1.second))
-      }
-  }
-
-  it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
+  it should "create a consumer and fire a trigger when a message is posted to messagehub" in withAssetCleaner(wskprops) {
     val currentTime = s"${System.currentTimeMillis}"
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
-
-      val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
-        (trigger, _) =>
-          trigger.create(triggerName, feed = Some(actionName), parameters = Map(
-            "user" -> kafkaUtils.getAsJson("user"),
-            "password" -> kafkaUtils.getAsJson("password"),
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
-            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-            "topic" -> topic.toJson
-          ))
-      }
-
-      withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
 
-          // It takes a moment for the consumer to fully initialize.
-          println("Giving the consumer a moment to get ready")
-          Thread.sleep(consumerInitTime)
-
-          val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
-
-          println("Checking health endpoint(s) for existence of consumer uuid")
-          // get /health endpoint(s) and ensure it contains the new uuid
-          val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
-          healthUrls shouldNot be(empty)
-
-          retry({
-            val uuids = healthUrls.flatMap(u => {
-              val response = RestAssured.given().get(u)
-              response.statusCode() should be(200)
-              response.asString()
-                .parseJson
-                .asJsObject
-                .getFields("consumers")
-                .head
-                .convertTo[JsArray]
-                .elements
-                .flatMap(c => {
-                  c.asJsObject.fields.keySet
-                })
-            }).toList
-
-            uuids should contain(uuid)
-
-          }, N = 10, waitBeforeRetry = Some(1.second))
-      }
+      createTrigger(assetHelper, triggerName, parameters = Map(
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
+        "topic" -> topic.toJson
+      ))
 
       // This action creates a trigger if it gets executed.
       // The name of the trigger will be the message, that has been send to kafka.
@@ -201,7 +100,7 @@ class BasicHealthTest
       }
 
       println(s"Producing message with key: $key and value: $verificationName")
-      val producer = kafkaUtils.createProducer()
+      val producer = createProducer()
       val record = new ProducerRecord(topic, key, verificationName)
       val future = producer.send(record)
 
@@ -231,17 +130,17 @@ class BasicHealthTest
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
       println(s"Creating trigger $triggerName")
 
-      val username = kafkaUtils.getAsJson("user")
-      val password = kafkaUtils.getAsJson("password")
-      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
-      val brokers = kafkaUtils.getAsJson("brokers")
+      val username = getAsJson("user")
+      val password = getAsJson("password")
+      val admin_url = getAsJson("kafka_admin_url")
+      val brokers = getAsJson("brokers")
 
       val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
         (trigger, _) =>
           trigger.create(triggerName, feed = Some(actionName), parameters = Map(
             "user" -> username,
             "password" -> password,
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
+            "api_key" -> getAsJson("api_key"),
             "kafka_admin_url" -> admin_url,
             "kafka_brokers_sasl" -> brokers,
             "topic" -> topic.toJson,
diff --git a/tests/src/test/scala/system/packages/KafkaProduceTests.scala b/tests/src/test/scala/system/packages/KafkaProduceTests.scala
index 85d5007..e3d9094 100644
--- a/tests/src/test/scala/system/packages/KafkaProduceTests.scala
+++ b/tests/src/test/scala/system/packages/KafkaProduceTests.scala
@@ -45,7 +45,8 @@ class KafkaProduceTests
     with BeforeAndAfterAll
     with TestHelpers
     with WskTestHelpers
-    with JsHelpers {
+    with JsHelpers
+    with KafkaUtils {
 
     val topic = "test"
     val sessionTimeout = 10 seconds
@@ -56,8 +57,6 @@ class KafkaProduceTests
     val actionName = "kafkaProduceAction"
     val actionFile = "../action/kafkaProduce.py"
 
-    val kafkaUtils = new KafkaUtils
-
     behavior of "Kafka Produce action"
 
     override def beforeAll() {
@@ -73,7 +72,7 @@ class KafkaProduceTests
     def testMissingParameter(missingParam : String) = {
         var fullParamsMap = Map(
             "topic" -> topic.toJson,
-            "brokers" -> kafkaUtils.getAsJson("brokers"),
+            "brokers" -> getAsJson("brokers"),
             "value" -> "This will fail".toJson)
         var missingParamsMap = fullParamsMap.filterKeys(_ != missingParam)
 
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 04ed5c9..4263a1d 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -36,7 +36,6 @@ import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
 import ActionHelper._
-
 import common.TestUtils.NOT_FOUND
 import whisk.utils.retry
 
@@ -49,19 +48,15 @@ class MessageHubFeedTests
   with BeforeAndAfterAll
   with TestHelpers
   with WskTestHelpers
-  with JsHelpers {
+  with JsHelpers
+  with KafkaUtils {
 
   val topic = "test"
   val sessionTimeout = 10 seconds
-
   val messagingPackage = "/whisk.system/messaging"
   val messageHubFeed = "messageHubFeed"
   val messageHubProduce = "messageHubProduce"
-
   val consumerInitTime = 10000 // ms
-
-  val kafkaUtils = new KafkaUtils
-
   val maxRetries = System.getProperty("max.retries", "60").toInt
 
   implicit val wskprops = WskProps()
@@ -126,14 +121,13 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger ${triggerName}")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
-        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "isBinaryKey" -> false.toJson,
         "isBinaryValue" -> false.toJson))
@@ -158,20 +152,15 @@ class MessageHubFeedTests
         trigger.get(name, NOT_FOUND)
       }
 
-      // It takes a moment for the consumer to fully initialize.
-      println("Giving the consumer a moment to get ready")
-      Thread.sleep(consumerInitTime)
-
       // Rapidly produce two messages whose size are each greater than half the allowed payload limit.
       // This should ensure that the feed fires these as two separate triggers.
       println("Rapidly producing two large messages")
-      val producer = kafkaUtils.createProducer()
+      val producer = createProducer()
       val firstMessage = new ProducerRecord(topic, verificationName1, generateMessage(s"first${currentTime}", testPayloadSize))
       val secondMessage = new ProducerRecord(topic, verificationName2, generateMessage(s"second${currentTime}", testPayloadSize))
       producer.send(firstMessage)
       producer.send(secondMessage)
       producer.close()
-
       retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
       retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
@@ -185,14 +174,13 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger ${triggerName}")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
-        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "isBinaryKey" -> false.toJson,
         "isBinaryValue" -> false.toJson))
@@ -211,12 +199,8 @@ class MessageHubFeedTests
 
       wsk.trigger.get(verificationName, NOT_FOUND)
 
-      // It takes a moment for the consumer to fully initialize.
-      println("Giving the consumer a moment to get ready")
-      Thread.sleep(consumerInitTime)
-
       println("Producing an oversized message")
-      val producer = kafkaUtils.createProducer()
+      val producer = createProducer()
       val bigMessage = new ProducerRecord(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
       producer.send(bigMessage)
       producer.close()
@@ -229,17 +213,15 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger ${triggerName}")
-
-      val username = kafkaUtils.getAsJson("user")
-      val password = kafkaUtils.getAsJson("password")
-      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
-      val brokers = kafkaUtils.getAsJson("brokers")
+      val username = getAsJson("user")
+      val password = getAsJson("password")
+      val admin_url = getAsJson("kafka_admin_url")
+      val brokers = getAsJson("brokers")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
         "user" -> username,
         "password" -> password,
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
+        "api_key" -> getAsJson("api_key"),
         "kafka_admin_url" -> admin_url,
         "kafka_brokers_sasl" -> brokers,
         "topic" -> topic.toJson,
@@ -263,17 +245,15 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
-
-      val username = kafkaUtils.getAsJson("user")
-      val password = kafkaUtils.getAsJson("password")
-      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
-      val brokers = kafkaUtils.getAsJson("brokers")
+      val username = getAsJson("user")
+      val password = getAsJson("password")
+      val admin_url = getAsJson("kafka_admin_url")
+      val brokers = getAsJson("brokers")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
         "user" -> username,
         "password" -> password,
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
+        "api_key" -> getAsJson("api_key"),
         "kafka_admin_url" -> admin_url,
         "kafka_brokers_sasl" -> brokers,
         "topic" -> topic.toJson,
@@ -299,31 +279,22 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
-
-      val username = kafkaUtils.getAsJson("user")
-      val password = kafkaUtils.getAsJson("password")
-      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
-      val brokers = kafkaUtils.getAsJson("brokers")
-
-      val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
-        (trigger, _) =>
-          trigger.create(triggerName, feed = Some(actionName), parameters = Map(
-            "user" -> username,
-            "password" -> password,
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
-            "kafka_admin_url" -> admin_url,
-            "kafka_brokers_sasl" -> brokers,
-            "topic" -> topic.toJson,
-            "isJSONData" -> true.toJson,
-            "isBinaryKey" -> false.toJson,
-            "isBinaryValue" -> false.toJson
-          ))
-      }
+      val username = getAsJson("user")
+      val password = getAsJson("password")
+      val admin_url = getAsJson("kafka_admin_url")
+      val brokers = getAsJson("brokers")
 
-      withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-        _.response.success shouldBe true
-      }
+      createTrigger(assetHelper, triggerName, parameters = Map(
+        "user" -> username,
+        "password" -> password,
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> admin_url,
+        "kafka_brokers_sasl" -> brokers,
+        "topic" -> topic.toJson,
+        "isJSONData" -> true.toJson,
+        "isBinaryKey" -> false.toJson,
+        "isBinaryValue" -> false.toJson
+      ))
 
       val readRunResult = wsk.action.invoke(actionName, parameters = Map(
         "triggerName" -> triggerName.toJson,
@@ -384,14 +355,13 @@ class MessageHubFeedTests
     (wp, assetHelper) =>
       val key = "TheKey"
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
-        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson
       ))
 
@@ -411,14 +381,11 @@ class MessageHubFeedTests
         trigger.get(name, NOT_FOUND)
       }
 
-      println("Giving the consumer a moment to get ready")
-      Thread.sleep(consumerInitTime)
-
       println("Producing a message")
       withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
         "value" -> verificationName1.toJson
@@ -455,9 +422,9 @@ class MessageHubFeedTests
 
       println("Producing a message")
       withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
         "value" -> verificationName2.toJson
@@ -474,16 +441,15 @@ class MessageHubFeedTests
     (wp, assetHelper) =>
       val key = "TheKey"
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
         "__bx_creds" -> Map(
           "messagehub" -> Map(
-            "user" -> kafkaUtils.getAsJson("user"),
-            "password" -> kafkaUtils.getAsJson("password"),
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
-            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson,
+            "user" -> getAsJson("user"),
+            "password" -> getAsJson("password"),
+            "api_key" -> getAsJson("api_key"),
+            "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+            "kafka_brokers_sasl" -> getAsJson("brokers"))).toJson,
         "topic" -> topic.toJson
       ))
 
@@ -508,9 +474,9 @@ class MessageHubFeedTests
 
       println("Producing a message")
       withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
         "value" -> verificationName1.toJson
@@ -521,19 +487,6 @@ class MessageHubFeedTests
       retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
   }
 
-  def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
-    val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
-      (trigger, _) =>
-        trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
-    }
-
-    withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-      activation =>
-        // should be successful
-        activation.response.success shouldBe true
-    }
-  }
-
   def generateMessage(prefix: String, size: Int): String = {
     val longString = Array.fill[String](size)("0").mkString("")
     s"${prefix}${longString}"
diff --git a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
index 4170ab2..baedd0c 100644
--- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
+++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
@@ -19,7 +19,6 @@ package system.packages
 import system.utils.KafkaUtils
 
 import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
@@ -49,7 +48,8 @@ class MessageHubMultiWorkersTest extends FlatSpec
   with TestHelpers
   with WskTestHelpers
   with JsHelpers
-  with StreamLogging {
+  with StreamLogging
+  with KafkaUtils {
 
   val topic = "test"
 
@@ -58,7 +58,6 @@ class MessageHubMultiWorkersTest extends FlatSpec
 
   val messagingPackage = "/whisk.system/messaging"
   val messageHubFeed = "messageHubFeed"
-
   val dbProtocol = WhiskProperties.getProperty("db.protocol")
   val dbHost = WhiskProperties.getProperty("db.host")
   val dbPort = WhiskProperties.getProperty("db.port").toInt
@@ -66,11 +65,8 @@ class MessageHubMultiWorkersTest extends FlatSpec
   val dbPassword = WhiskProperties.getProperty("db.password")
   val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
   val dbName = s"${dbPrefix}ow_kafka_triggers"
-
   val client = new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort, dbUsername, dbPassword, dbName)
 
-  val kafkaUtils = new KafkaUtils
-
   behavior of "Mussage Hub Feed"
 
   ignore should "assign two triggers to same worker when only worker0 is available" in withAssetCleaner(wskprops) {
@@ -187,26 +183,13 @@ class MessageHubMultiWorkersTest extends FlatSpec
       })
   }
 
-  def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
-    val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
-      (trigger, _) =>
-        trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
-    }
-
-    withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-      activation =>
-        // should be successful
-        activation.response.success shouldBe true
-    }
-  }
-
   def constructParams(workers: List[String]) = {
     Map(
-      "user" -> kafkaUtils.getAsJson("user"),
-      "password" -> kafkaUtils.getAsJson("password"),
-      "api_key" -> kafkaUtils.getAsJson("api_key"),
-      "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-      "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+      "user" -> getAsJson("user"),
+      "password" -> getAsJson("password"),
+      "api_key" -> getAsJson("api_key"),
+      "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+      "kafka_brokers_sasl" -> getAsJson("brokers"),
       "topic" -> topic.toJson,
       "workers" -> workers.toJson
     )
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 5cddefb..4ba4064 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -52,7 +52,8 @@ class MessageHubProduceTests
     with BeforeAndAfterAll
     with TestHelpers
     with WskTestHelpers
-    with JsHelpers {
+    with JsHelpers
+    with KafkaUtils {
 
     val topic = "test"
     val sessionTimeout = 10 seconds
@@ -63,19 +64,15 @@ class MessageHubProduceTests
     val messagingPackage = "/whisk.system/messaging"
     val messageHubFeed = "messageHubFeed"
     val messageHubProduce = "messageHubProduce"
-
     val consumerInitTime = 10000 // ms
-
-    val kafkaUtils = new KafkaUtils
-
     val maxRetries = System.getProperty("max.retries", "60").toInt
 
     // these parameter values are 100% valid and should work as-is
     val validParameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
         "topic" -> topic.toJson,
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "value" -> "Big Trouble is actually a really good Tim Allen movie. Seriously.".toJson)
 
     behavior of "Message Hub Produce action"
@@ -135,26 +132,14 @@ class MessageHubProduceTests
 
         (wp, assetHelper) =>
             val triggerName = s"/_/binaryValueTrigger-$currentTime"
-            println(s"Creating trigger ${triggerName}")
-
-            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
-                (trigger, _) =>
-                    trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-                        "user" -> kafkaUtils.getAsJson("user"),
-                        "password" -> kafkaUtils.getAsJson("password"),
-                        "api_key" -> kafkaUtils.getAsJson("api_key"),
-                        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-                        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-                        "topic" -> topic.toJson))
-            }
 
-            withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-                _.response.success shouldBe true
-            }
-
-            // It takes a moment for the consumer to fully initialize.
-            println("Giving the consumer a moment to get ready")
-            Thread.sleep(consumerInitTime)
+            createTrigger(assetHelper, triggerName, parameters = Map(
+                "user" -> getAsJson("user"),
+                "password" -> getAsJson("password"),
+                "api_key" -> getAsJson("api_key"),
+                "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+                "kafka_brokers_sasl" -> getAsJson("brokers"),
+                "topic" -> topic.toJson))
 
             val defaultAction = Some("dat/createTriggerActions.js")
             val defaultActionName = s"helloKafka-${currentTime}"
@@ -191,26 +176,14 @@ class MessageHubProduceTests
 
         (wp, assetHelper) =>
             val triggerName = s"/_/binaryKeyTrigger-$currentTime"
-            println(s"Creating trigger ${triggerName}")
-
-            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
-                (trigger, _) =>
-                    trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-                        "user" -> kafkaUtils.getAsJson("user"),
-                        "password" -> kafkaUtils.getAsJson("password"),
-                        "api_key" -> kafkaUtils.getAsJson("api_key"),
-                        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-                        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-                        "topic" -> topic.toJson))
-            }
-
-            withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-                _.response.success shouldBe true
-            }
 
-            // It takes a moment for the consumer to fully initialize.
-            println("Giving the consumer a moment to get ready")
-            Thread.sleep(consumerInitTime)
+            createTrigger(assetHelper, triggerName, parameters = Map(
+                "user" -> getAsJson("user"),
+                "password" -> getAsJson("password"),
+                "api_key" -> getAsJson("api_key"),
+                "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+                "kafka_brokers_sasl" -> getAsJson("brokers"),
+                "topic" -> topic.toJson))
 
             val defaultAction = Some("dat/createTriggerActionsFromKey.js")
             val defaultActionName = s"helloKafka-${currentTime}"
diff --git a/tests/src/test/scala/system/stress/StressTest.scala b/tests/src/test/scala/system/stress/StressTest.scala
index b174323..bd413e0 100644
--- a/tests/src/test/scala/system/stress/StressTest.scala
+++ b/tests/src/test/scala/system/stress/StressTest.scala
@@ -42,7 +42,8 @@ class BasicStressTest
     with Matchers
     with WskActorSystem
     with TestHelpers
-    with WskTestHelpers {
+    with WskTestHelpers
+    with KafkaUtils {
 
     val topic = "test"
     val sessionTimeout = 10 seconds
@@ -54,8 +55,6 @@ class BasicStressTest
     val messageHubFeed = "messageHubFeed"
     val messageHubProduce = "messageHubProduce"
 
-    val kafkaUtils = new KafkaUtils
-
     behavior of "Message Hub provider"
 
     it should "rapidly create and delete many triggers" in {
@@ -80,11 +79,11 @@ class BasicStressTest
             val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
             println(s"\nCreating trigger #${iterationLabel}: ${triggerName}")
             val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-                    "user" -> kafkaUtils.getAsJson("user"),
-                    "password" -> kafkaUtils.getAsJson("password"),
-                    "api_key" -> kafkaUtils.getAsJson("api_key"),
-                    "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-                    "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+                    "user" -> getAsJson("user"),
+                    "password" -> getAsJson("password"),
+                    "api_key" -> getAsJson("api_key"),
+                    "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+                    "kafka_brokers_sasl" -> getAsJson("brokers"),
                     "topic" -> topic.toJson))
 
             println("Waiting for trigger create")
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index be17d6c..e01c715 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -17,24 +17,29 @@
 
 package system.utils
 
-import common.TestUtils
-
 import java.util.HashMap
 import java.util.Properties
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig}
 import javax.security.auth.login.Configuration
 import javax.security.auth.login.AppConfigurationEntry
-
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.KafkaProducer
 
 import scala.collection.mutable.ListBuffer
-
 import spray.json.DefaultJsonProtocol._
 import spray.json._
-
+import system.packages.ActionHelper._
 import whisk.utils.JsHelpers
 
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+import common.TestHelpers
+import common.TestUtils
+import common.WskTestHelpers
+import whisk.utils.retry
 
-class KafkaUtils {
+trait KafkaUtils extends TestHelpers with WskTestHelpers {
     lazy val messageHubProps = KafkaUtils.initializeMessageHub()
 
     def createProducer() : KafkaProducer[String, String] = {
@@ -52,9 +57,71 @@ class KafkaUtils {
             case key => this(key).asInstanceOf[String].toJson
         }
     }
+
+    val sslconfig = {
+        val inner = new SSLConfig().allowAllHostnames()
+        val config = inner.relaxedHTTPSValidation()
+        new RestAssuredConfig().sslConfig(config)
+    }
+
+    def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
+        println(s"Creating trigger $name")
+
+        val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
+            (trigger, _) =>
+                trigger.create(name, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
+        }
+
+        withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+            activation =>
+                // should be successful
+                activation.response.success shouldBe true
+
+                // It takes a moment for the consumer to fully initialize.
+                println("Giving the consumer a moment to get ready")
+                Thread.sleep(KafkaUtils.consumerInitTime)
+
+                val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+                consumerExists(uuid)
+        }
+    }
+
+
+    def consumerExists(uuid: String) = {
+        println("Checking health endpoint(s) for existence of consumer uuid")
+        // get /health endpoint(s) and ensure it contains the new uuid
+        val healthUrls: Array[String] = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
+        assert(healthUrls.size != 0)
+
+        retry({
+            val uuids: Array[(String, JsValue)] = healthUrls.flatMap(u => {
+                val response = RestAssured.given().config(sslconfig).get(u)
+                assert(response.statusCode() == 200)
+
+                response.asString()
+                  .parseJson
+                  .asJsObject
+                  .getFields("consumers")
+                  .head
+                  .convertTo[JsArray]
+                  .elements
+                  .flatMap(c => {
+                      val consumer = c.asJsObject.fields.head
+                      consumer match {
+                          case (u, v) if u == uuid && v.asJsObject.getFields("currentState").head == "Running".toJson => Some(consumer)
+                          case _ => None
+                      }
+                  })
+            })
+
+            assert(uuids.nonEmpty)
+        }, N = 60, waitBeforeRetry = Some(1.second))
+    }
 }
 
 object KafkaUtils {
+    val consumerInitTime = 10000 // ms
+
     def asKafkaProducerProps(props : Map[String,Object]) : Properties = {
         val requiredKeys = List("brokers",
                                 "user",


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services