You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/03/06 03:58:22 UTC
[incubator-openwhisk-package-kafka] branch master updated: check
for 204 before attempting to fetch response json (#255)
This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 27777f8 check for 204 before attempting to fetch response json (#255)
27777f8 is described below
commit 27777f856158626a8a954282e9a795e8f6adfd2c
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Mon Mar 5 21:58:20 2018 -0600
check for 204 before attempting to fetch response json (#255)
* check for 204 before attempting to fetch response json
* make test more robust
---
provider/consumer.py | 12 ++++++++----
.../src/test/scala/system/packages/MessageHubFeedTests.scala | 12 ++++--------
2 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/provider/consumer.py b/provider/consumer.py
index a8427d0..f9aaef2 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -354,11 +354,15 @@ class ConsumerProcess (Process):
# Manually commit offset if the trigger was fired successfully. Retry firing the trigger
# for a select set of status codes
if status_code in range(200, 300):
- response_json = response.json()
- if 'activationId' in response_json and response_json['activationId'] is not None:
- logging.info("[{}] Fired trigger with activation {}".format(self.trigger, response_json['activationId']))
- else:
+ if status_code == 204:
logging.info("[{}] Successfully fired trigger".format(self.trigger))
+ else:
+ response_json = response.json()
+ if 'activationId' in response_json and response_json['activationId'] is not None:
+ logging.info("[{}] Fired trigger with activation {}".format(self.trigger, response_json['activationId']))
+ else:
+ logging.info("[{}] Successfully fired trigger".format(self.trigger))
+ # the consumer may have consumed messages that did not make it into the messages array.
# the consumer may have consumed messages that did not make it into the messages array.
# be sure to only commit to the messages that were actually fired.
self.consumer.commit(offsets=self.__getOffsetList(messages), async=False)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 9cd1e2f..1b8db83 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -44,7 +44,6 @@ import ActionHelper._
import java.util.Base64
import java.nio.charset.StandardCharsets
-import java.time.{Clock, Instant}
import whisk.utils.retry
@@ -414,8 +413,6 @@ class MessageHubFeedTests
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
- val first = Instant.now(Clock.systemUTC())
-
println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
"user" -> kafkaUtils.getAsJson("user"),
@@ -428,7 +425,7 @@ class MessageHubFeedTests
_.response.success shouldBe true
}
- checkForActivations(triggerName, first, topic, key, currentTime)
+ checkForActivations(1, triggerName, topic, key, currentTime)
println("Updating trigger")
@@ -446,7 +443,6 @@ class MessageHubFeedTests
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
- val second = Instant.now(Clock.systemUTC())
val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
println("Producing a message")
@@ -461,7 +457,7 @@ class MessageHubFeedTests
_.response.success shouldBe true
}
- checkForActivations(triggerName, second, topic, key, encodedCurrentTime)
+ checkForActivations(2, triggerName, topic, key, encodedCurrentTime)
}
def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
@@ -477,10 +473,10 @@ class MessageHubFeedTests
}
}
- def checkForActivations(triggerName: String, since: Instant, topic: String, key: String, value: String) = {
+ def checkForActivations(numActivations: Int, triggerName: String, topic: String, key: String, value: String) = {
retry({
println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
+ val activations = wsk.activation.pollFor(N = numActivations, Some(triggerName), retries = maxRetries)
assert(activations.nonEmpty)
println("Validating content of activation(s)")
--
To stop receiving notification emails like this one, please contact
dubeejw@apache.org.