You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/04/17 17:52:38 UTC
[incubator-openwhisk-package-kafka] branch master updated: Remove use of messageHubProduce action in health test and use java producer (#264)
This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 88aa2ef Remove use of messageHubProduce action in health test and use java producer (#264)
88aa2ef is described below
commit 88aa2ef4825f1a0e2a26db79b1edd5d4753c834c
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Tue Apr 17 12:52:36 2018 -0500
Remove use of messageHubProduce action in health test and use java producer (#264)
---
.../test/scala/system/health/BasicHealthTest.scala | 27 ++++++++++++++--------
1 file changed, 18 insertions(+), 9 deletions(-)
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index c80458c..799b4db 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -17,6 +17,8 @@
package system.health
+import java.util.concurrent.{TimeUnit, TimeoutException}
+
import system.utils.KafkaUtils
import scala.concurrent.duration.DurationInt
@@ -34,6 +36,7 @@ import common.WskTestHelpers
import spray.json._
import spray.json.DefaultJsonProtocol._
import com.jayway.restassured.RestAssured
+import org.apache.kafka.clients.producer.ProducerRecord
import whisk.utils.retry;
@RunWith(classOf[JUnitRunner])
@@ -163,15 +166,21 @@ class BasicHealthTest
val key = "TheKey"
println("Producing a message")
- withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson,
- "key" -> key.toJson,
- "value" -> currentTime.toJson
- ))) {
- _.response.success shouldBe true
+ val producer = kafkaUtils.createProducer()
+ val record = new ProducerRecord(topic, key, currentTime)
+ val future = producer.send(record)
+
+ producer.flush()
+ producer.close()
+
+ try {
+ val result = future.get(60, TimeUnit.SECONDS)
+
+ println(s"Produced record to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with key: $key and value: $currentTime.")
+ } catch {
+ case e: TimeoutException =>
+ fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
+ case e: Exception => throw e
}
retry({
--
To stop receiving notification emails like this one, please contact
dubeejw@apache.org.