You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/09/27 20:18:32 UTC

[incubator-openwhisk-package-kafka] branch feed-test-resiliency created (now 74d5fa6)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a change to branch feed-test-resiliency
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git.


      at 74d5fa6  MessageHubFeedTests Resiliency Updates

This branch includes the following new commits:

     new 74d5fa6  MessageHubFeedTests Resiliency Updates

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-openwhisk-package-kafka] 01/01: MessageHubFeedTests Resiliency Updates

Posted by du...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch feed-test-resiliency
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git

commit 74d5fa68393984955aacc9ae0b387fbef577526f
Author: dubeejw <jw...@us.ibm.com>
AuthorDate: Thu Sep 27 16:17:52 2018 -0400

    MessageHubFeedTests Resiliency Updates
---
 .../dat/createTriggerActionsFromEncodedMessage.js  |  11 ++
 .../system/packages/MessageHubFeedTests.scala      | 194 +++++----------------
 2 files changed, 56 insertions(+), 149 deletions(-)

diff --git a/tests/dat/createTriggerActionsFromEncodedMessage.js b/tests/dat/createTriggerActionsFromEncodedMessage.js
new file mode 100644
index 0000000..0c27eb8
--- /dev/null
+++ b/tests/dat/createTriggerActionsFromEncodedMessage.js
@@ -0,0 +1,11 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+var openwhisk = require('openwhisk');
+
+function main(params) {
+    console.log(JSON.stringify(params));
+    var name = new Buffer(params.messages[0].value, 'base64').toString('ascii');
+    var ow = openwhisk({ignore_certs: true});
+    return ow.triggers.create({name: name});
+}
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index f2e6180..d970d4b 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -20,7 +20,6 @@ import system.utils.KafkaUtils
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
-
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
@@ -28,23 +27,17 @@ import org.scalatest.Matchers
 import org.scalatest.Inside
 import org.scalatest.junit.JUnitRunner
 import org.apache.kafka.clients.producer.ProducerRecord
-
 import spray.json.DefaultJsonProtocol._
 import spray.json._
-
 import common.JsHelpers
-import common.TestUtils
 import common.TestHelpers
 import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
-
 import ActionHelper._
 
-import java.util.Base64
-import java.nio.charset.StandardCharsets
-
+import common.TestUtils.NOT_FOUND
 import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
@@ -75,8 +68,6 @@ class MessageHubFeedTests
   val wsk = new Wsk()
   val actionName = s"${messagingPackage}/${messageHubFeed}"
 
-  val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
-
   behavior of "Message Hub feed action"
 
   it should "reject invocation when topic argument is missing" in {
@@ -126,78 +117,7 @@ class MessageHubFeedTests
     runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false)
   }
 
-  it should "fire a trigger when a binary message is posted to message hub" in withAssetCleaner(wskprops) {
-    val currentTime = s"${System.currentTimeMillis}"
-
-    (wp, assetHelper) =>
-      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger ${triggerName}")
-
-      createTrigger(assetHelper, triggerName, parameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
-        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-        "topic" -> topic.toJson,
-        "isBinaryKey" -> true.toJson,
-        "isBinaryValue" -> true.toJson))
-
-      val defaultActionName = s"helloKafka-${currentTime}"
-
-      assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
-        action.create(name, defaultAction)
-      }
-      assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
-        rule.create(name, trigger = triggerName, action = defaultActionName)
-      }
-
-      // It takes a moment for the consumer to fully initialize.
-      println("Giving the consumer a moment to get ready")
-      Thread.sleep(consumerInitTime)
-
-      // key to use for the produced message
-      val key = "TheKey"
-      val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
-      val encodedKey = Base64.getEncoder.encodeToString(key.getBytes(StandardCharsets.UTF_8))
-
-      withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-        "topic" -> topic.toJson,
-        "key" -> key.toJson,
-        "value" -> currentTime.toJson))) {
-        _.response.success shouldBe true
-      }
-
-      retry({
-        println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-        assert(activations.nonEmpty)
-
-        val matchingActivations = for {
-          id <- activations
-          activation = wsk.activation.waitForActivation(id)
-          if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
-        } yield activation.right.get
-
-        assert(matchingActivations.nonEmpty)
-
-        val activation = matchingActivations.head
-        activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
-        // assert that there exists a message in the activation which has the expected keys and values
-        val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime)
-        assert(messages.length == 1)
-
-        val message = messages.head
-        message.getFieldPath("topic") shouldBe Some(topic.toJson)
-        message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
-      }, N = 3)
-  }
-
-  it should "not fire a single trigger with an oversized payload" in withAssetCleaner(wskprops) {
+  it should "fire multiple triggers for two large payloads" in withAssetCleaner(wskprops) {
     // payload size should be under the payload limit, but greater than 50% of the limit
     val testPayloadSize = 600000
 
@@ -218,6 +138,7 @@ class MessageHubFeedTests
         "isBinaryKey" -> false.toJson,
         "isBinaryValue" -> false.toJson))
 
+      val defaultAction = Some("dat/createTriggerActionsFromKey.js")
       val defaultActionName = s"helloKafka-${currentTime}"
 
       assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -227,6 +148,16 @@ class MessageHubFeedTests
         rule.create(name, trigger = triggerName, action = defaultActionName)
       }
 
