You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2019/07/25 08:08:18 UTC
[incubator-openwhisk] branch master updated: Close the consumer
when WakeupException happened (#4459)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e99911f Close the consumer when WakeupException happened (#4459)
e99911f is described below
commit e99911fdf0ea5908b2ed924141c167f72db203c1
Author: ningyougang <41...@qq.com>
AuthorDate: Thu Jul 25 16:08:05 2019 +0800
Close the consumer when WakeupException happened (#4459)
---
.../apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
index b75c689..db4cfe3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
@@ -132,6 +132,9 @@ class KafkaConsumerConnector(
} else {
throw e
}
+ case e: WakeupException =>
+ logging.info(this, s"WakeupException happened when do commit action for topic ${topic}")
+ recreateConsumer()
}
override def close(): Unit = synchronized {
@@ -196,6 +199,8 @@ class KafkaConsumerConnector(
}
}
}.andThen {
+ case Failure(_: WakeupException) =>
+ recreateConsumer()
case Failure(e) =>
// Only log level info because failed metric reporting is not critical
logging.info(this, s"lag metric reporting failed for topic '$topic': $e")