You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by gi...@git.apache.org on 2017/08/03 20:26:21 UTC

[GitHub] dnwe commented on a change in pull request #202: Produce action fixes

dnwe commented on a change in pull request #202: Produce action fixes
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/202#discussion_r131250004
 
 

 ##########
 File path: action/messageHubProduce.py
 ##########
 @@ -108,3 +147,64 @@ def validateParams(params):
             return (False, "key parameter is not Base64 encoded")
 
     return (True, validatedParams)
+
+def getProducer(validatedParams):
+    connectionHash = getConnectionHash(validatedParams)
+
+    if globals().get("cached_producers") is None:
+        print("dictionary was none")
+        globals()["cached_producers"] = dict()
+
+    # remove arbitrary connection to make room for new one
+    if len(globals()["cached_producers"]) == max_cached_producers:
+        poppedProducer = globals()["cached_producers"].popitem()[1]
+        poppedProducer.close(timeout=1)
+        print("Removed cached producer")
+
+    if connectionHash not in globals()["cached_producers"]:
+        print("cache miss")
+        # create a new connection
+        sasl_mechanism = 'PLAIN'
+        security_protocol = 'SASL_SSL'
+
+        # Create a new context using system defaults, disable all but TLS1.2
+        context = ssl.create_default_context()
+        context.options &= ssl.OP_NO_TLSv1
+        context.options &= ssl.OP_NO_TLSv1_1
+
+        producer = KafkaProducer(
+            api_version=(0, 10),
+            batch_size=0,
+            bootstrap_servers=validatedParams['kafka_brokers_sasl'],
+            max_block_ms=15000,
+            request_timeout_ms=15000,
+            sasl_plain_username=validatedParams['user'],
+            sasl_plain_password=validatedParams['password'],
+            security_protocol=security_protocol,
+            ssl_context=context,
+            sasl_mechanism=sasl_mechanism
+        )
+
+        print("Created producer")
+
+        # store the producer globally for subsequent invocations
+        globals()["cached_producers"][connectionHash] = producer
+
+        # return it
+        return producer
+    else:
+        print("Reusing existing producer")
+        return globals()["cached_producers"][connectionHash]
+
+
+def getConnectionHash(params):
+    # always use the sorted brokers to combat the effects of shuffle()
+    brokers = params['kafka_brokers_sasl']
+    brokers.sort()
+    brokersString = ",".join(brokers)
+
+    apiKey = "{}:{}".format(params['user'], params['password'])
+
+    connectionHash = brokersString + apiKey
 
 Review comment:
   Technically the broker URLs are only used for bootstrapping to fetch the cluster info. The client discovers all the available brokers in the response to that bootstrap, so your connectionHash can just be based on the apiKey, as that will only be valid for a given cluster.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services