You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/07/21 18:50:49 UTC

[incubator-openwhisk-package-kafka] branch master updated: Block until message is sent, or timeout (#204)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 74d0447  Block until message is sent, or timeout (#204)
74d0447 is described below

commit 74d0447d412ff89fb42b4f43815925053b731d06
Author: Justin Berstler <bj...@us.ibm.com>
AuthorDate: Fri Jul 21 14:50:44 2017 -0400

    Block until message is sent, or timeout (#204)
    
    As it turns out, flush() only waits for the message to be put on the network, and does not wait for any required acks.
---
 action/messageHubProduce.py                              | 16 +++++++++++-----
 .../scala/system/packages/MessageHubProduceTests.scala   |  2 +-
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/action/messageHubProduce.py b/action/messageHubProduce.py
index 162350f..d78547c 100755
--- a/action/messageHubProduce.py
+++ b/action/messageHubProduce.py
@@ -21,7 +21,7 @@
 import ssl
 import base64
 from kafka import KafkaProducer
-from kafka.errors import NoBrokersAvailable
+from kafka.errors import NoBrokersAvailable, KafkaTimeoutError
 
 
 def main(params):
@@ -44,6 +44,8 @@ def main(params):
             api_version=(0, 10),
             api_version_auto_timeout_ms=15000,
             bootstrap_servers=validatedParams['kafka_brokers_sasl'],
+            max_block_ms=20000,
+            request_timeout_ms=20000,
             sasl_plain_username=validatedParams['user'],
             sasl_plain_password=validatedParams['password'],
             security_protocol=security_protocol,
@@ -55,13 +57,17 @@ def main(params):
         # only use the key parameter if it is present
         if 'key' in validatedParams:
             messageKey = validatedParams['key']
-            producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'), key=bytes(messageKey, 'utf-8'))
+            future = producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'), key=bytes(messageKey, 'utf-8'))
         else:
-            producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'))
+            future = producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'))
 
-        producer.flush()
+        sent = future.get(timeout=30)
+        msg = "Successfully sent message to {}:{} at offset {}".format(
+            sent.topic, sent.partition, sent.offset)
 
-        print("Sent message")
+        print(msg)
+    except KafkaTimeoutError:
+        return {'error': 'Timed out communicating with Message Hub.'}
     except NoBrokersAvailable:
         # this exception's message is a little too generic
         return {'error': 'No brokers available. Check that your supplied brokers are correct and available.'}
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 670bc98..4dfc893 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -120,7 +120,7 @@ class MessageHubProduceTests
         withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", badBrokerParams)) {
             activation =>
                 activation.response.success shouldBe false
-                activation.response.result.get.toString should include("No brokers available")
+                activation.response.result.get.toString should include("Timed out communicating with Message Hub")
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].