You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/03/06 03:58:22 UTC

[incubator-openwhisk-package-kafka] branch master updated: check for 204 before attempting to fetch response json (#255)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 27777f8  check for 204 before attempting to fetch response json (#255)
27777f8 is described below

commit 27777f856158626a8a954282e9a795e8f6adfd2c
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Mon Mar 5 21:58:20 2018 -0600

    check for 204 before attempting to fetch response json (#255)
    
    * check for 204 before attempting to fetch response json
    
    * make test more robust
---
 provider/consumer.py                                         | 12 ++++++++----
 .../src/test/scala/system/packages/MessageHubFeedTests.scala | 12 ++++--------
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/provider/consumer.py b/provider/consumer.py
index a8427d0..f9aaef2 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -354,11 +354,15 @@ class ConsumerProcess (Process):
                     # Manually commit offset if the trigger was fired successfully. Retry firing the trigger
                     # for a select set of status codes
                     if status_code in range(200, 300):
-                        response_json = response.json()
-                        if 'activationId' in response_json and response_json['activationId'] is not None:
-                            logging.info("[{}] Fired trigger with activation {}".format(self.trigger, response_json['activationId']))
-                        else:
+                        if status_code == 204:
                             logging.info("[{}] Successfully fired trigger".format(self.trigger))
+                        else:
+                            response_json = response.json()
+                            if 'activationId' in response_json and response_json['activationId'] is not None:
+                                logging.info("[{}] Fired trigger with activation {}".format(self.trigger, response_json['activationId']))
+                            else:
+                                logging.info("[{}] Successfully fired trigger".format(self.trigger))
+                        # the consumer may have consumed messages that did not make it into the messages array.
                         # the consumer may have consumed messages that did not make it into the messages array.
                         # be sure to only commit to the messages that were actually fired.
                         self.consumer.commit(offsets=self.__getOffsetList(messages), async=False)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 9cd1e2f..1b8db83 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -44,7 +44,6 @@ import ActionHelper._
 
 import java.util.Base64
 import java.nio.charset.StandardCharsets
-import java.time.{Clock, Instant}
 
 import whisk.utils.retry
 
@@ -414,8 +413,6 @@ class MessageHubFeedTests
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
 
-      val first = Instant.now(Clock.systemUTC())
-
       println("Producing a message")
       withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
         "user" -> kafkaUtils.getAsJson("user"),
@@ -428,7 +425,7 @@ class MessageHubFeedTests
         _.response.success shouldBe true
       }
 
-      checkForActivations(triggerName, first, topic, key, currentTime)
+      checkForActivations(1, triggerName, topic, key, currentTime)
 
       println("Updating trigger")
 
@@ -446,7 +443,6 @@ class MessageHubFeedTests
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
 
-      val second = Instant.now(Clock.systemUTC())
       val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
 
       println("Producing a message")
@@ -461,7 +457,7 @@ class MessageHubFeedTests
         _.response.success shouldBe true
       }
 
-      checkForActivations(triggerName, second, topic, key, encodedCurrentTime)
+      checkForActivations(2, triggerName, topic, key, encodedCurrentTime)
   }
 
   def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
@@ -477,10 +473,10 @@ class MessageHubFeedTests
     }
   }
 
-  def checkForActivations(triggerName: String, since: Instant, topic: String, key: String, value: String) = {
+  def checkForActivations(numActivations: Int, triggerName: String, topic: String, key: String, value: String) = {
     retry({
       println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
+      val activations = wsk.activation.pollFor(N = numActivations, Some(triggerName), retries = maxRetries)
       assert(activations.nonEmpty)
 
       println("Validating content of activation(s)")

-- 
To stop receiving notification emails like this one, please contact
dubeejw@apache.org.