You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/04/17 17:52:38 UTC

[incubator-openwhisk-package-kafka] branch master updated: Remove use of messageHubProduce action in health test and use java producer (#264)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 88aa2ef  Remove use of messageHubProduce action in health test and use java producer (#264)
88aa2ef is described below

commit 88aa2ef4825f1a0e2a26db79b1edd5d4753c834c
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Tue Apr 17 12:52:36 2018 -0500

    Remove use of messageHubProduce action in health test and use java producer (#264)
---
 .../test/scala/system/health/BasicHealthTest.scala | 27 ++++++++++++++--------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index c80458c..799b4db 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -17,6 +17,8 @@
 
 package system.health
 
+import java.util.concurrent.{TimeUnit, TimeoutException}
+
 import system.utils.KafkaUtils
 
 import scala.concurrent.duration.DurationInt
@@ -34,6 +36,7 @@ import common.WskTestHelpers
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import com.jayway.restassured.RestAssured
+import org.apache.kafka.clients.producer.ProducerRecord
 import whisk.utils.retry;
 
 @RunWith(classOf[JUnitRunner])
@@ -163,15 +166,21 @@ class BasicHealthTest
       val key = "TheKey"
 
       println("Producing a message")
-      withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-        "topic" -> topic.toJson,
-        "key" -> key.toJson,
-        "value" -> currentTime.toJson
-      ))) {
-        _.response.success shouldBe true
+      val producer = kafkaUtils.createProducer()
+      val record = new ProducerRecord(topic, key, currentTime)
+      val future = producer.send(record)
+
+      producer.flush()
+      producer.close()
+
+      try {
+        val result = future.get(60, TimeUnit.SECONDS)
+
+        println(s"Produced record to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with key: $key and value: $currentTime.")
+      } catch {
+        case e: TimeoutException =>
+          fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
+        case e: Exception => throw e
       }
 
       retry({

-- 
To stop receiving notification emails like this one, please contact
dubeejw@apache.org.