You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/12 11:37:11 UTC

[GitHub] cbickel closed pull request #3686: Refactor `ensureTopic` to expose failure details.

cbickel closed pull request #3686: Refactor `ensureTopic` to expose failure details.
URL: https://github.com/apache/incubator-openwhisk/pull/3686
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/scala/whisk/common/CausedBy.scala b/common/scala/src/main/scala/whisk/common/CausedBy.scala
new file mode 100644
index 0000000000..caa2ba4d0d
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/common/CausedBy.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+/**
+ * Helper to match on exceptions caused by other exceptions.
+ *
+ * Use this like:
+ *
+ * ```
+ * try {
+ *   block()
+ * } catch {
+ *   case CausedBy(internalException: MyFancyException) => ...
+ * }
+ * ```
+ */
+object CausedBy {
+  def unapply(e: Throwable): Option[Throwable] = Option(e.getCause)
+}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index e939a4637e..7351b64f32 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -18,18 +18,18 @@
 package whisk.connector.kafka
 
 import java.util.Properties
-import java.util.concurrent.ExecutionException
 
 import akka.actor.ActorSystem
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
 import org.apache.kafka.common.errors.TopicExistsException
 import pureconfig._
-import whisk.common.Logging
+import whisk.common.{CausedBy, Logging}
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector.{MessageConsumer, MessageProducer, MessagingProvider}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.FiniteDuration
+import scala.util.{Failure, Success, Try}
 
 case class KafkaConfig(replicationFactor: Short)
 
@@ -47,31 +47,28 @@ object KafkaMessagingProvider extends MessagingProvider {
   def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer =
     new KafkaProducerConnector(config.kafkaHosts)
 
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
-    val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
-    val tc = KafkaConfiguration.configMapToKafkaConfig(
-      loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
+  def ensureTopic(config: WhiskConfig, topic: String, topicConfigKey: String)(implicit logging: Logging): Try[Unit] = {
+    val kafkaConfig = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
+    val topicConfig = KafkaConfiguration.configMapToKafkaConfig(
+      loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + "." + topicConfigKey))
 
-    val baseConfig = Map(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)
     val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
-    val client = AdminClient.create(baseConfig ++ commonConfig)
-    val numPartitions = 1
-    val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
-    val results = client.createTopics(List(nt).asJava)
-    try {
-      results.values().get(topic).get()
-      logging.info(this, s"created topic $topic")
-      true
-    } catch {
-      case e: ExecutionException if e.getCause.isInstanceOf[TopicExistsException] =>
-        logging.info(this, s"topic $topic already existed")
-        true
-      case e: Exception =>
-        logging.error(this, s"ensureTopic for $topic failed due to $e")
-        false
-    } finally {
-      client.close()
-    }
+    val client = AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts))
+    val partitions = 1
+    val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
+
+    val result = Try(client.createTopics(List(nt).asJava).values().get(topic).get())
+      .map(_ => logging.info(this, s"created topic $topic"))
+      .recoverWith {
+        case CausedBy(_: TopicExistsException) =>
+          Success(logging.info(this, s"topic $topic already existed"))
+        case t =>
+          logging.error(this, s"ensureTopic for $topic failed due to $t")
+          Failure(t)
+      }
+
+    client.close()
+    result
   }
 }
 
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
index 8ec1f5a55d..6e6b24a212 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -25,6 +25,8 @@ import whisk.common.Logging
 import whisk.core.WhiskConfig
 import whisk.spi.Spi
 
+import scala.util.Try
+
 /**
  * An Spi for providing Messaging implementations.
  */
@@ -36,5 +38,5 @@ trait MessagingProvider extends Spi {
     maxPeek: Int = Int.MaxValue,
     maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging, actorSystem: ActorSystem): MessageConsumer
   def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean
+  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Try[Unit]
 }
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index c1c739075b..8096711b1b 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -216,18 +216,16 @@ object Controller {
     }
 
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(config, topic = "completed" + instance, topicConfig = "completed")) {
-      abort(s"failure during msgProvider.ensureTopic for topic completed$instance")
-    }
-    if (!msgProvider.ensureTopic(config, topic = "health", topicConfig = "health")) {
-      abort(s"failure during msgProvider.ensureTopic for topic health")
-    }
-    if (!msgProvider.ensureTopic(config, topic = "cacheInvalidation", topicConfig = "cache-invalidation")) {
-      abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation")
-    }
 
-    if (!msgProvider.ensureTopic(config, topic = "events", topicConfig = "events")) {
-      abort(s"failure during msgProvider.ensureTopic for topic events")
+    Map(
+      "completed" + instance -> "completed",
+      "health" -> "health",
+      "cacheInvalidation" -> "cache-invalidation",
+      "events" -> "events").foreach {
+      case (topic, topicConfigurationKey) =>
+        if (msgProvider.ensureTopic(config, topic, topicConfigurationKey).isFailure) {
+          abort(s"failure during msgProvider.ensureTopic for topic $topic")
+        }
     }
 
     ExecManifest.initialize(config) match {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4a69d3cf99..134aa05ed5 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -168,7 +168,7 @@ object Invoker {
 
     val invokerInstance = InstanceId(assignedInvokerId, invokerName)
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker")) {
+    if (msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker").isFailure) {
       abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
     }
     val producer = msgProvider.getProducer(config)
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 3f91b59fd2..57e2e56ad2 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -18,6 +18,7 @@
 package services
 
 import java.io.File
+import java.nio.charset.StandardCharsets
 import java.util.Calendar
 
 import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
@@ -61,11 +62,9 @@ class KafkaConnectorTests
   val kafkaHosts: Array[String] = config.kafkaHosts.split(",")
   val replicationFactor: Int = kafkaHosts.length / 2 + 1
   System.setProperty("whisk.kafka.replication-factor", replicationFactor.toString)
-  println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
-  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
 
   println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
-  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
+  KafkaMessagingProvider.ensureTopic(config, topic, topic) shouldBe 'success
 
   val producer = new KafkaProducerConnector(config.kafkaHosts)
   val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
@@ -94,7 +93,8 @@ class KafkaConnectorTests
       val sent = Await.result(producer.send(topic, message), waitForSend)
       println(s"Successfully sent message to topic: $sent")
       println(s"Receiving message from topic.")
-      val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
+      val received =
+        consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, StandardCharsets.UTF_8) }
       val end = java.lang.System.currentTimeMillis
       val elapsed = end - start
       println(s"Received ${received.size}. Took $elapsed msec: $received")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services