You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/06/15 16:48:44 UTC

[incubator-openwhisk] branch master updated: surface errors in runtimes.manifest configuration in the controller logs (#2256)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new cdf7a69  surface errors in runtimes.manifest configuration in the controller logs (#2256)
cdf7a69 is described below

commit cdf7a69a8199858add89e790cdec769039ae8207
Author: ddragosd <dd...@gmail.com>
AuthorDate: Thu Jun 15 09:48:42 2017 -0700

    surface errors in runtimes.manifest configuration in the controller logs (#2256)
---
 .../scala/whisk/core/entity/ExecManifest.scala     | 17 +++--
 .../scala/whisk/core/controller/Controller.scala   | 26 +++++---
 .../main/scala/whisk/core/invoker/Invoker.scala    | 73 +++++++++++++---------
 .../scala/whisk/core/entity/test/ExecHelpers.scala |  2 +-
 .../whisk/core/entity/test/ExecManifestTests.scala | 28 ++++++++-
 5 files changed, 94 insertions(+), 52 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala b/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala
index 21746d6..bfba5ac 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala
@@ -16,9 +16,7 @@
 
 package whisk.core.entity
 
-import scala.util.Try
-import scala.util.Failure
-
+import scala.util.{Failure, Success, Try}
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.core.WhiskConfig
@@ -43,15 +41,14 @@ protected[core] object ExecManifest {
      *
      * @param config a valid configuration
      * @param reinit re-initialize singleton iff true
-     * @return true if initialized successfully, or if previously initialized
+     * @return the manifest if initialized successfully, or if previously initialized
      */
-    protected[core] def initialize(config: WhiskConfig, reinit: Boolean = false): Boolean = {
+    protected[core] def initialize(config: WhiskConfig, reinit: Boolean = false): Try[Runtimes] = {
         if (manifest.isEmpty || reinit) {
-            Try(config.runtimesManifest.parseJson.asJsObject)
-                .flatMap(runtimes(_))
-                .map(m => manifest = Some(m))
-                .isSuccess
-        } else true
+            val mf = Try(config.runtimesManifest.parseJson.asJsObject).flatMap(runtimes(_))
+            mf.foreach(m => manifest = Some(m))
+            mf
+        } else Success(manifest.get)
     }
 
     /**
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 8a6aad6..808da4e 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -18,12 +18,10 @@ package whisk.core.controller
 
 import scala.concurrent.Await
 import scala.concurrent.duration.DurationInt
-
 import akka.actor.Actor
 import akka.actor.ActorContext
 import akka.actor.ActorSystem
 import akka.japi.Creator
-
 import spray.http.StatusCodes._
 import spray.http.Uri
 import spray.httpx.SprayJsonSupport._
@@ -31,7 +29,6 @@ import spray.json._
 import spray.json.DefaultJsonProtocol._
 import spray.routing.Directive.pimpApply
 import spray.routing.Route
-
 import whisk.common.AkkaLogging
 import whisk.common.Logging
 import whisk.common.TransactionId
@@ -46,6 +43,8 @@ import whisk.http.BasicHttpService
 import whisk.http.BasicRasService
 import whisk.common.LoggingMarkers
 
+import scala.util.{Failure, Success}
+
 /**
  * The Controller is the service that provides the REST API for OpenWhisk.
  *
@@ -180,14 +179,25 @@ object Controller {
         // second argument.  (TODO .. seems fragile)
         val instance = if (args.length > 0) args(1).toInt else 0
 
-        // initialize the runtimes manifest
-        if (config.isValid && ExecManifest.initialize(config)) {
-            val port = config.servicePort.toInt
-            BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", port, new ServiceBuilder(config, instance, logger))
-        } else {
+        def abort() = {
             logger.error(this, "Bad configuration, cannot start.")
             actorSystem.terminate()
             Await.result(actorSystem.whenTerminated, 30.seconds)
+            sys.exit(1)
+        }
+
+        if (!config.isValid) {
+            abort()
+        }
+
+        ExecManifest.initialize(config) match {
+            case Success(_) =>
+                val port = config.servicePort.toInt
+                BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", port, new ServiceBuilder(config, instance, logger))
+
+            case Failure(t) =>
+                logger.error(this, s"Invalid runtimes manifest: $t")
+                abort()
         }
     }
 }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 83c2d91..2733a4f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -431,42 +431,53 @@ object Invoker {
         // load values for the required properties from the environment
         val config = new WhiskConfig(requiredProperties)
 
-        // if configuration is valid, initialize the runtimes manifest
-        if (config.isValid && ExecManifest.initialize(config)) {
-            val topic = s"invoker$instance"
-            val groupid = "invokers"
-            val maxdepth = ContainerPool.getDefaultMaxActive(config)
-            val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, maxdepth)
-            val producer = new KafkaProducerConnector(config.kafkaHost, ec)
-            val dispatcher = new Dispatcher(consumer, 500 milliseconds, 2 * maxdepth, actorSystem)
-
-            val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) {
-                new InvokerReactive(config, instance, dispatcher.activationFeed, producer)
-            } else {
-                new Invoker(config, instance, dispatcher.activationFeed, producer)
-            }
-            logger.info(this, s"using $invoker")
+        def abort() = {
+            logger.error(this, "Bad configuration, cannot start.")
+            actorSystem.terminate()
+            Await.result(actorSystem.whenTerminated, 30.seconds)
+            sys.exit(1)
+        }
 
-            dispatcher.addHandler(invoker, true)
-            dispatcher.start()
+        if (!config.isValid) {
+            abort()
+        }
 
-            Scheduler.scheduleWaitAtMost(1.seconds)(() => {
-                producer.send("health", PingMessage(s"invoker$instance")).andThen {
-                    case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
-                }
-            })
+        val execManifest = ExecManifest.initialize(config)
+        if (execManifest.isFailure) {
+            logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
+            abort()
+        }
 
-            val port = config.servicePort.toInt
-            BasicHttpService.startService(actorSystem, "invoker", "0.0.0.0", port, new Creator[InvokerServer] {
-                def create = new InvokerServer {
-                    override implicit val logging = logger
-                }
-            })
+        val topic = s"invoker$instance"
+        val groupid = "invokers"
+        val maxdepth = ContainerPool.getDefaultMaxActive(config)
+        val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, maxdepth)
+        val producer = new KafkaProducerConnector(config.kafkaHost, ec)
+        val dispatcher = new Dispatcher(consumer, 500 milliseconds, 2 * maxdepth, actorSystem)
+
+        val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) {
+            new InvokerReactive(config, instance, dispatcher.activationFeed, producer)
         } else {
-            logger.error(this, "Bad configuration, cannot start.")
-            actorSystem.terminate()
-            Await.result(actorSystem.whenTerminated, 30.seconds)
+            new Invoker(config, instance, dispatcher.activationFeed, producer)
         }
+        logger.info(this, s"using $invoker")
+
+        dispatcher.addHandler(invoker, true)
+        dispatcher.start()
+
+        Scheduler.scheduleWaitAtMost(1.seconds)(() => {
+            producer.send("health", PingMessage(s"invoker$instance")).andThen {
+                case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
+            }
+        })
+
+        val port = config.servicePort.toInt
+        BasicHttpService.startService(actorSystem, "invoker", "0.0.0.0", port, new Creator[InvokerServer] {
+            def create = new InvokerServer {
+                override implicit val logging = logger
+            }
+        })
+
     }
 }
 
diff --git a/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala b/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala
index d0b9871..9dd98cd 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala
@@ -33,7 +33,7 @@ trait ExecHelpers
     self: Suite =>
 
     private val config = new WhiskConfig(ExecManifest.requiredProperties)
-    ExecManifest.initialize(config) shouldBe true
+    ExecManifest.initialize(config) should be a 'success
 
     protected val NODEJS = "nodejs"
     protected val NODEJS6 = "nodejs:6"
diff --git a/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala b/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala
index 611cec4..1c865a0 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala
@@ -16,21 +16,27 @@
 
 package whisk.core.entity.test
 
-import scala.util.Success
+import java.io.{BufferedWriter, File, FileWriter}
+import java.util.NoSuchElementException
 
+import scala.util.{Success}
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
-
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+import whisk.core.WhiskConfig
 import whisk.core.entity.ExecManifest
 import whisk.core.entity.ExecManifest._
+import common.StreamLogging
+import common.WskActorSystem
 
 @RunWith(classOf[JUnitRunner])
 class ExecManifestTests
     extends FlatSpec
+    with WskActorSystem
+    with StreamLogging
     with Matchers {
 
     behavior of "ExecManifest"
@@ -170,4 +176,22 @@ class ExecManifestTests
                     image.localImageName("r", "p", Some("tag")) shouldBe s"r/p/$name:tag"
             }
     }
+
+    it should "throw an error when configured manifest is a valid JSON, but with a missing key" in {
+        val config_manifest = """{"nodejs":[{"kind":"nodejs:6","default":true,"image":{"name":"nodejs6action"}}]}"""
+        val file = File.createTempFile("cxt", ".txt")
+        file.deleteOnExit()
+
+        val bw = new BufferedWriter(new FileWriter(file))
+        bw.write("runtimes.manifest=" + config_manifest + "\n")
+        bw.close()
+
+        val result = ExecManifest.initialize(new WhiskConfig(Map("runtimes.manifest" -> null), Set(), file), true)
+
+        result should be a 'failure
+
+        the [NoSuchElementException] thrownBy {
+            result.get
+        } should have message ("key not found: runtimes")
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].