You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/08/16 18:08:40 UTC
[incubator-openwhisk] branch master updated: Remove unnecessary
Actor conflation. (#2608)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 3f4ac08 Remove unnecessary Actor conflation. (#2608)
3f4ac08 is described below
commit 3f4ac08d00cdb2803f0af694cc2dd458488b9a1e
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Wed Aug 16 14:08:38 2017 -0400
Remove unnecessary Actor conflation. (#2608)
---
.../main/scala/whisk/http/BasicHttpService.scala | 77 +++++++++-------------
.../scala/whisk/core/controller/Controller.scala | 34 +++++-----
.../scala/whisk/core/controller/RestAPIs.scala | 2 +-
.../main/scala/whisk/core/invoker/Invoker.scala | 9 ++-
.../scala/whisk/core/invoker/InvokerServer.scala | 10 +--
5 files changed, 53 insertions(+), 79 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
index 6ca0fe5..1df098e 100644
--- a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
+++ b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
@@ -17,58 +17,45 @@
package whisk.http
-import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
import scala.collection.immutable.Seq
-import scala.concurrent.Future
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
-import akka.actor.Actor
import akka.actor.ActorSystem
-import akka.actor.Props
import akka.event.Logging
-import akka.japi.Creator
-import akka.util.Timeout
-import akka.http.scaladsl.server.Directives
-import akka.http.scaladsl.server.directives.DebuggingDirectives
-import akka.http.scaladsl.server.directives.LogEntry
-import akka.http.scaladsl.server.Route
-import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.server.RejectionHandler
-import akka.http.scaladsl.server.UnacceptedResponseContentTypeRejection
+import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.RouteResult.Rejected
-import akka.http.scaladsl.Http
-
+import akka.http.scaladsl.server.UnacceptedResponseContentTypeRejection
+import akka.http.scaladsl.server.directives.DebuggingDirectives
+import akka.http.scaladsl.server.directives.LogEntry
+import akka.stream.ActorMaterializer
import spray.json._
-
import whisk.common.LogMarker
import whisk.common.LogMarkerToken
-import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionCounter
import whisk.common.TransactionId
-import akka.stream.ActorMaterializer
/**
* This trait extends the Akka Directives and Actor with logging and transaction counting
* facilities common to all OpenWhisk REST services.
*/
-trait BasicHttpService extends Directives with Actor with TransactionCounter {
- implicit def logging: Logging
- implicit val materializer = ActorMaterializer()
- implicit val actorSystem = context.system
- implicit val executionContext = actorSystem.dispatcher
-
- val port: Int
+trait BasicHttpService extends Directives with TransactionCounter {
/** Rejection handler to terminate connection on a bad request. Delegates to Akka handler. */
- implicit def customRejectionHandler(implicit transid: TransactionId) =
+ implicit def customRejectionHandler(implicit transid: TransactionId) = {
RejectionHandler.default.mapRejectionResponse {
- case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
- val error = ErrorResponse(ent.data.utf8String, transid).toJson
- res.copy(entity = HttpEntity(ContentTypes.`application/json`, error.compactPrint))
- case x => x
- }
+ case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
+ val error = ErrorResponse(ent.data.utf8String, transid).toJson
+ res.copy(entity = HttpEntity(ContentTypes.`application/json`, error.compactPrint))
+ case x => x
+ }
+ }
/**
* Gets the routes implemented by the HTTP service.
@@ -115,10 +102,6 @@ trait BasicHttpService extends Directives with Actor with TransactionCounter {
}
}
- def receive = {
- case _ =>
- }
-
/** Assigns transaction id to every request. */
protected val assignId = extract(_ => transid())
@@ -145,19 +128,19 @@ trait BasicHttpService extends Directives with Actor with TransactionCounter {
Some(LogEntry(s"[$tid] [$name] $marker", l))
case _ => None // other kind of responses
}
-
- val bindingFuture = {
- Http().bindAndHandle(route, "0.0.0.0", port)
- }
-
- def shutdown(): Future[Unit] = {
- bindingFuture.flatMap(_.unbind()).map(_ => ())
- }
}
-object BasicHttpService extends Directives {
- def startService[T <: Actor](system: ActorSystem, name: String, interface: String, service: Creator[T]) = {
- val actor = system.actorOf(Props.create(service), s"$name-service")
- implicit val timeout = Timeout(5 seconds)
+object BasicHttpService {
+ /**
+ * Starts an HTTP route handler on given port and registers a shutdown hook.
+ */
+ def startService(route: Route, port: Int)(implicit actorSystem: ActorSystem, materializer: ActorMaterializer): Unit = {
+ implicit val executionContext = actorSystem.dispatcher
+ val httpBinding = Http().bindAndHandle(route, "0.0.0.0", port)
+ sys.addShutdownHook {
+ Await.result(httpBinding.map(_.unbind()), 30.seconds)
+ actorSystem.terminate()
+ Await.result(actorSystem.whenTerminated, 30.seconds)
+ }
}
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 27a63bf..ac99db8 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -19,18 +19,16 @@ package whisk.core.controller
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
-import scala.util.{Failure, Success}
+import scala.util.{ Failure, Success }
import akka.actor._
import akka.actor.ActorSystem
-import akka.japi.Creator
-import akka.http.scaladsl.server.Route
-import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.server.Route
+import akka.stream.ActorMaterializer
import spray.json._
import spray.json.DefaultJsonProtocol._
-
import whisk.common.AkkaLogging
import whisk.common.Logging
import whisk.common.LoggingMarkers
@@ -69,9 +67,10 @@ import whisk.http.BasicRasService
*/
class Controller(
override val instance: InstanceId,
- override val port: Int,
runtimes: Runtimes,
implicit val whiskConfig: WhiskConfig,
+ implicit val actorSystem: ActorSystem,
+ implicit val materializer: ActorMaterializer,
implicit val logging: Logging)
extends BasicRasService {
@@ -80,11 +79,11 @@ class Controller(
TransactionId.controller.mark(this, LoggingMarkers.CONTROLLER_STARTUP(instance.toInt), s"starting controller instance ${instance.toInt}")
/**
- * A Route in Akka is technically a function taking a RequestContext as a parameter.
- *
- * The "~" Akka DSL operator composes two independent Routes, building a routing tree structure.
- * @see http://doc.akka.io/docs/akka-http/current/scala/http/routing-dsl/routes.html#composing-routes
- */
+ * A Route in Akka is technically a function taking a RequestContext as a parameter.
+ *
+ * The "~" Akka DSL operator composes two independent Routes, building a routing tree structure.
+ * @see http://doc.akka.io/docs/akka-http/current/scala/http/routing-dsl/routes.html#composing-routes
+ */
override def routes(implicit transid: TransactionId): Route = {
super.routes ~ {
(pathEndOrSingleSlash & get) {
@@ -117,6 +116,8 @@ class Controller(
* @return JSON of invoker health
*/
private val internalInvokerHealth = {
+ implicit val executionContext = actorSystem.dispatcher
+
(path("invokers") & get) {
complete {
loadBalancer.allInvokers.map(_.map {
@@ -158,12 +159,6 @@ object Controller {
"concurrent_actions" -> config.actionInvokeConcurrentLimit.toInt.toJson),
"runtimes" -> runtimes.toJson)
- // akka-style factory to create a Controller object
- private class ServiceBuilder(config: WhiskConfig, instance: InstanceId, logging: Logging, port: Int) extends Creator[Controller] {
- // this method is not reached unless ExecManifest was initialized successfully
- def create = new Controller(instance, port, ExecManifest.runtimesManifest, config, logging)
- }
-
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("controller-actor-system")
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
@@ -189,7 +184,8 @@ object Controller {
ExecManifest.initialize(config) match {
case Success(_) =>
- BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", new ServiceBuilder(config, InstanceId(instance), logger, port))
+ val controller = new Controller(InstanceId(instance), ExecManifest.runtimesManifest, config, actorSystem, ActorMaterializer.create(actorSystem), logger)
+ BasicHttpService.startService(controller.route, port)(actorSystem, controller.materializer)
case Failure(t) =>
logger.error(this, s"Invalid runtimes manifest: $t")
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index 1d060d8..5d4fc74 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -127,6 +127,7 @@ protected[controller] trait RespondWithHeaders extends Directives {
class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
implicit val activeAckTopicIndex: InstanceId,
implicit val actorSystem: ActorSystem,
+ implicit val materializer: ActorMaterializer,
implicit val logging: Logging,
implicit val entityStore: EntityStore,
implicit val entitlementProvider: EntitlementProvider,
@@ -138,7 +139,6 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
with Authenticate
with AuthenticatedRoute
with RespondWithHeaders {
- implicit val materializer = ActorMaterializer()
implicit val executionContext = actorSystem.dispatcher
implicit val authStore = WhiskAuthStore.datastore(config)
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index b336c27..9fd6bba 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -22,7 +22,7 @@ import scala.concurrent.duration._
import scala.util.Failure
import akka.actor.ActorSystem
-import akka.japi.Creator
+import akka.stream.ActorMaterializer
import whisk.common.AkkaLogging
import whisk.common.Scheduler
import whisk.core.WhiskConfig
@@ -98,9 +98,8 @@ object Invoker {
})
val port = config.servicePort.toInt
-
- BasicHttpService.startService(actorSystem, "invoker", "0.0.0.0", new Creator[InvokerServer] {
- def create = new InvokerServer(invokerInstance, invokerInstance.toInt, port)
- })
+ BasicHttpService.startService(
+ new InvokerServer(invokerInstance, invokerInstance.toInt).route, port)(
+ actorSystem, ActorMaterializer.create(actorSystem))
}
}
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala
index 56e28ad..4fd7888 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala
@@ -17,20 +17,16 @@
package whisk.core.invoker
-import whisk.http.BasicRasService
-import whisk.core.WhiskConfig
-import whisk.common.Logging
import whisk.core.entity.InstanceId
+import whisk.http.BasicRasService
+
/**
* Implements web server to handle certain REST API calls.
* Currently provides a health ping route, only.
*/
class InvokerServer(
override val instance: InstanceId,
- override val numberOfInstances: Int,
- override val port: Int)(
- override implicit val logging: Logging,
- implicit val whiskConfig: WhiskConfig)
+ override val numberOfInstances: Int)
extends BasicRasService {
}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].