You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ja...@apache.org on 2018/04/27 15:59:30 UTC

[incubator-openwhisk-package-kafka] branch master updated: log timestamp of produced message. ensure consumer created before producing message by polling health endpoint for uuid (#266)

This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b0ae80  log timestamp of produced message. ensure consumer created before producing message by polling health endpoint for uuid (#266)
8b0ae80 is described below

commit 8b0ae8049b26e86262f0c7ae7ea7b1ab28c6ecf4
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Fri Apr 27 10:59:23 2018 -0500

    log timestamp of produced message. ensure consumer created before producing message by polling health endpoint for uuid (#266)
---
 .../test/scala/system/health/BasicHealthTest.scala | 43 ++++++++++++++++++----
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 799b4db..090352c 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -144,12 +144,41 @@ class BasicHealthTest
       }
 
       withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-        _.response.success shouldBe true
-      }
+        activation =>
+          // should be successful
+          activation.response.success shouldBe true
+
+          // It takes a moment for the consumer to fully initialize.
+          println("Giving the consumer a moment to get ready")
+          Thread.sleep(consumerInitTime)
+
+          val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
 
-      // It takes a moment for the consumer to fully initialize.
-      println("Giving the consumer a moment to get ready")
-      Thread.sleep(consumerInitTime)
+          println("Checking health endpoint(s) for existence of consumer uuid")
+          // get /health endpoint(s) and ensure it contains the new uuid
+          val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
+          healthUrls shouldNot be(empty)
+
+          retry({
+            val uuids = healthUrls.flatMap(u => {
+              val response = RestAssured.given().get(u)
+              response.statusCode() should be(200)
+              response.asString()
+                .parseJson
+                .asJsObject
+                .getFields("consumers")
+                .head
+                .convertTo[JsArray]
+                .elements
+                .flatMap(c => {
+                  c.asJsObject.fields.keySet
+                })
+            }).toList
+
+            uuids should contain(uuid)
+
+          }, N = 10, waitBeforeRetry = Some(1.second))
+      }
 
       val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
       val defaultActionName = s"helloKafka-$currentTime"
@@ -165,7 +194,7 @@ class BasicHealthTest
       // key to use for the produced message
       val key = "TheKey"
 
-      println("Producing a message")
+      println(s"Producing message with key: $key and value: $currentTime")
       val producer = kafkaUtils.createProducer()
       val record = new ProducerRecord(topic, key, currentTime)
       val future = producer.send(record)
@@ -176,7 +205,7 @@ class BasicHealthTest
       try {
         val result = future.get(60, TimeUnit.SECONDS)
 
-        println(s"Produced record to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with key: $key and value: $currentTime.")
+        println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
       } catch {
         case e: TimeoutException =>
           fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")

-- 
To stop receiving notification emails like this one, please contact
japetrsn@apache.org.