+      val verificationName1 = s"trigger1-$currentTime"
+      val verificationName2 = s"trigger2-$currentTime"
+
+      assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+      assetHelper.withCleaner(wsk.trigger, verificationName2) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
       // It takes a moment for the consumer to fully initialize.
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
@@ -235,27 +166,14 @@ class MessageHubFeedTests
       // This should ensure that the feed fires these as two separate triggers.
       println("Rapidly producing two large messages")
       val producer = kafkaUtils.createProducer()
-      val firstMessage = new ProducerRecord(topic, "key", generateMessage(s"first${currentTime}", testPayloadSize))
-      val secondMessage = new ProducerRecord(topic, "key", generateMessage(s"second${currentTime}", testPayloadSize))
+      val firstMessage = new ProducerRecord(topic, verificationName1, generateMessage(s"first${currentTime}", testPayloadSize))
+      val secondMessage = new ProducerRecord(topic, verificationName2, generateMessage(s"second${currentTime}", testPayloadSize))
       producer.send(firstMessage)
       producer.send(secondMessage)
       producer.close()
 
-      retry({
-        // verify there are two trigger activations required to handle these messages
-        println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
-
-        println("Verifying activation content")
-        val matchingActivations = for {
-          id <- activations
-          activation = wsk.activation.waitForActivation(id)
-          if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
-            activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
-        } yield activation.right.get
-
-        assert(matchingActivations.length == 2)
-      }, N = 3)
+      retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
+      retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
 
   it should "not fire a trigger for a single oversized message" in withAssetCleaner(wskprops) {
@@ -279,6 +197,7 @@ class MessageHubFeedTests
         "isBinaryKey" -> false.toJson,
         "isBinaryValue" -> false.toJson))
 
+      val defaultAction = Some("dat/createTriggerActionsFromKey.js")
       val defaultActionName = s"helloKafka-${currentTime}"
 
       assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -288,30 +207,21 @@ class MessageHubFeedTests
         rule.create(name, trigger = triggerName, action = defaultActionName)
       }
 
+      val verificationName = s"trigger-$currentTime"
+
+      wsk.trigger.get(verificationName, NOT_FOUND)
+
       // It takes a moment for the consumer to fully initialize.
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
 
       println("Producing an oversized message")
       val producer = kafkaUtils.createProducer()
-      val bigMessage = new ProducerRecord(topic, "key", generateMessage(s"${currentTime}", testPayloadSize))
+      val bigMessage = new ProducerRecord(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
       producer.send(bigMessage)
       producer.close()
 
-      retry({
-        // verify there are no activations that match
-        println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-
-        println("Verifying activation content")
-        val matchingActivations = for {
-          id <- activations
-          activation = wsk.activation.waitForActivation(id)
-          if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
-        } yield activation.right.get
-
-        assert(matchingActivations.isEmpty)
-      }, N = 3)
+      a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second))
   }
 
   it should "reject trigger update without passing in any updatable parameters" in withAssetCleaner(wskprops) {
@@ -485,15 +395,22 @@ class MessageHubFeedTests
         "topic" -> topic.toJson
       ))
 
+      val defaultAction1 = Some("dat/createTriggerActions.js")
       val defaultActionName = s"helloKafka-${currentTime}"
 
       assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
-        action.create(name, defaultAction)
+        action.create(name, defaultAction1)
       }
       assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
         rule.create(name, trigger = triggerName, action = defaultActionName)
       }
 
+      val verificationName1 = s"trigger1-$currentTime"
+
+      assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
 
@@ -504,12 +421,12 @@ class MessageHubFeedTests
         "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
-        "value" -> currentTime.toJson
+        "value" -> verificationName1.toJson
       ))) {
         _.response.success shouldBe true
       }
 
-      checkForActivations(1, triggerName, topic, key, currentTime)
+      retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
 
       println("Updating trigger")
 
@@ -524,11 +441,18 @@ class MessageHubFeedTests
         _.response.success shouldBe true
       }
 
+      val verificationName2 = s"trigger2-$currentTime"
+
+      assetHelper.withCleaner(wsk.trigger, verificationName2) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
+      val defaultAction2 = Some("dat/createTriggerActionsFromEncodedMessage.js")
+      wsk.action.create(defaultActionName, defaultAction2, update = true)
+
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
 
-      val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
-
       println("Producing a message")
       withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
         "user" -> kafkaUtils.getAsJson("user"),
@@ -536,12 +460,12 @@ class MessageHubFeedTests
         "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
-        "value" -> currentTime.toJson
+        "value" -> verificationName2.toJson
       ))) {
         _.response.success shouldBe true
       }
 
-      checkForActivations(2, triggerName, topic, key, encodedCurrentTime)
+      retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
 
   def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
@@ -557,34 +481,6 @@ class MessageHubFeedTests
     }
   }
 
-  def checkForActivations(numActivations: Int, triggerName: String, topic: String, key: String, value: String) = {
-    retry({
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = numActivations, Some(triggerName), retries = maxRetries)
-      assert(activations.nonEmpty)
-
-      println("Validating content of activation(s)")
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value))
-      } yield activation.right.get
-
-      assert(matchingActivations.nonEmpty)
-
-      val activation = matchingActivations.head
-      activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
-      // assert that there exists a message in the activation which has the expected keys and values
-      val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value)
-      assert(messages.length == 1)
-
-      val message = messages.head
-      message.getFieldPath("topic") shouldBe Some(topic.toJson)
-      message.getFieldPath("key") shouldBe Some(key.toJson)
-    }, N = 3)
-  }
-
   def generateMessage(prefix: String, size: Int): String = {
     val longString = Array.fill[String](size)("0").mkString("")
     s"${prefix}${longString}"