You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/07 18:41:29 UTC

[GitHub] dubeejw closed pull request #247: Fix uncaught exception occurring when reading in a message containing non-unicode bytes

dubeejw closed pull request #247: Fix uncaught exception occurring when reading in a message containing non-unicode bytes
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/247
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/README.md b/README.md
index ec475af..22cd60e 100644
--- a/README.md
+++ b/README.md
@@ -232,6 +232,80 @@ Here is an example of a batched trigger payload (please note the change in the *
  }
  ```
 
+### Checking the status and configuration of a trigger
+The status and configuration of a feed trigger can be retrieved using `wsk trigger get`.
+
+Example:
+```
+$ wsk trigger get myTopicTrigger
+```
+
+The response will contain a `result` object with the status of the trigger along with its configuration information.
+
+e.g.
+
+```json
+{
+  "result": {
+      "config": {
+          "isBinaryKey": false,
+          "isBinaryValue": false,
+          "isJSONData": false,
+          "kafka_admin_url": ...,
+          "kafka_brokers_sasl": [
+              ...
+          ],
+          "user": ...,
+          "password": ...,
+          "topic": "myTopic",
+          "triggerName": "/myNamespace/myTopicTrigger"
+      },
+      "status": {
+          "active": true,
+          "dateChanged": 1517245917340,
+          "dateChangedISO": "2018-01-29T17:11:57Z"
+      }
+  }
+}
+```
+
+ Triggers may become inactive when certain exceptional behavior occurs, for example when there is an error firing the trigger or when it is not possible to connect to the Kafka brokers. When a trigger becomes inactive, the status object will contain additional information as to the cause.
+
+ e.g.
+
+ ```json
+{
+    "status": {
+        "active": false,
+        "dateChanged": 1517936358359,
+        "dateChangedISO": "2018-02-06T16:59:18Z",
+        "reason": {
+            "kind": "AUTO",
+            "message": "Automatically disabled trigger. Consumer was unable to connect to broker(s) after 30 attempts",
+            "statusCode": 403
+        }
+    }
+}
+```
+
+### Updating the configuration of a trigger
+It is possible to update a limited set of configuration parameters for a trigger. The updatable parameters are:
+- `isBinaryKey`
+- `isBinaryValue`
+- `isJSONData`
+
+These parameters can be updated using `wsk trigger update`.
+
+Examples:
+
+```
+$ wsk trigger update myTopicTrigger -p isJSONData true
+```
+
+```
+$ wsk trigger update myTopicTrigger -p isJSONData false -p isBinaryKey true -p isBinaryValue true
+```
+
 ### Producing messages to Message Hub
 If you would like to use an OpenWhisk action to conveniently produce a message to Message Hub, you can use the `/messaging/messageHubProduce` action. This action takes the following parameters:
 
diff --git a/provider/consumer.py b/provider/consumer.py
index 75c5e9b..55735bd 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -446,6 +446,13 @@ def __getOffsetList(self, messages):
         return offsets
 
     def __encodeMessageIfNeeded(self, value):
+        # let's make sure whatever data we're getting is utf-8 encoded
+        try:
+            value = value.encode('utf-8')
+        except UnicodeDecodeError:
+            logging.warn('[{}] Value contains non-unicode bytes. Replacing invalid bytes.'.format(self.trigger))
+            value = unicode(value, errors='replace').encode('utf-8')
+
         if self.encodeValueAsJSON:
             try:
                 parsed = json.loads(value)
@@ -454,6 +461,7 @@ def __encodeMessageIfNeeded(self, value):
             except ValueError:
                 # no big deal, just return the original value
                 logging.warn('[{}] I was asked to encode a message as JSON, but I failed.'.format(self.trigger))
+                value = "\"{}\"".format(value)
                 pass
         elif self.encodeValueAsBase64:
             try:
@@ -486,7 +494,7 @@ def __error_callback(self, error):
             if self.authErrors > self.maxAuthErrors:
                 logging.warning('[{}] Shutting down consumer and disabling trigger. Exceeded the allowable number of _AUTHENTICATION errors'.format(self.trigger))
                 self.setDesiredState(Consumer.State.Disabled)
-                message = 'Automatically disabled trigger. Consumer failed to authenticate with broker(s) after more than 30 attempts with apikey {}:{}'.format(self.username, self.password)
+                message = 'Automatically disabled trigger. Consumer was unable to connect to broker(s) after 30 attempts'.format()
                 self.database.disableTrigger(self.trigger, 403, message)
 
     def __on_assign(self, consumer, partitions):
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 6341bbf..1fe18a1 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -167,8 +167,8 @@ class MessageHubFeedTests
       }
 
       println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
-      assert(activations.length > 0)
+      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+      assert(activations.length == 1)
 
       val matchingActivations = for {
         id <- activations
@@ -236,7 +236,7 @@ class MessageHubFeedTests
 
       // verify there are two trigger activations required to handle these messages
       println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+      val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
 
       println("Verifying activation content")
       val matchingActivations = for {
@@ -291,7 +291,7 @@ class MessageHubFeedTests
 
       // verify there are no activations that match
       println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
+      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
 
       println("Verifying activation content")
       val matchingActivations = for {
@@ -470,8 +470,8 @@ class MessageHubFeedTests
 
   def checkForActivations(triggerName: String, since: Instant, topic: String, key: String, value: String) = {
     println("Polling for activations")
-    val activations = wsk.activation.pollFor(N = 100, Some(triggerName), since = Some(since), retries = maxRetries)
-    assert(activations.length > 0)
+    val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
+    assert(activations.length == 1)
 
     println("Validating content of activation(s)")
     val matchingActivations = for {
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 6de9417..eb4bcd1 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -199,8 +199,8 @@ class MessageHubProduceTests
 
             // verify trigger fired
             println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
-            assert(activations.length > 0)
+            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+            assert(activations.length == 1)
 
             val matchingActivations = for {
                 id <- activations
@@ -268,8 +268,8 @@ class MessageHubProduceTests
 
             // verify trigger fired
             println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 100, Some(triggerName), retries = maxRetries)
-            assert(activations.length > 0)
+            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+            assert(activations.length == 1)
 
             val matchingActivations = for {
                 id <- activations


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services