You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/02/07 18:41:27 UTC

[incubator-openwhisk-package-kafka] branch master updated: Fix uncaught exception occurring when reading in a message containing non-unicode bytes (#247)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new a88af2a  Fix uncaught exception occurring when reading in a message containing non-unicode bytes (#247)
a88af2a is described below

commit a88af2ad4156a21c173bf9444f8f6f0aef5131d3
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Wed Feb 7 12:41:25 2018 -0600

    Fix uncaught exception occurring when reading in a message containing non-unicode bytes (#247)
    
    * replace bad bytes when non-unicode passed in
    
    * treat messages as strings when they can not be JSON parsed
---
 README.md                                          | 74 ++++++++++++++++++++++
 provider/consumer.py                               | 10 ++-
 .../system/packages/MessageHubFeedTests.scala      | 12 ++--
 .../system/packages/MessageHubProduceTests.scala   |  8 +--
 4 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/README.md b/README.md
index ec475af..22cd60e 100644
--- a/README.md
+++ b/README.md
@@ -232,6 +232,80 @@ Here is an example of a batched trigger payload (please note the change in the *
  }
  ```
 
+### Checking the status and configuration of a trigger
+The status and configuration of a feed trigger can be retrieved using `wsk trigger get`.
+
+Example:
+```
+$ wsk trigger get myTopicTrigger
+```
+
+The response will contain a `result` object containing the status of the trigger along with its configuration information.
+
+e.g.
+
+```json
+{
+  "result": {
+      "config": {
+          "isBinaryKey": false,
+          "isBinaryValue": false,
+          "isJSONData": false,
+          "kafka_admin_url": ...,
+          "kafka_brokers_sasl": [
+              ...
+          ],
+          "user": ...,
+          "password": ...,
+          "topic": "myTopic",
+          "triggerName": "/myNamespace/myTopicTrigger"
+      },
+      "status": {
+          "active": true,
+          "dateChanged": 1517245917340,
+          "dateChangedISO": "2018-01-29T17:11:57Z"
+      }
+  }
+}
+```
+
+ Triggers may become inactive when certain exceptional behavior occurs, for example when there was an error firing the trigger or it was not possible to connect to the Kafka brokers. When a trigger becomes inactive, the status object will contain additional information as to the cause.
+
+ e.g.
+
+ ```json
+{
+    "status": {
+        "active": false,
+        "dateChanged": 1517936358359,
+        "dateChangedISO": "2018-02-06T16:59:18Z",
+        "reason": {
+            "kind": "AUTO",
+            "message": "Automatically disabled trigger. Consumer was unable to connect to broker(s) after 30 attempts",
+            "statusCode": 403
+        }
+    }
+}
+```
+
+### Updating the configuration of a trigger
+It is possible to update a limited set of configuration parameters for a trigger. The updatable parameters are:
+- `isBinaryKey`
+- `isBinaryValue`
+- `isJSONData`
+
+These parameters can be updated using `wsk trigger update`.
+
+Examples:
+
+```
+$ wsk trigger update myTopicTrigger -p isJSONData true
+```
+
+```
+$ wsk trigger update myTopicTrigger -p isJSONData false -p isBinaryKey true -p isBinaryValue true
+```
+
 ### Producing messages to Message Hub
 If you would like to use an OpenWhisk action to conveniently produce a message to Message Hub, you can use the `/messaging/messageHubProduce` action. This action takes the following parameters:
 
diff --git a/provider/consumer.py b/provider/consumer.py
index 75c5e9b..55735bd 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -446,6 +446,13 @@ class ConsumerProcess (Process):
         return offsets
 
     def __encodeMessageIfNeeded(self, value):
+        # let's make sure whatever data we're getting is utf-8 encoded
+        try:
+            value = value.encode('utf-8')
+        except UnicodeDecodeError:
+            logging.warn('[{}] Value contains non-unicode bytes. Replacing invalid bytes.'.format(self.trigger))
+            value = unicode(value, errors='replace').encode('utf-8')
+
         if self.encodeValueAsJSON:
             try:
                 parsed = json.loads(value)
@@ -454,6 +461,7 @@ class ConsumerProcess (Process):
             except ValueError:
                 # no big deal, just return the original value
                 logging.warn('[{}] I was asked to encode a message as JSON, but I failed.'.format(self.trigger))
+                value = "\"{}\"".format(value)
                 pass
         elif self.encodeValueAsBase64:
             try:
@@ -486,7 +494,7 @@ class ConsumerProcess (Process):
             if self.authErrors > self.maxAuthErrors:
                 logging.warning('[{}] Shutting down consumer and disabling trigger. Exceeded the allowable number of _AUTHENTICATION errors'.format(self.trigger))
                 self.setDesiredState(Consumer.State.Disabled)
-                message = 'Automatically disabled trigger. Consumer failed to authenticate with broker(s) after more than 30 attempts with apikey {}:{}'.format(self.username, self.password)
+                message = 'Automatically disabled trigger. Consumer was unable to connect to broker(s) after 30 attempts'.format()
                 self.database.disableTrigger(self.trigger, 403, message)
 
     def __on_assign(self, consumer, partitions):
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 6341bbf..1fe18a1 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -167,8 +167,8 @@ class MessageHubFeedTests
       }
 
       println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
-      assert(activations.length > 0)
+      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+      assert(activations.length == 1)
 
       val matchingActivations = for {
         id <- activations
@@ -236,7 +236,7 @@ class MessageHubFeedTests
 
       // verify there are two trigger activations required to handle these messages
       println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+      val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
 
       println("Verifying activation content")
       val matchingActivations = for {
@@ -291,7 +291,7 @@ class MessageHubFeedTests
 
       // verify there are no activations that match
       println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
 
       println("Verifying activation content")
       val matchingActivations = for {
@@ -470,8 +470,8 @@ class MessageHubFeedTests
 
   def checkForActivations(triggerName: String, since: Instant, topic: String, key: String, value: String) = {
     println("Polling for activations")
-    val activations = wsk.activation.pollFor(N = 100, Some(triggerName), since = Some(since), retries = maxRetries)
-    assert(activations.length > 0)
+    val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
+    assert(activations.length == 1)
 
     println("Validating content of activation(s)")
     val matchingActivations = for {
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 6de9417..eb4bcd1 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -199,8 +199,8 @@ class MessageHubProduceTests
 
             // verify trigger fired
             println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
-            assert(activations.length > 0)
+            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+            assert(activations.length == 1)
 
             val matchingActivations = for {
                 id <- activations
@@ -268,8 +268,8 @@ class MessageHubProduceTests
 
             // verify trigger fired
             println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
-            assert(activations.length > 0)
+            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+            assert(activations.length == 1)
 
             val matchingActivations = for {
                 id <- activations

-- 
To stop receiving notification emails like this one, please contact
dubeejw@apache.org.