You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by gi...@git.apache.org on 2017/06/26 16:47:10 UTC

[GitHub] bjustin-ibm commented on a change in pull request #186: automatically monitor and restore the health of the DB changes feed

bjustin-ibm commented on a change in pull request #186: automatically monitor and restore the health of the DB changes feed
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/186#discussion_r124060782
 
 

 ##########
 File path: provider/service.py
 ##########
 @@ -13,62 +13,115 @@
 # limitations under the License.
 
 import logging
+import time
 
 from consumer import Consumer
 from database import Database
+from datetime import datetime
+from datetimeutils import secondsSince
 from threading import Thread
 
+# How often to produce canary documents
+canaryInterval = 60 # seconds
+
+# How long to wait between detecting canary documents before restarting the
+# DB changes feed. Should be significantly larger than canaryInterval to allow for
+# the roundtrip to DB as well as to let the Service handle other work in the
+# meantime.
+canaryTimeout = 90 # seconds
+
+# How long the changes feed should poll before timing out
+changesFeedTimeout = 10 # seconds
+
 class Service (Thread):
     def __init__(self, consumers):
         Thread.__init__(self)
         self.daemon = True
 
-        self.changes = Database().changesFeed()
+        self.lastCanaryTime = datetime.now()
+
+        self.database = Database()
+        self.changes = self.database.changesFeed(timeout=changesFeedTimeout)
+        self.lastSequence = None
+        self.canaryGenerator = CanaryDocumentGenerator()
+
         self.consumers = consumers
 
     def run(self):
+        self.canaryGenerator.start()
+
         while True:
             for change in self.changes:
-                if "deleted" in change and change["deleted"] == True:
-                    logging.info('[changes] Found a delete')
-                    consumer = self.consumers.getConsumerForTrigger(change['id'])
-                    if consumer != None:
-                        if consumer.desiredState() == Consumer.State.Disabled:
-                            # just remove it from memory
-                            logging.info('[{}] Removing disabled trigger'.format(consumer.trigger))
-                            self.consumers.removeConsumerForTrigger(consumer.trigger)
-                        else:
-                            logging.info('[{}] Shutting down running trigger'.format(consumer.trigger))
-                            consumer.shutdown()
-                # since we can't use a filter function for the feed (then
-                # you don't get deletes) we need to manually verify this
-                # is a valid trigger doc that has changed
-                elif 'triggerURL' in change['doc']:
-                    logging.info('[changes] Found a change in a trigger document')
-                    document = change['doc']
-
-                    if not self.consumers.hasConsumerForTrigger(change["id"]):
-                        logging.info('[{}] Found a new trigger to create'.format(change["id"]))
-                        self.createAndRunConsumer(document)
-                    else:
-                        logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
-                        existingConsumer = self.consumers.getConsumerForTrigger(change["id"])
+                # change could be None because the changes feed will timeout
+                # if it hasn't detected any changes. This timeout allows us to
+                # check whether or not the feed is capable of detecting canary
+                # documents
+                if change != None:
+                    # Record the sequence in case the changes feed needs to be
+                    # restarted. This way the new feed can pick up right where
+                    # the old one left off.
+                    self.lastSequence = change['seq']
+
+                    if "deleted" in change and change["deleted"] == True:
+                        logging.info('[changes] Found a delete')
+                        consumer = self.consumers.getConsumerForTrigger(change['id'])
+                        if consumer != None:
+                            if consumer.desiredState() == Consumer.State.Disabled:
+                                # just remove it from memory
+                                logging.info('[{}] Removing disabled trigger'.format(consumer.trigger))
+                                self.consumers.removeConsumerForTrigger(consumer.trigger)
+                            else:
+                                logging.info('[{}] Shutting down running trigger'.format(consumer.trigger))
+                                consumer.shutdown()
+                    # since we can't use a filter function for the feed (then
+                    # you don't get deletes) we need to manually verify this
+                    # is a valid trigger doc that has changed
+                    elif 'triggerURL' in change['doc']:
+                        logging.info('[changes] Found a change in a trigger document')
+                        document = change['doc']
 
-                        if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
-                            # disabled trigger has become active
-                            logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
+                        if not self.consumers.hasConsumerForTrigger(change["id"]):
+                            logging.info('[{}] Found a new trigger to create'.format(change["id"]))
                             self.createAndRunConsumer(document)
-                        elif existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
-                            # running trigger should become disabled
-                            logging.info('[{}] Existing running trigger should become disabled'.format(change["id"]))
-                            existingConsumer.disable()
                         else:
-                            logging.debug('[changes] Found non-interesting trigger change: \n{}\n{}'.format(existingConsumer.desiredState(), document))
-                else:
-                    logging.debug('[changes] Found a change for a non-trigger document')
+                            logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
+                            existingConsumer = self.consumers.getConsumerForTrigger(change["id"])
 
-            logging.error("[changes] uh-oh! I made it out of the changes for loop!")
+                            if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
+                                # disabled trigger has become active
+                                logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
+                                self.createAndRunConsumer(document)
+                            elif existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
+                                # running trigger should become disabled
+                                logging.info('[{}] Existing running trigger should become disabled'.format(change["id"]))
+                                existingConsumer.disable()
+                            else:
+                                logging.debug('[changes] Found non-interesting trigger change: \n{}\n{}'.format(existingConsumer.desiredState(), document))
+                    elif 'canary' in change['doc']:
+                        # found a canary - update lastCanaryTime
+                        logging.info('[canary] I found a canary. The last one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
+                        self.lastCanaryTime = datetime.now()
+
+                        # delete the canary document
+                        self.database.deleteDoc(change['id'])
+                    else:
+                        logging.debug('[changes] Found a change for a non-trigger document')
 
+                if secondsSince(self.lastCanaryTime) > canaryTimeout:
+                    logging.warn('[canary] It has been more than {} seconds since the last canary - restarting the DB changes feed'.format(canaryTimeout))
+                    self.restartChangesFeed()
+                    break
 
 Review comment:
   Let's say we do that... then what do we do with that information?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services