You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/02/21 17:27:28 UTC

[incubator-openwhisk-package-kafka] branch master updated: Remove error callback (#250)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 57bf7b2  Remove error callback (#250)
57bf7b2 is described below

commit 57bf7b2625d4ffedf897f52651a7257a7d5a1553
Author: Adnan Baruni <ab...@users.noreply.github.com>
AuthorDate: Wed Feb 21 11:27:26 2018 -0600

    Remove error callback (#250)
    
    * remove error callback that disables valid consumers
    
    * removed unused variables
---
 provider/consumer.py | 29 +----------------------------
 1 file changed, 1 insertion(+), 28 deletions(-)

diff --git a/provider/consumer.py b/provider/consumer.py
index 55735bd..a8427d0 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -131,20 +131,6 @@ class ConsumerProcess (Process):
         self.brokers = params["brokers"]
         self.topic = params["topic"]
 
-        self.authErrors = 0
-
-        # We want to account for the number of brokers when deciding the maximum number
-        # number of auth errors to allow
-        self.maxAuthErrors = len(self.brokers) * 30
-
-        # There is the possibility of being disconnected from one or more brokers while
-        # still maintaining a connection to one or more others. We'll use this flag to
-        # signal when we have been disconnected from all brokers. Value will be set to
-        # 'True' when we have received a partition assignment and 'False' when our
-        # partition assignment has been revoked. When disconnected we will begin to
-        # increment the 'authErrors' counter.
-        self.connected = False
-
         self.sharedDictionary = sharedDictionary
 
         if 'status' in params and params['status']['active'] == False:
@@ -267,8 +253,7 @@ class ConsumerProcess (Process):
                         'group.id': self.trigger,
                         'default.topic.config': {'auto.offset.reset': 'latest'},
                         'enable.auto.commit': False,
-                        'api.version.request': True,
-                        'error_cb': self.__error_callback
+                        'api.version.request': True
                     }
 
             if self.isMessageHub:
@@ -488,20 +473,8 @@ class ConsumerProcess (Process):
         logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
         return key
 
-    def __error_callback(self, error):
-        if not self.connected and error.code() == KafkaError._AUTHENTICATION:
-            self.authErrors = self.authErrors + 1
-            if self.authErrors > self.maxAuthErrors:
-                logging.warning('[{}] Shutting down consumer and disabling trigger. Exceeded the allowable number of _AUTHENTICATION errors'.format(self.trigger))
-                self.setDesiredState(Consumer.State.Disabled)
-                message = 'Automatically disabled trigger. Consumer was unable to connect to broker(s) after 30 attempts'.format()
-                self.database.disableTrigger(self.trigger, 403, message)
-
     def __on_assign(self, consumer, partitions):
         logging.info('[{}] Completed partition assignment. Connected to broker(s)'.format(self.trigger))
-        self.authErrors = 0
-        self.connected = True
 
     def __on_revoke(self, consumer, partitions):
         logging.info('[{}] Partition assignment has been revoked. Disconnected from broker(s)'.format(self.trigger))
-        self.connected = False

-- 
To stop receiving notification emails like this one, please contact
dubeejw@apache.org.