You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ja...@apache.org on 2018/04/27 15:59:30 UTC
[incubator-openwhisk-package-kafka] branch master updated: log
timestamp of produced message. ensure consumer created before producing
message by polling health endpoint for uuid (#266)
This is an automated email from the ASF dual-hosted git repository.
japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 8b0ae80 log timestamp of produced message. ensure consumer created before producing message by polling health endpoint for uuid (#266)
8b0ae80 is described below
commit 8b0ae8049b26e86262f0c7ae7ea7b1ab28c6ecf4
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Fri Apr 27 10:59:23 2018 -0500
log timestamp of produced message. ensure consumer created before producing message by polling health endpoint for uuid (#266)
---
.../test/scala/system/health/BasicHealthTest.scala | 43 ++++++++++++++++++----
1 file changed, 36 insertions(+), 7 deletions(-)
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 799b4db..090352c 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -144,12 +144,41 @@ class BasicHealthTest
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- _.response.success shouldBe true
- }
+ activation =>
+ // should be successful
+ activation.response.success shouldBe true
+
+ // It takes a moment for the consumer to fully initialize.
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(consumerInitTime)
+
+ val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
+ println("Checking health endpoint(s) for existence of consumer uuid")
+ // get /health endpoint(s) and ensure it contains the new uuid
+ val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
+ healthUrls shouldNot be(empty)
+
+ retry({
+ val uuids = healthUrls.flatMap(u => {
+ val response = RestAssured.given().get(u)
+ response.statusCode() should be(200)
+ response.asString()
+ .parseJson
+ .asJsObject
+ .getFields("consumers")
+ .head
+ .convertTo[JsArray]
+ .elements
+ .flatMap(c => {
+ c.asJsObject.fields.keySet
+ })
+ }).toList
+
+ uuids should contain(uuid)
+
+ }, N = 10, waitBeforeRetry = Some(1.second))
+ }
val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
val defaultActionName = s"helloKafka-$currentTime"
@@ -165,7 +194,7 @@ class BasicHealthTest
// key to use for the produced message
val key = "TheKey"
- println("Producing a message")
+ println(s"Producing message with key: $key and value: $currentTime")
val producer = kafkaUtils.createProducer()
val record = new ProducerRecord(topic, key, currentTime)
val future = producer.send(record)
@@ -176,7 +205,7 @@ class BasicHealthTest
try {
val result = future.get(60, TimeUnit.SECONDS)
- println(s"Produced record to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with key: $key and value: $currentTime.")
+ println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
} catch {
case e: TimeoutException =>
fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
--
To stop receiving notification emails like this one, please contact
japetrsn@apache.org.