You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/06/15 16:48:44 UTC
[incubator-openwhisk] branch master updated: surface errors in
runtimes.manifest configuration in the controller logs (#2256)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new cdf7a69 surface errors in runtimes.manifest configuration in the controller logs (#2256)
cdf7a69 is described below
commit cdf7a69a8199858add89e790cdec769039ae8207
Author: ddragosd <dd...@gmail.com>
AuthorDate: Thu Jun 15 09:48:42 2017 -0700
surface errors in runtimes.manifest configuration in the controller logs (#2256)
---
.../scala/whisk/core/entity/ExecManifest.scala | 17 +++--
.../scala/whisk/core/controller/Controller.scala | 26 +++++---
.../main/scala/whisk/core/invoker/Invoker.scala | 73 +++++++++++++---------
.../scala/whisk/core/entity/test/ExecHelpers.scala | 2 +-
.../whisk/core/entity/test/ExecManifestTests.scala | 28 ++++++++-
5 files changed, 94 insertions(+), 52 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala b/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala
index 21746d6..bfba5ac 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala
@@ -16,9 +16,7 @@
package whisk.core.entity
-import scala.util.Try
-import scala.util.Failure
-
+import scala.util.{Failure, Success, Try}
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.core.WhiskConfig
@@ -43,15 +41,14 @@ protected[core] object ExecManifest {
*
* @param config a valid configuration
* @param reinit re-initialize singleton iff true
- * @return true if initialized successfully, or if previously initialized
+ * @return the manifest if initialized successfully, or if previously initialized
*/
- protected[core] def initialize(config: WhiskConfig, reinit: Boolean = false): Boolean = {
+ protected[core] def initialize(config: WhiskConfig, reinit: Boolean = false): Try[Runtimes] = {
if (manifest.isEmpty || reinit) {
- Try(config.runtimesManifest.parseJson.asJsObject)
- .flatMap(runtimes(_))
- .map(m => manifest = Some(m))
- .isSuccess
- } else true
+ val mf = Try(config.runtimesManifest.parseJson.asJsObject).flatMap(runtimes(_))
+ mf.foreach(m => manifest = Some(m))
+ mf
+ } else Success(manifest.get)
}
/**
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 8a6aad6..808da4e 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -18,12 +18,10 @@ package whisk.core.controller
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
-
import akka.actor.Actor
import akka.actor.ActorContext
import akka.actor.ActorSystem
import akka.japi.Creator
-
import spray.http.StatusCodes._
import spray.http.Uri
import spray.httpx.SprayJsonSupport._
@@ -31,7 +29,6 @@ import spray.json._
import spray.json.DefaultJsonProtocol._
import spray.routing.Directive.pimpApply
import spray.routing.Route
-
import whisk.common.AkkaLogging
import whisk.common.Logging
import whisk.common.TransactionId
@@ -46,6 +43,8 @@ import whisk.http.BasicHttpService
import whisk.http.BasicRasService
import whisk.common.LoggingMarkers
+import scala.util.{Failure, Success}
+
/**
* The Controller is the service that provides the REST API for OpenWhisk.
*
@@ -180,14 +179,25 @@ object Controller {
// second argument. (TODO .. seems fragile)
val instance = if (args.length > 0) args(1).toInt else 0
- // initialize the runtimes manifest
- if (config.isValid && ExecManifest.initialize(config)) {
- val port = config.servicePort.toInt
- BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", port, new ServiceBuilder(config, instance, logger))
- } else {
+ def abort() = {
logger.error(this, "Bad configuration, cannot start.")
actorSystem.terminate()
Await.result(actorSystem.whenTerminated, 30.seconds)
+ sys.exit(1)
+ }
+
+ if (!config.isValid) {
+ abort()
+ }
+
+ ExecManifest.initialize(config) match {
+ case Success(_) =>
+ val port = config.servicePort.toInt
+ BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", port, new ServiceBuilder(config, instance, logger))
+
+ case Failure(t) =>
+ logger.error(this, s"Invalid runtimes manifest: $t")
+ abort()
}
}
}
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 83c2d91..2733a4f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -431,42 +431,53 @@ object Invoker {
// load values for the required properties from the environment
val config = new WhiskConfig(requiredProperties)
- // if configuration is valid, initialize the runtimes manifest
- if (config.isValid && ExecManifest.initialize(config)) {
- val topic = s"invoker$instance"
- val groupid = "invokers"
- val maxdepth = ContainerPool.getDefaultMaxActive(config)
- val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, maxdepth)
- val producer = new KafkaProducerConnector(config.kafkaHost, ec)
- val dispatcher = new Dispatcher(consumer, 500 milliseconds, 2 * maxdepth, actorSystem)
-
- val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) {
- new InvokerReactive(config, instance, dispatcher.activationFeed, producer)
- } else {
- new Invoker(config, instance, dispatcher.activationFeed, producer)
- }
- logger.info(this, s"using $invoker")
+ def abort() = {
+ logger.error(this, "Bad configuration, cannot start.")
+ actorSystem.terminate()
+ Await.result(actorSystem.whenTerminated, 30.seconds)
+ sys.exit(1)
+ }
- dispatcher.addHandler(invoker, true)
- dispatcher.start()
+ if (!config.isValid) {
+ abort()
+ }
- Scheduler.scheduleWaitAtMost(1.seconds)(() => {
- producer.send("health", PingMessage(s"invoker$instance")).andThen {
- case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
- }
- })
+ val execManifest = ExecManifest.initialize(config)
+ if (execManifest.isFailure) {
+ logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
+ abort()
+ }
- val port = config.servicePort.toInt
- BasicHttpService.startService(actorSystem, "invoker", "0.0.0.0", port, new Creator[InvokerServer] {
- def create = new InvokerServer {
- override implicit val logging = logger
- }
- })
+ val topic = s"invoker$instance"
+ val groupid = "invokers"
+ val maxdepth = ContainerPool.getDefaultMaxActive(config)
+ val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, maxdepth)
+ val producer = new KafkaProducerConnector(config.kafkaHost, ec)
+ val dispatcher = new Dispatcher(consumer, 500 milliseconds, 2 * maxdepth, actorSystem)
+
+ val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) {
+ new InvokerReactive(config, instance, dispatcher.activationFeed, producer)
} else {
- logger.error(this, "Bad configuration, cannot start.")
- actorSystem.terminate()
- Await.result(actorSystem.whenTerminated, 30.seconds)
+ new Invoker(config, instance, dispatcher.activationFeed, producer)
}
+ logger.info(this, s"using $invoker")
+
+ dispatcher.addHandler(invoker, true)
+ dispatcher.start()
+
+ Scheduler.scheduleWaitAtMost(1.seconds)(() => {
+ producer.send("health", PingMessage(s"invoker$instance")).andThen {
+ case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
+ }
+ })
+
+ val port = config.servicePort.toInt
+ BasicHttpService.startService(actorSystem, "invoker", "0.0.0.0", port, new Creator[InvokerServer] {
+ def create = new InvokerServer {
+ override implicit val logging = logger
+ }
+ })
+
}
}
diff --git a/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala b/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala
index d0b9871..9dd98cd 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala
@@ -33,7 +33,7 @@ trait ExecHelpers
self: Suite =>
private val config = new WhiskConfig(ExecManifest.requiredProperties)
- ExecManifest.initialize(config) shouldBe true
+ ExecManifest.initialize(config) should be a 'success
protected val NODEJS = "nodejs"
protected val NODEJS6 = "nodejs:6"
diff --git a/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala b/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala
index 611cec4..1c865a0 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala
@@ -16,21 +16,27 @@
package whisk.core.entity.test
-import scala.util.Success
+import java.io.{BufferedWriter, File, FileWriter}
+import java.util.NoSuchElementException
+import scala.util.{Success}
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
-
import spray.json._
import spray.json.DefaultJsonProtocol._
+import whisk.core.WhiskConfig
import whisk.core.entity.ExecManifest
import whisk.core.entity.ExecManifest._
+import common.StreamLogging
+import common.WskActorSystem
@RunWith(classOf[JUnitRunner])
class ExecManifestTests
extends FlatSpec
+ with WskActorSystem
+ with StreamLogging
with Matchers {
behavior of "ExecManifest"
@@ -170,4 +176,22 @@ class ExecManifestTests
image.localImageName("r", "p", Some("tag")) shouldBe s"r/p/$name:tag"
}
}
+
+ it should "throw an error when configured manifest is a valid JSON, but with a missing key" in {
+ val config_manifest = """{"nodejs":[{"kind":"nodejs:6","default":true,"image":{"name":"nodejs6action"}}]}"""
+ val file = File.createTempFile("cxt", ".txt")
+ file.deleteOnExit()
+
+ val bw = new BufferedWriter(new FileWriter(file))
+ bw.write("runtimes.manifest=" + config_manifest + "\n")
+ bw.close()
+
+ val result = ExecManifest.initialize(new WhiskConfig(Map("runtimes.manifest" -> null), Set(), file), true)
+
+ result should be a 'failure
+
+ the [NoSuchElementException] thrownBy {
+ result.get
+ } should have message ("key not found: runtimes")
+ }
}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].