You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2019/07/25 08:08:18 UTC

[incubator-openwhisk] branch master updated: Close the consumer when WakeupException happened (#4459)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e99911f  Close the consumer when WakeupException happened (#4459)
e99911f is described below

commit e99911fdf0ea5908b2ed924141c167f72db203c1
Author: ningyougang <41...@qq.com>
AuthorDate: Thu Jul 25 16:08:05 2019 +0800

    Close the consumer when WakeupException happened (#4459)
---
 .../apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala    | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
index b75c689..db4cfe3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
@@ -132,6 +132,9 @@ class KafkaConsumerConnector(
         } else {
           throw e
         }
+      case e: WakeupException =>
+        logging.info(this, s"WakeupException happened when do commit action for topic ${topic}")
+        recreateConsumer()
     }
 
   override def close(): Unit = synchronized {
@@ -196,6 +199,8 @@ class KafkaConsumerConnector(
         }
       }
     }.andThen {
+      case Failure(_: WakeupException) =>
+        recreateConsumer()
       case Failure(e) =>
         // Only log level info because failed metric reporting is not critical
         logging.info(this, s"lag metric reporting failed for topic '$topic': $e")