You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ja...@apache.org on 2019/05/13 20:58:33 UTC

[incubator-openwhisk-package-kafka] branch master updated: Set running state after brokers are connected (#340)

This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 30f549d  Set running state after brokers are connected (#340)
30f549d is described below

commit 30f549db6014bcc3e8e1e340e39a2f5cdc58a784
Author: James Dubee <jw...@us.ibm.com>
AuthorDate: Mon May 13 16:58:28 2019 -0400

    Set running state after brokers are connected (#340)
---
 provider/consumer.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/provider/consumer.py b/provider/consumer.py
index 2ced050..91d4094 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -329,7 +329,6 @@ class ConsumerProcess (Process):
 
                 if self.secondsSinceLastPoll() < 0:
                     logging.info('[{}] Completed first poll'.format(self.trigger))
-                    self.__recordState(Consumer.State.Running)
 
                 if (message is not None):
                     if not message.error():
@@ -530,6 +529,10 @@ class ConsumerProcess (Process):
     def __on_assign(self, consumer, partitions):
         logging.info('[{}] Completed partition assignment. Connected to broker(s)'.format(self.trigger))
 
+        if self.currentState() == Consumer.State.Initializing and self.__shouldRun():
+            logging.info('[{}] Setting consumer state to runnning.'.format(self.trigger))
+            self.__recordState(Consumer.State.Running)
+
     def __on_revoke(self, consumer, partitions):
         logging.info('[{}] Partition assignment has been revoked. Disconnected from broker(s)'.format(self.trigger))