You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/08/07 20:25:27 UTC

[incubator-openwhisk-package-kafka] branch master updated: Produce action fixes (#202)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new dc913fc  Produce action fixes (#202)
dc913fc is described below

commit dc913fc54d8ca7d6f28e06a4dc5ac448390f1040
Author: Justin Berstler <bj...@us.ibm.com>
AuthorDate: Mon Aug 7 16:25:25 2017 -0400

    Produce action fixes (#202)
    
    * Increase stability of the messageHubProduce action
    
    Tweak producer settings to be more efficient
    add retry behavior when the produce fails
    shuffle broker list to distribute the initial connection load
    Split produce into two phases
      1. Establish and verify a connection with Message Hub - this can be retried
      2. Produce the message - this can NOT be retried as it may result in duplicated messages
    
    attempt to cache/reuse producer
    
    This requires a slight change in the Python 3 action runtime that enables effective use of globals() across action invocations (under certain circumstances).
    
    Limit number of cached connections
    
    Arbitrarily set a limit of 10 cached connections
    If you try to cache an 11th connection, arbitrarily remove one of the existing connections
    
    * adjust for review feedback
    
    * Make timeout values based on the invocation time remaining
    
    Utilize __OW_DEADLINE to figure out when the action will be killed by the system
    Use this value to determine appropriate timeout values for various aspects of the produce action
    Reserve at least 10 seconds for the .send() call
    Set a producer timeout that allows for three retries (minus the reserved 10 seconds)
---
 action/kafkaProduce.py                             | 166 +++++++++++++++---
 action/messageHubProduce.py                        | 194 ++++++++++++++++-----
 .../system/packages/MessageHubProduceTests.scala   |   4 +-
 3 files changed, 297 insertions(+), 67 deletions(-)

diff --git a/action/kafkaProduce.py b/action/kafkaProduce.py
index 96070e1..27e177c 100644
--- a/action/kafkaProduce.py
+++ b/action/kafkaProduce.py
@@ -18,46 +18,105 @@
  */
 """
 
-import ssl
 import base64
+import logging
+import math
+import os
+import sys
+import time
+import traceback
+
 from kafka import KafkaProducer
-from kafka.errors import NoBrokersAvailable
+from kafka.errors import NoBrokersAvailable, KafkaTimeoutError, AuthenticationFailedError
+from kafka.version import __version__
+from random import shuffle
+
 
+logging.basicConfig(stream=sys.stdout, level=logging.INFO,
+        format='%(levelname)-8s %(asctime)s %(message)s',
+        datefmt='[%H:%M:%S]')
+
+max_cached_producers = 10
 
 def main(params):
+    producer = None
+    logging.info("Using kafka-python %s", str(__version__))
+
+    logging.info("Validating parameters")
     validationResult = validateParams(params)
     if validationResult[0] != True:
         return {'error': validationResult[1]}
     else:
         validatedParams = validationResult[1]
 
-    brokers = params['brokers']
-
-    try:
-        producer = KafkaProducer(
-            api_version_auto_timeout_ms=15000,
-            bootstrap_servers=brokers)
-
-        print("Created producer")
+    attempt = 0
+    max_attempts = 3
 
-        # only use the key parameter if it is present
-        if 'key' in validatedParams:
-            messageKey = validatedParams['key']
-            producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'), key=bytes(messageKey, 'utf-8'))
-        else:
-            producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'))
+    result = {"success": True}
 
-        producer.flush()
+    while attempt < max_attempts:
+        attempt += 1
+        logging.info("Starting attempt {}".format(attempt))
 
-        print("Sent message")
-    except NoBrokersAvailable:
-        # this exception's message is a little too generic
+        try:
+            logging.info("Getting producer")
+
+            # set a client timeout that allows for 3 connection retries while still
+            # reserving 10s for the actual send
+            producer_timeout_ms = math.floor(getRemainingTime(reservedTime=10) / max_attempts * 1000)
+            producer = getProducer(validatedParams, producer_timeout_ms)
+
+            topic = validatedParams['topic']
+            logging.info("Finding topic {}".format(topic))
+            partition_info = producer.partitions_for(topic)
+            logging.info("Found topic {} with partition(s) {}".format(topic, partition_info))
+
+            break
+        except Exception as e:
+            if attempt == max_attempts:
+                producer = None
+                logging.warning(e)
+                traceback.print_exc(limit=5)
+                result = getResultForException(e)
+
+    # we successfully connected and found the topic metadata... let's send!
+    if producer is not None:
+        try:
+            logging.info("Producing message")
+
+            # only use the key parameter if it is present
+            value = validatedParams['value']
+            if 'key' in validatedParams:
+                messageKey = validatedParams['key']
+                future = producer.send(
+                    topic, bytes(value, 'utf-8'), key=bytes(messageKey, 'utf-8'))
+            else:
+                future = producer.send(topic, bytes(value, 'utf-8'))
+
+            # future should wait all of the remaining time
+            future_time_seconds = math.floor(getRemainingTime())
+            sent = future.get(timeout=future_time_seconds)
+            msg = "Successfully sent message to {}:{} at offset {}".format(
+                sent.topic, sent.partition, sent.offset)
+            logging.info(msg)
+            result = {"success": True, "message": msg}
+        except Exception as e:
+            logging.warning(e)
+            traceback.print_exc(limit=5)
+            result = getResultForException(e)
+
+    return result
+
+def getResultForException(e):
+    if isinstance(e, KafkaTimeoutError):
+        return {'error': 'Timed out communicating with Message Hub'}
+    elif isinstance(e, AuthenticationFailedError):
+        return {'error': 'Authentication failed'}
+    elif isinstance(e, NoBrokersAvailable):
         return {'error': 'No brokers available. Check that your supplied brokers are correct and available.'}
-    except Exception as e:
+    else:
         return {'error': '{}'.format(e)}
 
-    return {"success": True}
-
 
 def validateParams(params):
     validatedParams = params.copy()
@@ -71,6 +130,12 @@ def validateParams(params):
     if len(missingParams) > 0:
         return (False, "You must supply all of the following parameters: {}".format(', '.join(missingParams)))
 
+    if isinstance(params['brokers'], str):
+        # turn it into a List
+        validatedParams['brokers'] = params['brokers'].split(',')
+
+    shuffle(validatedParams['brokers'])
+
     if 'base64DecodeValue' in params and params['base64DecodeValue'] == True:
         try:
             validatedParams['value'] = base64.b64decode(params['value']).decode('utf-8')
@@ -90,3 +155,58 @@ def validateParams(params):
             return (False, "key parameter is not Base64 encoded")
 
     return (True, validatedParams)
+
+def getProducer(validatedParams, timeout_ms):
+    """Return a KafkaProducer for the given brokers, reusing a cached one when possible.
+
+    Producers are stored in the module-level "cached_producers" dict under the
+    key returned by getConnectionHash(), so an invocation that sees the same
+    globals can skip creating a new connection.
+
+    validatedParams -- validated action parameters; 'brokers' is read here
+    timeout_ms -- passed as max_block_ms and request_timeout_ms when a new
+        producer is created (a reused producer keeps its original settings)
+    """
+    connectionHash = getConnectionHash(validatedParams)
+
+    if globals().get("cached_producers") is None:
+        logging.info("dictionary was None")
+        globals()["cached_producers"] = dict()
+
+    cache = globals()["cached_producers"]
+
+    if connectionHash in cache:
+        logging.info("Reusing existing producer")
+        return cache[connectionHash]
+
+    logging.info("cache miss")
+
+    # Make room only when a new producer is about to be added. Evicting before
+    # the lookup could close the very producer this invocation wants to reuse.
+    while cache and len(cache) >= max_cached_producers:
+        poppedProducer = cache.popitem()[1]
+        poppedProducer.close(timeout=1)
+        logging.info("Removed cached producer")
+
+    # create a new connection
+    producer = KafkaProducer(
+        api_version_auto_timeout_ms=15000,
+        batch_size=0,
+        bootstrap_servers=validatedParams['brokers'],
+        max_block_ms=timeout_ms,
+        request_timeout_ms=timeout_ms,
+    )
+
+    logging.info("Created producer")
+
+    # store the producer globally for subsequent invocations
+    cache[connectionHash] = producer
+
+    return producer
+
+
+def getConnectionHash(params):
+    """Return the producer cache key: the broker list, sorted and comma-joined.
+
+    params -- validated action parameters; 'brokers' must be a list of strings
+    """
+    # This action's broker parameter is 'brokers' ('kafka_brokers_sasl' belongs
+    # to messageHubProduce and is not present here).
+    # always use the sorted brokers to combat the effects of shuffle(), but sort
+    # a copy so the shuffled order handed to KafkaProducer is left intact
+    brokersString = ",".join(sorted(params['brokers']))
+
+    return brokersString
+
+# return the remaining time (in seconds) until the action will expire,
+# optionally reserving some time (also in seconds).
+def getRemainingTime(reservedTime=0):
+    """Return the seconds left before the deadline, minus reservedTime, never below zero.
+
+    The deadline is read from the __OW_DEADLINE environment variable, which is
+    interpreted as milliseconds since the epoch (it is divided by 1000 and
+    compared against time.time()).
+
+    reservedTime -- seconds to subtract from the remaining time
+    """
+    now = time.time()
+    deadline = os.getenv('__OW_DEADLINE')
+
+    if not deadline:
+        # No deadline available: assume 60 seconds from now. A bare default of
+        # 60000 would be read as a timestamp in 1970 and always yield zero.
+        deadlineSeconds = now + 60
+    else:
+        deadlineSeconds = int(deadline) / 1000
+
+    remaining = deadlineSeconds - now - reservedTime
+
+    # ensure value is at least zero
+    # yes, this is a little paranoid
+    return max(remaining, 0)
diff --git a/action/messageHubProduce.py b/action/messageHubProduce.py
index d78547c..bdb658d 100755
--- a/action/messageHubProduce.py
+++ b/action/messageHubProduce.py
@@ -18,64 +18,106 @@
  */
 """
 
-import ssl
 import base64
+import logging
+import math
+import os
+import ssl
+import sys
+import time
+import traceback
+
 from kafka import KafkaProducer
-from kafka.errors import NoBrokersAvailable, KafkaTimeoutError
+from kafka.errors import NoBrokersAvailable, KafkaTimeoutError, AuthenticationFailedError
+from kafka.version import __version__
+from random import shuffle
+
+
+logging.basicConfig(stream=sys.stdout, level=logging.INFO,
+        format='%(levelname)-8s %(asctime)s %(message)s',
+        datefmt='[%H:%M:%S]')
 
+max_cached_producers = 10
 
 def main(params):
+    producer = None
+    logging.info("Using kafka-python %s", str(__version__))
+
+    logging.info("Validating parameters")
     validationResult = validateParams(params)
     if validationResult[0] != True:
         return {'error': validationResult[1]}
     else:
         validatedParams = validationResult[1]
 
-    sasl_mechanism = 'PLAIN'
-    security_protocol = 'SASL_SSL'
+    attempt = 0
+    max_attempts = 3
 
-    # Create a new context using system defaults, disable all but TLS1.2
-    context = ssl.create_default_context()
-    context.options &= ssl.OP_NO_TLSv1
-    context.options &= ssl.OP_NO_TLSv1_1
+    result = {"success": True}
 
-    try:
-        producer = KafkaProducer(
-            api_version=(0, 10),
-            api_version_auto_timeout_ms=15000,
-            bootstrap_servers=validatedParams['kafka_brokers_sasl'],
-            max_block_ms=20000,
-            request_timeout_ms=20000,
-            sasl_plain_username=validatedParams['user'],
-            sasl_plain_password=validatedParams['password'],
-            security_protocol=security_protocol,
-            ssl_context=context,
-            sasl_mechanism=sasl_mechanism)
-
-        print("Created producer")
-
-        # only use the key parameter if it is present
-        if 'key' in validatedParams:
-            messageKey = validatedParams['key']
-            future = producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'), key=bytes(messageKey, 'utf-8'))
-        else:
-            future = producer.send(validatedParams['topic'], bytes(validatedParams['value'], 'utf-8'))
-
-        sent = future.get(timeout=30)
-        msg = "Successfully sent message to {}:{} at offset {}".format(
-            sent.topic, sent.partition, sent.offset)
-
-        print(msg)
-    except KafkaTimeoutError:
-        return {'error': 'Timed out communicating with Message Hub.'}
-    except NoBrokersAvailable:
-        # this exception's message is a little too generic
+    while attempt < max_attempts:
+        attempt += 1
+        logging.info("Starting attempt {}".format(attempt))
+
+        try:
+            logging.info("Getting producer")
+
+            # set a client timeout that allows for 3 connection retries while still
+            # reserving 10s for the actual send
+            producer_timeout_ms = math.floor(getRemainingTime(reservedTime=10) / max_attempts * 1000)
+            producer = getProducer(validatedParams, producer_timeout_ms)
+
+            topic = validatedParams['topic']
+            logging.info("Finding topic {}".format(topic))
+            partition_info = producer.partitions_for(topic)
+            logging.info("Found topic {} with partition(s) {}".format(topic, partition_info))
+
+            break
+        except Exception as e:
+            if attempt == max_attempts:
+                producer = None
+                logging.warning(e)
+                traceback.print_exc(limit=5)
+                result = getResultForException(e)
+
+    # we successfully connected and found the topic metadata... let's send!
+    if producer is not None:
+        try:
+            logging.info("Producing message")
+
+            # only use the key parameter if it is present
+            value = validatedParams['value']
+            if 'key' in validatedParams:
+                messageKey = validatedParams['key']
+                future = producer.send(
+                    topic, bytes(value, 'utf-8'), key=bytes(messageKey, 'utf-8'))
+            else:
+                future = producer.send(topic, bytes(value, 'utf-8'))
+
+            # future should wait all of the remaining time
+            future_time_seconds = math.floor(getRemainingTime())
+            sent = future.get(timeout=future_time_seconds)
+            msg = "Successfully sent message to {}:{} at offset {}".format(
+                sent.topic, sent.partition, sent.offset)
+            logging.info(msg)
+            result = {"success": True, "message": msg}
+        except Exception as e:
+            logging.warning(e)
+            traceback.print_exc(limit=5)
+            result = getResultForException(e)
+
+    return result
+
+def getResultForException(e):
+    if isinstance(e, KafkaTimeoutError):
+        return {'error': 'Timed out communicating with Message Hub'}
+    elif isinstance(e, AuthenticationFailedError):
+        return {'error': 'Authentication failed'}
+    elif isinstance(e, NoBrokersAvailable):
         return {'error': 'No brokers available. Check that your supplied brokers are correct and available.'}
-    except Exception as e:
+    else:
         return {'error': '{}'.format(e)}
 
-    return {"success": True}
-
 
 def validateParams(params):
     validatedParams = params.copy()
@@ -89,6 +131,12 @@ def validateParams(params):
     if len(missingParams) > 0:
         return (False, "You must supply all of the following parameters: {}".format(', '.join(missingParams)))
 
+    if isinstance(params['kafka_brokers_sasl'], str):
+        # turn it into a List
+        validatedParams['kafka_brokers_sasl'] = params['kafka_brokers_sasl'].split(',')
+
+    shuffle(validatedParams['kafka_brokers_sasl'])
+
     if 'base64DecodeValue' in params and params['base64DecodeValue'] == True:
         try:
             validatedParams['value'] = base64.b64decode(params['value']).decode('utf-8')
@@ -108,3 +156,65 @@ def validateParams(params):
             return (False, "key parameter is not Base64 encoded")
 
     return (True, validatedParams)
+
+def getProducer(validatedParams, timeout_ms):
+    """Return a SASL_SSL KafkaProducer, reusing a cached one when possible.
+
+    Producers are stored in the module-level "cached_producers" dict under the
+    key returned by getConnectionHash(), so an invocation that sees the same
+    globals can skip creating a new connection.
+
+    validatedParams -- validated action parameters; 'kafka_brokers_sasl',
+        'user' and 'password' are read here
+    timeout_ms -- passed as max_block_ms and request_timeout_ms when a new
+        producer is created (a reused producer keeps its original settings)
+    """
+    connectionHash = getConnectionHash(validatedParams)
+
+    if globals().get("cached_producers") is None:
+        logging.info("dictionary was None")
+        globals()["cached_producers"] = dict()
+
+    cache = globals()["cached_producers"]
+
+    if connectionHash in cache:
+        logging.info("Reusing existing producer")
+        return cache[connectionHash]
+
+    logging.info("cache miss")
+
+    # Make room only when a new producer is about to be added. Evicting before
+    # the lookup could close the very producer this invocation wants to reuse.
+    while cache and len(cache) >= max_cached_producers:
+        poppedProducer = cache.popitem()[1]
+        poppedProducer.close(timeout=1)
+        logging.info("Removed cached producer")
+
+    # create a new connection
+    sasl_mechanism = 'PLAIN'
+    security_protocol = 'SASL_SSL'
+
+    # Create a new context using system defaults, disable all but TLS1.2
+    # The OP_NO_* values are flags that must be OR-ed in; AND-ing them would
+    # clear the default options instead of disabling the old protocols.
+    context = ssl.create_default_context()
+    context.options |= ssl.OP_NO_TLSv1
+    context.options |= ssl.OP_NO_TLSv1_1
+
+    producer = KafkaProducer(
+        api_version=(0, 10),
+        batch_size=0,
+        bootstrap_servers=validatedParams['kafka_brokers_sasl'],
+        max_block_ms=timeout_ms,
+        request_timeout_ms=timeout_ms,
+        sasl_plain_username=validatedParams['user'],
+        sasl_plain_password=validatedParams['password'],
+        security_protocol=security_protocol,
+        ssl_context=context,
+        sasl_mechanism=sasl_mechanism
+    )
+
+    logging.info("Created producer")
+
+    # store the producer globally for subsequent invocations
+    cache[connectionHash] = producer
+
+    return producer
+
+def getConnectionHash(params):
+    """Return the producer cache key built from the credentials and broker list.
+
+    The brokers are part of the key so that a cached producer is only reused
+    for the same brokers, not merely for the same user and password.
+
+    params -- validated action parameters; 'user', 'password' and
+        'kafka_brokers_sasl' (a list of strings) are read here
+    """
+    # sort a copy: the key must not depend on shuffle(), and the shuffled
+    # order handed to KafkaProducer must stay intact
+    brokersString = ",".join(sorted(params['kafka_brokers_sasl']))
+    apiKey = "{}:{}@{}".format(params['user'], params['password'], brokersString)
+    return apiKey
+
+# return the remaining time (in seconds) until the action will expire,
+# optionally reserving some time (also in seconds).
+def getRemainingTime(reservedTime=0):
+    """Return the seconds left before the deadline, minus reservedTime, never below zero.
+
+    The deadline is read from the __OW_DEADLINE environment variable, which is
+    interpreted as milliseconds since the epoch (it is divided by 1000 and
+    compared against time.time()).
+
+    reservedTime -- seconds to subtract from the remaining time
+    """
+    now = time.time()
+    deadline = os.getenv('__OW_DEADLINE')
+
+    if not deadline:
+        # No deadline available: assume 60 seconds from now. A bare default of
+        # 60000 would be read as a timestamp in 1970 and always yield zero.
+        deadlineSeconds = now + 60
+    else:
+        deadlineSeconds = int(deadline) / 1000
+
+    remaining = deadlineSeconds - now - reservedTime
+
+    # ensure value is at least zero
+    # yes, this is a little paranoid
+    return max(remaining, 0)
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 4dfc893..fb77547 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -107,7 +107,7 @@ class MessageHubProduceTests
     it should "Reject with bad credentials" in {
         val badAuthParams = validParameters + ("user" -> "ThisWillNeverWork".toJson)
 
-        withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", badAuthParams)) {
+        withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", badAuthParams), totalWait = 60 seconds) {
             activation =>
                 activation.response.success shouldBe false
                 activation.response.result.get.toString should include("Authentication failed")
@@ -117,7 +117,7 @@ class MessageHubProduceTests
     it should "Reject with bad broker list" in {
         val badBrokerParams = validParameters + ("kafka_brokers_sasl" -> List("localhost:0000").toJson)
 
-        withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", badBrokerParams)) {
+        withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", badBrokerParams), totalWait = 60 seconds) {
             activation =>
                 activation.response.success shouldBe false
                 activation.response.result.get.toString should include("Timed out communicating with Message Hub")

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].