You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/08/17 07:41:59 UTC

[incubator-openwhisk] branch master updated: Log possible errors around creation of kafka clients. (#3972)

This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 0313730  Log possible errors around creation of kafka clients. (#3972)
0313730 is described below

commit 03137300589c295ba360963f84982e8bd80f30e6
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Fri Aug 17 09:41:55 2018 +0200

    Log possible errors around creation of kafka clients. (#3972)
---
 .../connector/kafka/KafkaConsumerConnector.scala   | 29 ++++++------
 .../connector/kafka/KafkaProducerConnector.scala   | 10 ++--
 .../src/main/scala/whisk/utils/Exceptions.scala    | 54 ++++++++++++++++++++++
 3 files changed, 75 insertions(+), 18 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 1f2ea2b..0891cee 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -27,6 +27,7 @@ import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
 import whisk.connector.kafka.KafkaConfiguration._
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
+import whisk.utils.Exceptions
 import whisk.utils.TimeHelpers._
 
 import scala.collection.JavaConverters._
@@ -41,7 +42,8 @@ class KafkaConsumerConnector(
   groupid: String,
   topic: String,
   override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging, actorSystem: ActorSystem)
-    extends MessageConsumer {
+    extends MessageConsumer
+    with Exceptions {
 
   implicit val ec: ExecutionContext = actorSystem.dispatcher
   private val gracefulWaitTime = 100.milliseconds
@@ -148,28 +150,27 @@ class KafkaConsumerConnector(
 
     verifyConfig(config, ConsumerConfig.configNames().asScala.toSet)
 
-    val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    val consumer = tryAndThrow(s"creating consumer for $topic") {
+      new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    }
 
     // subscribe does not need to be synchronized, because the reference to the consumer hasn't been returned yet and
     // thus this is guaranteed only to be called by the calling thread.
-    consumer.subscribe(Seq(topic).asJavaCollection)
+    tryAndThrow(s"subscribing to $topic")(consumer.subscribe(Seq(topic).asJavaCollection))
+
     consumer
   }
 
   private def recreateConsumer(): Unit = synchronized {
     logging.info(this, s"recreating consumer for '$topic'")
-    try {
-      consumer.close()
-    } catch {
-      // According to documentation, the consumer is force closed if it cannot be closed gracefully.
-      // See https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
-      //
-      // For the moment, we have no special handling of 'InterruptException' - it may be possible or even
-      // needed to re-try the 'close()' when being interrupted.
-      case t: Throwable =>
-        logging.error(this, s"failed to close old consumer while recreating: $t")
-    }
+    // According to documentation, the consumer is force closed if it cannot be closed gracefully.
+    // See https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
+    //
+    // For the moment, we have no special handling of 'InterruptException' - it may be possible or even
+    // needed to re-try the 'close()' when being interrupted.
+    tryAndSwallow("closing old consumer")(consumer.close())
     logging.info(this, s"old consumer closed for '$topic'")
+
     consumer = createConsumer(topic)
   }
 
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index aea6b3c..692a149 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -28,6 +28,7 @@ import whisk.connector.kafka.KafkaConfiguration._
 import whisk.core.ConfigKeys
 import whisk.core.connector.{Message, MessageProducer}
 import whisk.core.entity.UUIDs
+import whisk.utils.Exceptions
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
@@ -36,7 +37,8 @@ import scala.util.{Failure, Success}
 
 class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID().toString)(implicit logging: Logging,
                                                                                            actorSystem: ActorSystem)
-    extends MessageProducer {
+    extends MessageProducer
+    with Exceptions {
 
   implicit val ec: ExecutionContext = actorSystem.dispatcher
   private val gracefulWaitTime = 100.milliseconds
@@ -99,12 +101,12 @@ class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID()
 
     verifyConfig(config, ProducerConfig.configNames().asScala.toSet)
 
-    new KafkaProducer(config, new StringSerializer, new StringSerializer)
+    tryAndThrow("creating producer")(new KafkaProducer(config, new StringSerializer, new StringSerializer))
   }
 
   private def recreateProducer(): Unit = {
-    val oldProducer = producer
-    oldProducer.close()
+    logging.info(this, s"recreating producer")
+    tryAndSwallow("closing old producer")(producer.close())
     logging.info(this, s"old producer closed")
     producer = createProducer()
   }
diff --git a/common/scala/src/main/scala/whisk/utils/Exceptions.scala b/common/scala/src/main/scala/whisk/utils/Exceptions.scala
new file mode 100644
index 0000000..f56f5fb
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/utils/Exceptions.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.utils
+
+import whisk.common.Logging
+
+import scala.util.control.NonFatal
+
+trait Exceptions {
+
+  /**
+   * Executes the block; a NonFatal exception is logged as an error ("<task> failed: <exception>") and swallowed.
+   *
+   * @param task description of the task that's being executed, used as the prefix of the logged error message
+   * @param block the by-name block to execute; its result is discarded
+   */
+  def tryAndSwallow(task: String)(block: => Any)(implicit logging: Logging): Unit = {
+    try block
+    catch {
+      case NonFatal(t) => logging.error(this, s"$task failed: $t")
+    }
+  }
+
+  /**
+   * Executes the block and returns its result; a NonFatal exception is logged as an error and rethrown unchanged.
+   *
+   * @param task description of the task that's being executed, used as the prefix of the logged error message
+   * @param block the by-name block to execute; its value is returned when no exception is thrown
+   */
+  def tryAndThrow[T](task: String)(block: => T)(implicit logging: Logging): T = {
+    try block
+    catch {
+      case NonFatal(t) =>
+        logging.error(this, s"$task failed: $t")
+        throw t
+    }
+  }
+
+}