You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/07/21 18:50:49 UTC
[incubator-openwhisk-package-kafka] branch master updated: Block
until message is sent, or timeout (#204)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 74d0447 Block until message is sent, or timeout (#204)
74d0447 is described below
commit 74d0447d412ff89fb42b4f43815925053b731d06
Author: Justin Berstler <bj...@us.ibm.com>
AuthorDate: Fri Jul 21 14:50:44 2017 -0400
Block until message is sent, or timeout (#204)
As it turns out, flush() only waits for the message to be put on the network, and does not wait for any required acks.
---
action/messageHubProduce.py | 16 +++++++++++-----
.../scala/system/packages/MessageHubProduceTests.scala | 2 +-
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/action/messageHubProduce.py b/action/messageHubProduce.py
index 162350f..d78547c 100755
--- a/action/messageHubProduce.py
+++ b/action/messageHubProduce.py
@@ -21,7 +21,7 @@
import ssl
import base64
from kafka import KafkaProducer
-from kafka.errors import NoBrokersAvailable
+from kafka.errors import NoBrokersAvailable, KafkaTimeoutError
def main(params):
@@ -44,6 +44,8 @@ def main(params):
api_version=(0, 10),
api_version_auto_timeout_ms=15000,
bootstrap_servers=validatedParams['kafka_brokers_sasl'],
+ max_block_ms=20000,
+ request_timeout_ms=20000,
sasl_plain_username=validatedParams['user'],
sasl_plain_password=validatedParams['password'],
security_protocol=security_protocol,
@@ -55,13 +57,17 @@ def main(params):
# only use the key parameter if it is present
if 'key' in validatedParams:
messageKey = validatedParams['key']
- producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'), key=bytes(messageKey, 'utf-8'))
+ future = producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'), key=bytes(messageKey, 'utf-8'))
else:
- producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'))
+ future = producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'))
- producer.flush()
+ sent = future.get(timeout=30)
+ msg = "Successfully sent message to {}:{} at offset {}".format(
+ sent.topic, sent.partition, sent.offset)
- print("Sent message")
+ print(msg)
+ except KafkaTimeoutError:
+ return {'error': 'Timed out communicating with Message Hub.'}
except NoBrokersAvailable:
# this exception's message is a little too generic
return {'error': 'No brokers available. Check that your supplied brokers are correct and available.'}
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 670bc98..4dfc893 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -120,7 +120,7 @@ class MessageHubProduceTests
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", badBrokerParams)) {
activation =>
activation.response.success shouldBe false
- activation.response.result.get.toString should include("No brokers available")
+ activation.response.result.get.toString should include("Timed out communicating with Message Hub")
}
}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].