You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/02/07 18:41:27 UTC
[incubator-openwhisk-package-kafka] branch master updated: Fix
uncaught exception occurring when reading in a message containing
non-unicode bytes (#247)
This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new a88af2a Fix uncaught exception occurring when reading in a message containing non-unicode bytes (#247)
a88af2a is described below
commit a88af2ad4156a21c173bf9444f8f6f0aef5131d3
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Wed Feb 7 12:41:25 2018 -0600
Fix uncaught exception occurring when reading in a message containing non-unicode bytes (#247)
* replace bad bytes when non-unicode passed in
* treat messages as strings when they can not be JSON parsed
---
README.md | 74 ++++++++++++++++++++++
provider/consumer.py | 10 ++-
.../system/packages/MessageHubFeedTests.scala | 12 ++--
.../system/packages/MessageHubProduceTests.scala | 8 +--
4 files changed, 93 insertions(+), 11 deletions(-)
diff --git a/README.md b/README.md
index ec475af..22cd60e 100644
--- a/README.md
+++ b/README.md
@@ -232,6 +232,80 @@ Here is an example of a batched trigger payload (please note the change in the *
}
```
+### Checking the status and configuration of a trigger
+The status and configuration of a feed trigger can be retrieved using `wsk trigger get`.
+
+Example:
+```
+$ wsk trigger get myTopicTrigger
+```
+
+The response will contain a `result` object containing the status of the trigger along with its configuration information.
+
+e.g.
+
+```json
+{
+ "result": {
+ "config": {
+ "isBinaryKey": false,
+ "isBinaryValue": false,
+ "isJSONData": false,
+ "kafka_admin_url": ...,
+ "kafka_brokers_sasl": [
+ ...
+ ],
+ "user": ...,
+ "password": ...,
+ "topic": "myTopic",
+ "triggerName": "/myNamespace/myTopicTrigger"
+ },
+ "status": {
+ "active": true,
+ "dateChanged": 1517245917340,
+ "dateChangedISO": "2018-01-29T17:11:57Z"
+ }
+ }
+}
+```
+
+ Triggers may become inactive when certain exceptional behavior occurs, for example when there is an error firing the trigger or when it is not possible to connect to the Kafka brokers. When a trigger becomes inactive, the status object will contain additional information as to the cause.
+
+ e.g.
+
+ ```json
+{
+ "status": {
+ "active": false,
+ "dateChanged": 1517936358359,
+ "dateChangedISO": "2018-02-06T16:59:18Z",
+ "reason": {
+ "kind": "AUTO",
+ "message": "Automatically disabled trigger. Consumer was unable to connect to broker(s) after 30 attempts",
+ "statusCode": 403
+ }
+ }
+}
+```
+
+### Updating the configuration of a trigger
+It is possible to update a limited set of configuration parameters for a trigger. The updatable parameters are:
+- `isBinaryKey`
+- `isBinaryValue`
+- `isJSONData`
+
+These parameters can be updated using `wsk trigger update`
+
+Examples:
+
+```
+$ wsk trigger update myTopicTrigger -p isJSONData true
+```
+
+```
+$ wsk trigger update myTopicTrigger -p isJSONData false -p isBinaryKey true -p isBinaryValue true
+```
+
### Producing messages to Message Hub
If you would like to use an OpenWhisk action to conveniently produce a message to Message Hub, you can use the `/messaging/messageHubProduce` action. This action takes the following parameters:
diff --git a/provider/consumer.py b/provider/consumer.py
index 75c5e9b..55735bd 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -446,6 +446,13 @@ class ConsumerProcess (Process):
return offsets
def __encodeMessageIfNeeded(self, value):
+ # let's make sure whatever data we're getting is utf-8 encoded
+ try:
+ value = value.encode('utf-8')
+ except UnicodeDecodeError:
+ logging.warn('[{}] Value contains non-unicode bytes. Replacing invalid bytes.'.format(self.trigger))
+ value = unicode(value, errors='replace').encode('utf-8')
+
if self.encodeValueAsJSON:
try:
parsed = json.loads(value)
@@ -454,6 +461,7 @@ class ConsumerProcess (Process):
except ValueError:
# no big deal, just return the original value
logging.warn('[{}] I was asked to encode a message as JSON, but I failed.'.format(self.trigger))
+ value = "\"{}\"".format(value)
pass
elif self.encodeValueAsBase64:
try:
@@ -486,7 +494,7 @@ class ConsumerProcess (Process):
if self.authErrors > self.maxAuthErrors:
logging.warning('[{}] Shutting down consumer and disabling trigger. Exceeded the allowable number of _AUTHENTICATION errors'.format(self.trigger))
self.setDesiredState(Consumer.State.Disabled)
- message = 'Automatically disabled trigger. Consumer failed to authenticate with broker(s) after more than 30 attempts with apikey {}:{}'.format(self.username, self.password)
+ message = 'Automatically disabled trigger. Consumer was unable to connect to broker(s) after 30 attempts'.format()
self.database.disableTrigger(self.trigger, 403, message)
def __on_assign(self, consumer, partitions):
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 6341bbf..1fe18a1 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -167,8 +167,8 @@ class MessageHubFeedTests
}
println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
- assert(activations.length > 0)
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.length == 1)
val matchingActivations = for {
id <- activations
@@ -236,7 +236,7 @@ class MessageHubFeedTests
// verify there are two trigger activations required to handle these messages
println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+ val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
println("Verifying activation content")
val matchingActivations = for {
@@ -291,7 +291,7 @@ class MessageHubFeedTests
// verify there are no activations that match
println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
println("Verifying activation content")
val matchingActivations = for {
@@ -470,8 +470,8 @@ class MessageHubFeedTests
def checkForActivations(triggerName: String, since: Instant, topic: String, key: String, value: String) = {
println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), since = Some(since), retries = maxRetries)
- assert(activations.length > 0)
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
+ assert(activations.length == 1)
println("Validating content of activation(s)")
val matchingActivations = for {
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 6de9417..eb4bcd1 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -199,8 +199,8 @@ class MessageHubProduceTests
// verify trigger fired
println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
- assert(activations.length > 0)
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.length == 1)
val matchingActivations = for {
id <- activations
@@ -268,8 +268,8 @@ class MessageHubProduceTests
// verify trigger fired
println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
- assert(activations.length > 0)
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.length == 1)
val matchingActivations = for {
id <- activations
--
To stop receiving notification emails like this one, please contact
dubeejw@apache.org.