You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/08/16 18:08:40 UTC

[incubator-openwhisk] branch master updated: Remove unnecessary Actor conflation. (#2608)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f4ac08  Remove unnecessary Actor conflation. (#2608)
3f4ac08 is described below

commit 3f4ac08d00cdb2803f0af694cc2dd458488b9a1e
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Wed Aug 16 14:08:38 2017 -0400

    Remove unnecessary Actor conflation. (#2608)
---
 .../main/scala/whisk/http/BasicHttpService.scala   | 77 +++++++++-------------
 .../scala/whisk/core/controller/Controller.scala   | 34 +++++-----
 .../scala/whisk/core/controller/RestAPIs.scala     |  2 +-
 .../main/scala/whisk/core/invoker/Invoker.scala    |  9 ++-
 .../scala/whisk/core/invoker/InvokerServer.scala   | 10 +--
 5 files changed, 53 insertions(+), 79 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
index 6ca0fe5..1df098e 100644
--- a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
+++ b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
@@ -17,58 +17,45 @@
 
 package whisk.http
 
-import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
 import scala.collection.immutable.Seq
-import scala.concurrent.Future
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
 
-import akka.actor.Actor
 import akka.actor.ActorSystem
-import akka.actor.Props
 import akka.event.Logging
-import akka.japi.Creator
-import akka.util.Timeout
-import akka.http.scaladsl.server.Directives
-import akka.http.scaladsl.server.directives.DebuggingDirectives
-import akka.http.scaladsl.server.directives.LogEntry
-import akka.http.scaladsl.server.Route
-import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.Http
 import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.server.Directives
 import akka.http.scaladsl.server.RejectionHandler
-import akka.http.scaladsl.server.UnacceptedResponseContentTypeRejection
+import akka.http.scaladsl.server.Route
 import akka.http.scaladsl.server.RouteResult.Rejected
-import akka.http.scaladsl.Http
-
+import akka.http.scaladsl.server.UnacceptedResponseContentTypeRejection
+import akka.http.scaladsl.server.directives.DebuggingDirectives
+import akka.http.scaladsl.server.directives.LogEntry
+import akka.stream.ActorMaterializer
 import spray.json._
-
 import whisk.common.LogMarker
 import whisk.common.LogMarkerToken
-import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionCounter
 import whisk.common.TransactionId
-import akka.stream.ActorMaterializer
 
 /**
  * This trait extends the Akka Directives and Actor with logging and transaction counting
  * facilities common to all OpenWhisk REST services.
  */
-trait BasicHttpService extends Directives with Actor with TransactionCounter {
-    implicit def logging: Logging
-    implicit val materializer = ActorMaterializer()
-    implicit val actorSystem = context.system
-    implicit val executionContext = actorSystem.dispatcher
-
-    val port: Int
+trait BasicHttpService extends Directives with TransactionCounter {
 
     /** Rejection handler to terminate connection on a bad request. Delegates to Akka handler. */
-    implicit def customRejectionHandler(implicit transid: TransactionId) =
+    implicit def customRejectionHandler(implicit transid: TransactionId) = {
         RejectionHandler.default.mapRejectionResponse {
-                case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
-                    val error = ErrorResponse(ent.data.utf8String, transid).toJson
-                    res.copy(entity = HttpEntity(ContentTypes.`application/json`, error.compactPrint))
-                case x => x
-            }
+            case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
+                val error = ErrorResponse(ent.data.utf8String, transid).toJson
+                res.copy(entity = HttpEntity(ContentTypes.`application/json`, error.compactPrint))
+            case x => x
+        }
+    }
 
     /**
      * Gets the routes implemented by the HTTP service.
@@ -115,10 +102,6 @@ trait BasicHttpService extends Directives with Actor with TransactionCounter {
         }
     }
 
-    def receive = {
-        case _ =>
-    }
-
     /** Assigns transaction id to every request. */
     protected val assignId = extract(_ => transid())
 
@@ -145,19 +128,19 @@ trait BasicHttpService extends Directives with Actor with TransactionCounter {
             Some(LogEntry(s"[$tid] [$name] $marker", l))
         case _ => None // other kind of responses
     }
-
-    val bindingFuture = {
-        Http().bindAndHandle(route, "0.0.0.0", port)
-    }
-
-    def shutdown(): Future[Unit] = {
-        bindingFuture.flatMap(_.unbind()).map(_ => ())
-    }
 }
 
-object BasicHttpService extends Directives {
-    def startService[T <: Actor](system: ActorSystem, name: String, interface: String, service: Creator[T]) = {
-        val actor = system.actorOf(Props.create(service), s"$name-service")
-        implicit val timeout = Timeout(5 seconds)
+object BasicHttpService {
+    /**
+     * Binds `route` on all interfaces at `port`; a JVM shutdown hook unbinds it and stops the actor system.
+     */
+    def startService(route: Route, port: Int)(implicit actorSystem: ActorSystem, materializer: ActorMaterializer): Unit = {
+        implicit val executionContext = actorSystem.dispatcher
+        val httpBinding = Http().bindAndHandle(route, "0.0.0.0", port)
+        sys.addShutdownHook {
+            // flatMap, not map: map yields Future[Future[Unit]], so Await would return before unbind finishes
+            Await.result(httpBinding.flatMap(_.unbind()), 30.seconds)
+            Await.result(actorSystem.terminate(), 30.seconds)
+        }
     }
 }
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 27a63bf..ac99db8 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -19,18 +19,16 @@ package whisk.core.controller
 
 import scala.concurrent.Await
 import scala.concurrent.duration.DurationInt
-import scala.util.{Failure, Success}
+import scala.util.{ Failure, Success }
 
 import akka.actor._
 import akka.actor.ActorSystem
-import akka.japi.Creator
-import akka.http.scaladsl.server.Route
-import akka.http.scaladsl.model.Uri
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.server.Route
+import akka.stream.ActorMaterializer
 import spray.json._
 import spray.json.DefaultJsonProtocol._
-
 import whisk.common.AkkaLogging
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
@@ -69,9 +67,10 @@ import whisk.http.BasicRasService
  */
 class Controller(
     override val instance: InstanceId,
-    override val port: Int,
     runtimes: Runtimes,
     implicit val whiskConfig: WhiskConfig,
+    implicit val actorSystem: ActorSystem,
+    implicit val materializer: ActorMaterializer,
     implicit val logging: Logging)
     extends BasicRasService {
 
@@ -80,11 +79,11 @@ class Controller(
     TransactionId.controller.mark(this, LoggingMarkers.CONTROLLER_STARTUP(instance.toInt), s"starting controller instance ${instance.toInt}")
 
     /**
-      * A Route in Akka is technically a function taking a RequestContext as a parameter.
-      *
-      * The "~" Akka DSL operator composes two independent Routes, building a routing tree structure.
-      * @see http://doc.akka.io/docs/akka-http/current/scala/http/routing-dsl/routes.html#composing-routes
-      */
+     * A Route in Akka is technically a function taking a RequestContext as a parameter.
+     *
+     * The "~" Akka DSL operator composes two independent Routes, building a routing tree structure.
+     * @see http://doc.akka.io/docs/akka-http/current/scala/http/routing-dsl/routes.html#composing-routes
+     */
     override def routes(implicit transid: TransactionId): Route = {
         super.routes ~ {
             (pathEndOrSingleSlash & get) {
@@ -117,6 +116,8 @@ class Controller(
      * @return JSON of invoker health
      */
     private val internalInvokerHealth = {
+        implicit val executionContext = actorSystem.dispatcher
+
         (path("invokers") & get) {
             complete {
                 loadBalancer.allInvokers.map(_.map {
@@ -158,12 +159,6 @@ object Controller {
             "concurrent_actions" -> config.actionInvokeConcurrentLimit.toInt.toJson),
         "runtimes" -> runtimes.toJson)
 
-    // akka-style factory to create a Controller object
-    private class ServiceBuilder(config: WhiskConfig, instance: InstanceId, logging: Logging, port: Int) extends Creator[Controller] {
-        // this method is not reached unless ExecManifest was initialized successfully
-        def create = new Controller(instance, port, ExecManifest.runtimesManifest, config, logging)
-    }
-
     def main(args: Array[String]): Unit = {
         implicit val actorSystem = ActorSystem("controller-actor-system")
         implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
@@ -189,7 +184,8 @@ object Controller {
 
         ExecManifest.initialize(config) match {
             case Success(_) =>
-                BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", new ServiceBuilder(config, InstanceId(instance), logger, port))
+                val controller = new Controller(InstanceId(instance), ExecManifest.runtimesManifest, config, actorSystem, ActorMaterializer.create(actorSystem), logger)
+                BasicHttpService.startService(controller.route, port)(actorSystem, controller.materializer)
 
             case Failure(t) =>
                 logger.error(this, s"Invalid runtimes manifest: $t")
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index 1d060d8..5d4fc74 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -127,6 +127,7 @@ protected[controller] trait RespondWithHeaders extends Directives {
 class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
     implicit val activeAckTopicIndex: InstanceId,
     implicit val actorSystem: ActorSystem,
+    implicit val materializer: ActorMaterializer,
     implicit val logging: Logging,
     implicit val entityStore: EntityStore,
     implicit val entitlementProvider: EntitlementProvider,
@@ -138,7 +139,6 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
     with Authenticate
     with AuthenticatedRoute
     with RespondWithHeaders {
-    implicit val materializer = ActorMaterializer()
     implicit val executionContext = actorSystem.dispatcher
     implicit val authStore = WhiskAuthStore.datastore(config)
 
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index b336c27..9fd6bba 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -22,7 +22,7 @@ import scala.concurrent.duration._
 import scala.util.Failure
 
 import akka.actor.ActorSystem
-import akka.japi.Creator
+import akka.stream.ActorMaterializer
 import whisk.common.AkkaLogging
 import whisk.common.Scheduler
 import whisk.core.WhiskConfig
@@ -98,9 +98,8 @@ object Invoker {
         })
 
         val port = config.servicePort.toInt
-
-        BasicHttpService.startService(actorSystem, "invoker", "0.0.0.0", new Creator[InvokerServer] {
-            def create = new InvokerServer(invokerInstance, invokerInstance.toInt, port)
-        })
+        BasicHttpService.startService(
+            new InvokerServer(invokerInstance, invokerInstance.toInt).route, port)(
+                actorSystem, ActorMaterializer.create(actorSystem))
     }
 }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala
index 56e28ad..4fd7888 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala
@@ -17,20 +17,16 @@
 
 package whisk.core.invoker
 
-import whisk.http.BasicRasService
-import whisk.core.WhiskConfig
-import whisk.common.Logging
 import whisk.core.entity.InstanceId
 
+import whisk.http.BasicRasService
+
 /**
  * Implements web server to handle certain REST API calls.
  * Currently provides a health ping route, only.
  */
 class InvokerServer(
     override val instance: InstanceId,
-    override val numberOfInstances: Int,
-    override val port: Int)(
-        override implicit val logging: Logging,
-        implicit val whiskConfig: WhiskConfig)
+    override val numberOfInstances: Int)
     extends BasicRasService {
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].