You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/08/17 07:41:59 UTC
[incubator-openwhisk] branch master updated: Log possible errors around creation of kafka clients. (#3972)
This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 0313730 Log possible errors around creation of kafka clients. (#3972)
0313730 is described below
commit 03137300589c295ba360963f84982e8bd80f30e6
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Fri Aug 17 09:41:55 2018 +0200
Log possible errors around creation of kafka clients. (#3972)
---
.../connector/kafka/KafkaConsumerConnector.scala | 29 ++++++------
.../connector/kafka/KafkaProducerConnector.scala | 10 ++--
.../src/main/scala/whisk/utils/Exceptions.scala | 54 ++++++++++++++++++++++
3 files changed, 75 insertions(+), 18 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 1f2ea2b..0891cee 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -27,6 +27,7 @@ import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
import whisk.connector.kafka.KafkaConfiguration._
import whisk.core.ConfigKeys
import whisk.core.connector.MessageConsumer
+import whisk.utils.Exceptions
import whisk.utils.TimeHelpers._
import scala.collection.JavaConverters._
@@ -41,7 +42,8 @@ class KafkaConsumerConnector(
groupid: String,
topic: String,
override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging, actorSystem: ActorSystem)
- extends MessageConsumer {
+ extends MessageConsumer
+ with Exceptions {
implicit val ec: ExecutionContext = actorSystem.dispatcher
private val gracefulWaitTime = 100.milliseconds
@@ -148,28 +150,27 @@ class KafkaConsumerConnector(
verifyConfig(config, ConsumerConfig.configNames().asScala.toSet)
- val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+ val consumer = tryAndThrow(s"creating consumer for $topic") {
+ new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+ }
// subscribe does not need to be synchronized, because the reference to the consumer hasn't been returned yet and
// thus this is guaranteed only to be called by the calling thread.
- consumer.subscribe(Seq(topic).asJavaCollection)
+ tryAndThrow(s"subscribing to $topic")(consumer.subscribe(Seq(topic).asJavaCollection))
+
consumer
}
private def recreateConsumer(): Unit = synchronized {
logging.info(this, s"recreating consumer for '$topic'")
- try {
- consumer.close()
- } catch {
- // According to documentation, the consumer is force closed if it cannot be closed gracefully.
- // See https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
- //
- // For the moment, we have no special handling of 'InterruptException' - it may be possible or even
- // needed to re-try the 'close()' when being interrupted.
- case t: Throwable =>
- logging.error(this, s"failed to close old consumer while recreating: $t")
- }
+ // According to documentation, the consumer is force closed if it cannot be closed gracefully.
+ // See https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
+ //
+ // For the moment, we have no special handling of 'InterruptException' - it may be possible or even
+ // needed to re-try the 'close()' when being interrupted.
+ tryAndSwallow("closing old consumer")(consumer.close())
logging.info(this, s"old consumer closed for '$topic'")
+
consumer = createConsumer(topic)
}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index aea6b3c..692a149 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -28,6 +28,7 @@ import whisk.connector.kafka.KafkaConfiguration._
import whisk.core.ConfigKeys
import whisk.core.connector.{Message, MessageProducer}
import whisk.core.entity.UUIDs
+import whisk.utils.Exceptions
import scala.collection.JavaConverters._
import scala.concurrent.duration._
@@ -36,7 +37,8 @@ import scala.util.{Failure, Success}
class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID().toString)(implicit logging: Logging,
actorSystem: ActorSystem)
- extends MessageProducer {
+ extends MessageProducer
+ with Exceptions {
implicit val ec: ExecutionContext = actorSystem.dispatcher
private val gracefulWaitTime = 100.milliseconds
@@ -99,12 +101,12 @@ class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID()
verifyConfig(config, ProducerConfig.configNames().asScala.toSet)
- new KafkaProducer(config, new StringSerializer, new StringSerializer)
+ tryAndThrow("creating producer")(new KafkaProducer(config, new StringSerializer, new StringSerializer))
}
private def recreateProducer(): Unit = {
- val oldProducer = producer
- oldProducer.close()
+ logging.info(this, s"recreating producer")
+ tryAndSwallow("closing old producer")(producer.close())
logging.info(this, s"old producer closed")
producer = createProducer()
}
diff --git a/common/scala/src/main/scala/whisk/utils/Exceptions.scala b/common/scala/src/main/scala/whisk/utils/Exceptions.scala
new file mode 100644
index 0000000..f56f5fb
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/utils/Exceptions.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.utils
+
+import whisk.common.Logging
+
+import scala.util.control.NonFatal
+
+trait Exceptions {
+
+ /**
+ * Executes the block, catches and logs a NonFatal exception and swallows it.
+ *
+ * @param task description of the task that's being executed
+ * @param block the block to execute
+ */
+ def tryAndSwallow(task: String)(block: => Any)(implicit logging: Logging): Unit = {
+ try block
+ catch {
+ case NonFatal(t) => logging.error(this, s"$task failed: $t")
+ }
+ }
+
+ /**
+ * Executes the block, catches and logs a NonFatal exception and rethrows it.
+ *
+ * @param task description of the task that's being executed
+ * @param block the block to execute
+ */
+ def tryAndThrow[T](task: String)(block: => T)(implicit logging: Logging): T = {
+ try block
+ catch {
+ case NonFatal(t) =>
+ logging.error(this, s"$task failed: $t")
+ throw t
+ }
+ }
+
+}