You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/06/15 15:08:47 UTC

[incubator-openwhisk] branch master updated: Retry `ensureTopic` on transient, retriable exceptions. (#3753)

This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c816aa  Retry `ensureTopic` on transient, retriable exceptions. (#3753)
3c816aa is described below

commit 3c816aa6e326e643da5ebb8d5ad504578a597b9b
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Fri Jun 15 17:08:35 2018 +0200

    Retry `ensureTopic` on transient, retriable exceptions. (#3753)
    
    Like writing to and reading from Kafka, creating topics can be subject to a battery of transient errors. Retrying on those errors is safe and keeps us sane.
---
 .../connector/kafka/KafkaMessagingProvider.scala   | 30 ++++++++++++++--------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 7351b64..6843cbc 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -21,14 +21,14 @@ import java.util.Properties
 
 import akka.actor.ActorSystem
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
-import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.errors.{RetriableException, TopicExistsException}
 import pureconfig._
 import whisk.common.{CausedBy, Logging}
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector.{MessageConsumer, MessageProducer, MessagingProvider}
 
 import scala.collection.JavaConverters._
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration._
 import scala.util.{Failure, Success, Try}
 
 case class KafkaConfig(replicationFactor: Short)
@@ -57,15 +57,23 @@ object KafkaMessagingProvider extends MessagingProvider {
     val partitions = 1
     val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
 
-    val result = Try(client.createTopics(List(nt).asJava).values().get(topic).get())
-      .map(_ => logging.info(this, s"created topic $topic"))
-      .recoverWith {
-        case CausedBy(_: TopicExistsException) =>
-          Success(logging.info(this, s"topic $topic already existed"))
-        case t =>
-          logging.error(this, s"ensureTopic for $topic failed due to $t")
-          Failure(t)
-      }
+    def createTopic(retries: Int = 5): Try[Unit] = {
+      Try(client.createTopics(List(nt).asJava).values().get(topic).get())
+        .map(_ => logging.info(this, s"created topic $topic"))
+        .recoverWith {
+          case CausedBy(_: TopicExistsException) =>
+            Success(logging.info(this, s"topic $topic already existed"))
+          case CausedBy(t: RetriableException) if retries > 0 =>
+            logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries")
+            Thread.sleep(1.second.toMillis)
+            createTopic(retries - 1)
+          case t =>
+            logging.error(this, s"ensureTopic for $topic failed due to $t")
+            Failure(t)
+        }
+    }
+
+    val result = createTopic()
 
     client.close()
     result

-- 
To stop receiving notification emails like this one, please contact
cbickel@apache.org.