You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/11/07 15:22:48 UTC
[GitHub] jasonpet closed pull request #292: Ensure test consumers exists before producing messages
jasonpet closed pull request #292: Ensure test consumers exists before producing messages
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/292
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 2b99938..949bce6 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -19,7 +19,6 @@ package system.health
import java.util.concurrent.{TimeUnit, TimeoutException}
-import com.jayway.restassured.RestAssured
import common.TestUtils.NOT_FOUND
import common._
import org.apache.kafka.clients.producer.ProducerRecord
@@ -43,7 +42,8 @@ class BasicHealthTest
with TestHelpers
with WskTestHelpers
with Inside
- with JsHelpers {
+ with JsHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
@@ -55,125 +55,24 @@ class BasicHealthTest
val messageHubFeed = "messageHubFeed"
val messageHubProduce = "messageHubProduce"
val actionName = s"$messagingPackage/$messageHubFeed"
-
- val consumerInitTime = 10000 // ms
-
- val kafkaUtils = new KafkaUtils
-
val maxRetries = System.getProperty("max.retries", "60").toInt
behavior of "Message Hub feed"
- it should "create a new trigger" in withAssetCleaner(wskprops) {
- val triggerName = s"newTrigger-${System.currentTimeMillis}"
- println(s"Creating trigger $triggerName")
-
- (wp, assetHelper) =>
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson
- ))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
-
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
- val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
-
- println("Checking health endpoint(s) for existence of consumer uuid")
- // get /health endpoint(s) and ensure it contains the new uuid
- val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
- healthUrls shouldNot be(empty)
-
- retry({
- val uuids = healthUrls.flatMap(u => {
- val response = RestAssured.given().get(u)
- response.statusCode() should be(200)
- response.asString()
- .parseJson
- .asJsObject
- .getFields("consumers")
- .head
- .convertTo[JsArray]
- .elements
- .flatMap(c => {
- c.asJsObject.fields.keySet
- })
- }).toList
-
- uuids should contain(uuid)
-
- }, N = 10, waitBeforeRetry = Some(1.second))
- }
- }
-
- it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
+ it should "create a consumer and fire a trigger when a message is posted to messagehub" in withAssetCleaner(wskprops) {
val currentTime = s"${System.currentTimeMillis}"
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
-
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(actionName), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson
- ))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
- val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
-
- println("Checking health endpoint(s) for existence of consumer uuid")
- // get /health endpoint(s) and ensure it contains the new uuid
- val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
- healthUrls shouldNot be(empty)
-
- retry({
- val uuids = healthUrls.flatMap(u => {
- val response = RestAssured.given().get(u)
- response.statusCode() should be(200)
- response.asString()
- .parseJson
- .asJsObject
- .getFields("consumers")
- .head
- .convertTo[JsArray]
- .elements
- .flatMap(c => {
- c.asJsObject.fields.keySet
- })
- }).toList
-
- uuids should contain(uuid)
-
- }, N = 10, waitBeforeRetry = Some(1.second))
- }
+ createTrigger(assetHelper, triggerName, parameters = Map(
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
+ "topic" -> topic.toJson
+ ))
// This action creates a trigger if it gets executed.
// The name of the trigger will be the message, that has been send to kafka.
@@ -201,7 +100,7 @@ class BasicHealthTest
}
println(s"Producing message with key: $key and value: $verificationName")
- val producer = kafkaUtils.createProducer()
+ val producer = createProducer()
val record = new ProducerRecord(topic, key, verificationName)
val future = producer.send(record)
@@ -231,17 +130,17 @@ class BasicHealthTest
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
println(s"Creating trigger $triggerName")
- val username = kafkaUtils.getAsJson("user")
- val password = kafkaUtils.getAsJson("password")
- val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
- val brokers = kafkaUtils.getAsJson("brokers")
+ val username = getAsJson("user")
+ val password = getAsJson("password")
+ val admin_url = getAsJson("kafka_admin_url")
+ val brokers = getAsJson("brokers")
val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
(trigger, _) =>
trigger.create(triggerName, feed = Some(actionName), parameters = Map(
"user" -> username,
"password" -> password,
- "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "api_key" -> getAsJson("api_key"),
"kafka_admin_url" -> admin_url,
"kafka_brokers_sasl" -> brokers,
"topic" -> topic.toJson,
diff --git a/tests/src/test/scala/system/packages/KafkaProduceTests.scala b/tests/src/test/scala/system/packages/KafkaProduceTests.scala
index 85d5007..e3d9094 100644
--- a/tests/src/test/scala/system/packages/KafkaProduceTests.scala
+++ b/tests/src/test/scala/system/packages/KafkaProduceTests.scala
@@ -45,7 +45,8 @@ class KafkaProduceTests
with BeforeAndAfterAll
with TestHelpers
with WskTestHelpers
- with JsHelpers {
+ with JsHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
@@ -56,8 +57,6 @@ class KafkaProduceTests
val actionName = "kafkaProduceAction"
val actionFile = "../action/kafkaProduce.py"
- val kafkaUtils = new KafkaUtils
-
behavior of "Kafka Produce action"
override def beforeAll() {
@@ -73,7 +72,7 @@ class KafkaProduceTests
def testMissingParameter(missingParam : String) = {
var fullParamsMap = Map(
"topic" -> topic.toJson,
- "brokers" -> kafkaUtils.getAsJson("brokers"),
+ "brokers" -> getAsJson("brokers"),
"value" -> "This will fail".toJson)
var missingParamsMap = fullParamsMap.filterKeys(_ != missingParam)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 04ed5c9..4263a1d 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -36,7 +36,6 @@ import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
import ActionHelper._
-
import common.TestUtils.NOT_FOUND
import whisk.utils.retry
@@ -49,19 +48,15 @@ class MessageHubFeedTests
with BeforeAndAfterAll
with TestHelpers
with WskTestHelpers
- with JsHelpers {
+ with JsHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
-
val messagingPackage = "/whisk.system/messaging"
val messageHubFeed = "messageHubFeed"
val messageHubProduce = "messageHubProduce"
-
val consumerInitTime = 10000 // ms
-
- val kafkaUtils = new KafkaUtils
-
val maxRetries = System.getProperty("max.retries", "60").toInt
implicit val wskprops = WskProps()
@@ -126,14 +121,13 @@ class MessageHubFeedTests
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
createTrigger(assetHelper, triggerName, parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"isBinaryKey" -> false.toJson,
"isBinaryValue" -> false.toJson))
@@ -158,20 +152,15 @@ class MessageHubFeedTests
trigger.get(name, NOT_FOUND)
}
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
// Rapidly produce two messages whose size are each greater than half the allowed payload limit.
// This should ensure that the feed fires these as two separate triggers.
println("Rapidly producing two large messages")
- val producer = kafkaUtils.createProducer()
+ val producer = createProducer()
val firstMessage = new ProducerRecord(topic, verificationName1, generateMessage(s"first${currentTime}", testPayloadSize))
val secondMessage = new ProducerRecord(topic, verificationName2, generateMessage(s"second${currentTime}", testPayloadSize))
producer.send(firstMessage)
producer.send(secondMessage)
producer.close()
-
retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
}
@@ -185,14 +174,13 @@ class MessageHubFeedTests
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
createTrigger(assetHelper, triggerName, parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"isBinaryKey" -> false.toJson,
"isBinaryValue" -> false.toJson))
@@ -211,12 +199,8 @@ class MessageHubFeedTests
wsk.trigger.get(verificationName, NOT_FOUND)
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
println("Producing an oversized message")
- val producer = kafkaUtils.createProducer()
+ val producer = createProducer()
val bigMessage = new ProducerRecord(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
producer.send(bigMessage)
producer.close()
@@ -229,17 +213,15 @@ class MessageHubFeedTests
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
-
- val username = kafkaUtils.getAsJson("user")
- val password = kafkaUtils.getAsJson("password")
- val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
- val brokers = kafkaUtils.getAsJson("brokers")
+ val username = getAsJson("user")
+ val password = getAsJson("password")
+ val admin_url = getAsJson("kafka_admin_url")
+ val brokers = getAsJson("brokers")
createTrigger(assetHelper, triggerName, parameters = Map(
"user" -> username,
"password" -> password,
- "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "api_key" -> getAsJson("api_key"),
"kafka_admin_url" -> admin_url,
"kafka_brokers_sasl" -> brokers,
"topic" -> topic.toJson,
@@ -263,17 +245,15 @@ class MessageHubFeedTests
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
-
- val username = kafkaUtils.getAsJson("user")
- val password = kafkaUtils.getAsJson("password")
- val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
- val brokers = kafkaUtils.getAsJson("brokers")
+ val username = getAsJson("user")
+ val password = getAsJson("password")
+ val admin_url = getAsJson("kafka_admin_url")
+ val brokers = getAsJson("brokers")
createTrigger(assetHelper, triggerName, parameters = Map(
"user" -> username,
"password" -> password,
- "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "api_key" -> getAsJson("api_key"),
"kafka_admin_url" -> admin_url,
"kafka_brokers_sasl" -> brokers,
"topic" -> topic.toJson,
@@ -299,31 +279,22 @@ class MessageHubFeedTests
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
-
- val username = kafkaUtils.getAsJson("user")
- val password = kafkaUtils.getAsJson("password")
- val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
- val brokers = kafkaUtils.getAsJson("brokers")
-
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(actionName), parameters = Map(
- "user" -> username,
- "password" -> password,
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> admin_url,
- "kafka_brokers_sasl" -> brokers,
- "topic" -> topic.toJson,
- "isJSONData" -> true.toJson,
- "isBinaryKey" -> false.toJson,
- "isBinaryValue" -> false.toJson
- ))
- }
+ val username = getAsJson("user")
+ val password = getAsJson("password")
+ val admin_url = getAsJson("kafka_admin_url")
+ val brokers = getAsJson("brokers")
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- _.response.success shouldBe true
- }
+ createTrigger(assetHelper, triggerName, parameters = Map(
+ "user" -> username,
+ "password" -> password,
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> admin_url,
+ "kafka_brokers_sasl" -> brokers,
+ "topic" -> topic.toJson,
+ "isJSONData" -> true.toJson,
+ "isBinaryKey" -> false.toJson,
+ "isBinaryValue" -> false.toJson
+ ))
val readRunResult = wsk.action.invoke(actionName, parameters = Map(
"triggerName" -> triggerName.toJson,
@@ -384,14 +355,13 @@ class MessageHubFeedTests
(wp, assetHelper) =>
val key = "TheKey"
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
createTrigger(assetHelper, triggerName, parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson
))
@@ -411,14 +381,11 @@ class MessageHubFeedTests
trigger.get(name, NOT_FOUND)
}
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"key" -> key.toJson,
"value" -> verificationName1.toJson
@@ -455,9 +422,9 @@ class MessageHubFeedTests
println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"key" -> key.toJson,
"value" -> verificationName2.toJson
@@ -474,16 +441,15 @@ class MessageHubFeedTests
(wp, assetHelper) =>
val key = "TheKey"
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
createTrigger(assetHelper, triggerName, parameters = Map(
"__bx_creds" -> Map(
"messagehub" -> Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson,
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"))).toJson,
"topic" -> topic.toJson
))
@@ -508,9 +474,9 @@ class MessageHubFeedTests
println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"key" -> key.toJson,
"value" -> verificationName1.toJson
@@ -521,19 +487,6 @@ class MessageHubFeedTests
retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
}
- def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
- (trigger, _) =>
- trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- }
- }
-
def generateMessage(prefix: String, size: Int): String = {
val longString = Array.fill[String](size)("0").mkString("")
s"${prefix}${longString}"
diff --git a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
index 4170ab2..baedd0c 100644
--- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
+++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
@@ -19,7 +19,6 @@ package system.packages
import system.utils.KafkaUtils
import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
@@ -49,7 +48,8 @@ class MessageHubMultiWorkersTest extends FlatSpec
with TestHelpers
with WskTestHelpers
with JsHelpers
- with StreamLogging {
+ with StreamLogging
+ with KafkaUtils {
val topic = "test"
@@ -58,7 +58,6 @@ class MessageHubMultiWorkersTest extends FlatSpec
val messagingPackage = "/whisk.system/messaging"
val messageHubFeed = "messageHubFeed"
-
val dbProtocol = WhiskProperties.getProperty("db.protocol")
val dbHost = WhiskProperties.getProperty("db.host")
val dbPort = WhiskProperties.getProperty("db.port").toInt
@@ -66,11 +65,8 @@ class MessageHubMultiWorkersTest extends FlatSpec
val dbPassword = WhiskProperties.getProperty("db.password")
val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
val dbName = s"${dbPrefix}ow_kafka_triggers"
-
val client = new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort, dbUsername, dbPassword, dbName)
- val kafkaUtils = new KafkaUtils
-
behavior of "Mussage Hub Feed"
ignore should "assign two triggers to same worker when only worker0 is available" in withAssetCleaner(wskprops) {
@@ -187,26 +183,13 @@ class MessageHubMultiWorkersTest extends FlatSpec
})
}
- def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
- (trigger, _) =>
- trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- }
- }
-
def constructParams(workers: List[String]) = {
Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"workers" -> workers.toJson
)
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 5cddefb..4ba4064 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -52,7 +52,8 @@ class MessageHubProduceTests
with BeforeAndAfterAll
with TestHelpers
with WskTestHelpers
- with JsHelpers {
+ with JsHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
@@ -63,19 +64,15 @@ class MessageHubProduceTests
val messagingPackage = "/whisk.system/messaging"
val messageHubFeed = "messageHubFeed"
val messageHubProduce = "messageHubProduce"
-
val consumerInitTime = 10000 // ms
-
- val kafkaUtils = new KafkaUtils
-
val maxRetries = System.getProperty("max.retries", "60").toInt
// these parameter values are 100% valid and should work as-is
val validParameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
"topic" -> topic.toJson,
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"value" -> "Big Trouble is actually a really good Tim Allen movie. Seriously.".toJson)
behavior of "Message Hub Produce action"
@@ -135,26 +132,14 @@ class MessageHubProduceTests
(wp, assetHelper) =>
val triggerName = s"/_/binaryValueTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
-
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson))
- }
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- _.response.success shouldBe true
- }
-
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
+ createTrigger(assetHelper, triggerName, parameters = Map(
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
+ "topic" -> topic.toJson))
val defaultAction = Some("dat/createTriggerActions.js")
val defaultActionName = s"helloKafka-${currentTime}"
@@ -191,26 +176,14 @@ class MessageHubProduceTests
(wp, assetHelper) =>
val triggerName = s"/_/binaryKeyTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
-
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- _.response.success shouldBe true
- }
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
+ createTrigger(assetHelper, triggerName, parameters = Map(
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
+ "topic" -> topic.toJson))
val defaultAction = Some("dat/createTriggerActionsFromKey.js")
val defaultActionName = s"helloKafka-${currentTime}"
diff --git a/tests/src/test/scala/system/stress/StressTest.scala b/tests/src/test/scala/system/stress/StressTest.scala
index b174323..bd413e0 100644
--- a/tests/src/test/scala/system/stress/StressTest.scala
+++ b/tests/src/test/scala/system/stress/StressTest.scala
@@ -42,7 +42,8 @@ class BasicStressTest
with Matchers
with WskActorSystem
with TestHelpers
- with WskTestHelpers {
+ with WskTestHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
@@ -54,8 +55,6 @@ class BasicStressTest
val messageHubFeed = "messageHubFeed"
val messageHubProduce = "messageHubProduce"
- val kafkaUtils = new KafkaUtils
-
behavior of "Message Hub provider"
it should "rapidly create and delete many triggers" in {
@@ -80,11 +79,11 @@ class BasicStressTest
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
println(s"\nCreating trigger #${iterationLabel}: ${triggerName}")
val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson))
println("Waiting for trigger create")
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index be17d6c..e01c715 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -17,24 +17,29 @@
package system.utils
-import common.TestUtils
-
import java.util.HashMap
import java.util.Properties
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig}
import javax.security.auth.login.Configuration
import javax.security.auth.login.AppConfigurationEntry
-
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.mutable.ListBuffer
-
import spray.json.DefaultJsonProtocol._
import spray.json._
-
+import system.packages.ActionHelper._
import whisk.utils.JsHelpers
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+import common.TestHelpers
+import common.TestUtils
+import common.WskTestHelpers
+import whisk.utils.retry
-class KafkaUtils {
+trait KafkaUtils extends TestHelpers with WskTestHelpers {
lazy val messageHubProps = KafkaUtils.initializeMessageHub()
def createProducer() : KafkaProducer[String, String] = {
@@ -52,9 +57,71 @@ class KafkaUtils {
case key => this(key).asInstanceOf[String].toJson
}
}
+
+ val sslconfig = {
+ val inner = new SSLConfig().allowAllHostnames()
+ val config = inner.relaxedHTTPSValidation()
+ new RestAssuredConfig().sslConfig(config)
+ }
+
+ def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
+ println(s"Creating trigger $name")
+
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
+ (trigger, _) =>
+ trigger.create(name, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
+ }
+
+ withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+ activation =>
+ // should be successful
+ activation.response.success shouldBe true
+
+ // It takes a moment for the consumer to fully initialize.
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(KafkaUtils.consumerInitTime)
+
+ val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+ consumerExists(uuid)
+ }
+ }
+
+
+ def consumerExists(uuid: String) = {
+ println("Checking health endpoint(s) for existence of consumer uuid")
+ // get /health endpoint(s) and ensure it contains the new uuid
+ val healthUrls: Array[String] = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
+ assert(healthUrls.size != 0)
+
+ retry({
+ val uuids: Array[(String, JsValue)] = healthUrls.flatMap(u => {
+ val response = RestAssured.given().config(sslconfig).get(u)
+ assert(response.statusCode() == 200)
+
+ response.asString()
+ .parseJson
+ .asJsObject
+ .getFields("consumers")
+ .head
+ .convertTo[JsArray]
+ .elements
+ .flatMap(c => {
+ val consumer = c.asJsObject.fields.head
+ consumer match {
+ case (u, v) if u == uuid && v.asJsObject.getFields("currentState").head == "Running".toJson => Some(consumer)
+ case _ => None
+ }
+ })
+ })
+
+ assert(uuids.nonEmpty)
+ }, N = 60, waitBeforeRetry = Some(1.second))
+ }
}
object KafkaUtils {
+ val consumerInitTime = 10000 // ms
+
def asKafkaProducerProps(props : Map[String,Object]) : Properties = {
val requiredKeys = List("brokers",
"user",
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services