You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by ta...@apache.org on 2018/10/13 06:19:31 UTC

[predictionio] branch develop updated: [PIO-31] Move from spray to akka-http (#474)

This is an automated email from the ASF dual-hosted git repository.

takezoe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/predictionio.git


The following commit(s) were added to refs/heads/develop by this push:
     new f762bee  [PIO-31] Move from spray to akka-http (#474)
f762bee is described below

commit f762bee0d10155d2425bfdae0dbe3ce663f30da4
Author: Naoki Takezoe <ta...@gmail.com>
AuthorDate: Sat Oct 13 15:19:26 2018 +0900

    [PIO-31] Move from spray to akka-http (#474)
---
 LICENSE.txt                                        |   5 +-
 common/build.sbt                                   |  10 +-
 common/src/main/resources/application.conf         |   7 -
 .../akkahttpjson4s/Json4sSupport.scala             | 103 +++
 .../authentication/KeyAuthentication.scala         |  16 +-
 .../configuration/SSLConfiguration.scala           |  22 +-
 .../predictionio/workflow/CreateServer.scala       | 401 +++++-----
 data/build.sbt                                     |   5 +-
 .../org/apache/predictionio/data/api/Common.scala  |  68 +-
 .../apache/predictionio/data/api/EventServer.scala | 867 +++++++++------------
 .../org/apache/predictionio/data/api/Stats.scala   |   6 +-
 .../apache/predictionio/data/api/StatsActor.scala  |   3 +-
 .../apache/predictionio/data/api/Webhooks.scala    |   5 +-
 .../predictionio/data/api/EventServiceSpec.scala   |  66 +-
 .../predictionio/data/api/SegmentIOAuthSpec.scala  | 127 +--
 docs/manual/source/datacollection/eventapi.html.md |   4 +-
 .../source/datacollection/eventmodel.html.md.erb   |   4 +-
 docs/manual/source/deploy/monitoring.html.md       |   2 +-
 docs/manual/source/index.html.md.erb               |   2 +-
 tools/build.sbt                                    |  10 +-
 .../apache/predictionio/tools/admin/AdminAPI.scala | 144 ++--
 .../org/apache/predictionio/tools/admin/README.md  |   4 +-
 .../predictionio/tools/commands/Management.scala   |   4 +-
 .../apache/predictionio/tools/console/Pio.scala    |  19 +-
 .../predictionio/tools/dashboard/CorsSupport.scala |  77 +-
 .../predictionio/tools/dashboard/Dashboard.scala   | 160 ++--
 .../predictionio/tools/admin/AdminAPISpec.scala    |  62 +-
 27 files changed, 1001 insertions(+), 1202 deletions(-)

diff --git a/LICENSE.txt b/LICENSE.txt
index 5b1d72d..c83c231 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -409,7 +409,6 @@ Binary distribution bundles
   com.sun.jersey # jersey-server # 1.9 (https://github.com/jersey/jersey-1.x)
   javax.xml.bind # jaxb-api # 2.2.2
   com.sun.xml.bind # jaxb-impl # 2.2.3-1
-  org.jvnet.mimepull # mimepull # 1.9.5 (https://github.com/kohsuke/mimepull)
 
   which are available under the CDDL v1.1 license (https://glassfish.java.net/public/CDDL+GPL_1_1.html)
   
@@ -1708,9 +1707,8 @@ Binary distribution bundles
   org.scala-lang # scala-reflect # 2.11.12 (http://scala-lang.org/)
   org.scala-lang # scalap # 2.11.12 (http://scala-lang.org/)
   org.scala-lang.modules # scala-java8-compat_2.11 # 0.7.0 (http://scala-lang.org/)
-  org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.4 (http://scala-lang.org/)
   org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.6 (http://scala-lang.org/)
-  org.scala-lang.modules # scala-xml_2.11 # 1.0.3 (http://scala-lang.org/)
+  org.scala-lang.modules # scala-parser-combinators_2.11 # 1.1.0 (http://scala-lang.org/)
   org.scala-lang.modules # scala-xml_2.11 # 1.0.5 (http://scala-lang.org/)
   org.scala-lang.modules # scala-xml_2.11 # 1.0.6 (http://scala-lang.org/)
   
@@ -1782,3 +1780,4 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 The following libraries are from the public domain.
 
   org.tukaani # xz # 1.0 (http://tukaani.org/xz/java.html)
+  org.reactivestreams # reactive-streams # 1.0.2 (http://www.reactive-streams.org/)
diff --git a/common/build.sbt b/common/build.sbt
index 19e4f04..f9fd97b 100644
--- a/common/build.sbt
+++ b/common/build.sbt
@@ -20,9 +20,11 @@ import PIOBuild._
 name := "apache-predictionio-common"
 
 libraryDependencies ++= Seq(
-  "io.spray"          %% "spray-can"     % "1.3.3",
-  "io.spray"          %% "spray-routing" % "1.3.3",
-  "com.typesafe.akka" %% "akka-actor"    % akkaVersion.value,
-  "com.typesafe.akka" %% "akka-slf4j"    % akkaVersion.value)
+  "com.typesafe.akka" %% "akka-actor"           % akkaVersion.value,
+  "com.typesafe.akka" %% "akka-slf4j"           % akkaVersion.value,
+  "com.typesafe.akka" %% "akka-http"            % "10.1.5",
+  "org.json4s"        %% "json4s-native"        % json4sVersion.value,
+  "com.typesafe.akka" %% "akka-stream"          % "2.5.12"
+)
 
 pomExtra := childrenPomExtra.value
diff --git a/common/src/main/resources/application.conf b/common/src/main/resources/application.conf
index c47d909..f0e6c8a 100644
--- a/common/src/main/resources/application.conf
+++ b/common/src/main/resources/application.conf
@@ -3,10 +3,3 @@ akka {
   loggers = ["akka.event.slf4j.Slf4jLogger"]
   loglevel = "INFO"
 }
-
-spray.can {
-  server {
-    verbose-error-messages = "on"
-    request-timeout = 35s
-  }
-}
diff --git a/common/src/main/scala/org/apache/predictionio/akkahttpjson4s/Json4sSupport.scala b/common/src/main/scala/org/apache/predictionio/akkahttpjson4s/Json4sSupport.scala
new file mode 100644
index 0000000..62cb8de
--- /dev/null
+++ b/common/src/main/scala/org/apache/predictionio/akkahttpjson4s/Json4sSupport.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.akkahttpjson4s
+
+// Referenced from https://github.com/hseeberger/akka-http-json
+// because of the difference of supported json4s version.
+import java.lang.reflect.InvocationTargetException
+
+import akka.http.scaladsl.marshalling.{ Marshaller, ToEntityMarshaller }
+import akka.http.scaladsl.model.ContentTypeRange
+import akka.http.scaladsl.model.MediaType
+import akka.http.scaladsl.model.MediaTypes.`application/json`
+import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller }
+import akka.util.ByteString
+import org.json4s.{ Formats, MappingException, Serialization }
+import scala.collection.immutable.Seq
+
+/**
+  * Automatic to and from JSON marshalling/unmarshalling using an in-scope *Json4s* protocol.
+  *
+  * Pretty printing is enabled if an implicit [[Json4sSupport.ShouldWritePretty.True]] is in scope.
+  */
+object Json4sSupport extends Json4sSupport {
+
+  sealed abstract class ShouldWritePretty
+
+  final object ShouldWritePretty {
+    final object True  extends ShouldWritePretty
+    final object False extends ShouldWritePretty
+  }
+}
+
+/**
+  * Automatic to and from JSON marshalling/unmarshalling using an in-scope *Json4s* protocol.
+  *
+  * Pretty printing is enabled if an implicit [[Json4sSupport.ShouldWritePretty.True]] is in scope.
+  */
+trait Json4sSupport {
+  import Json4sSupport._
+
+  def unmarshallerContentTypes: Seq[ContentTypeRange] =
+    mediaTypes.map(ContentTypeRange.apply)
+
+  def mediaTypes: Seq[MediaType.WithFixedCharset] =
+    List(`application/json`)
+
+  private val jsonStringUnmarshaller =
+    Unmarshaller.byteStringUnmarshaller
+      .forContentTypes(unmarshallerContentTypes: _*)
+      .mapWithCharset {
+        case (ByteString.empty, _) => throw Unmarshaller.NoContentException
+        case (data, charset)       => data.decodeString(charset.nioCharset.name)
+      }
+
+  private val jsonStringMarshaller =
+    Marshaller.oneOf(mediaTypes: _*)(Marshaller.stringMarshaller)
+
+  /**
+    * HTTP entity => `A`
+    *
+    * @tparam A type to decode
+    * @return unmarshaller for `A`
+    */
+  implicit def unmarshaller[A: Manifest](implicit serialization: Serialization,
+    formats: Formats): FromEntityUnmarshaller[A] =
+    jsonStringUnmarshaller
+      .map(s => serialization.read(s))
+      .recover { _ => _ =>
+      { case MappingException(_, ite: InvocationTargetException) => throw ite.getCause }
+      }
+
+  /**
+    * `A` => HTTP entity
+    *
+    * @tparam A type to encode, must be upper bounded by `AnyRef`
+    * @return marshaller for any `A` value
+    */
+  implicit def marshaller[A <: AnyRef](implicit serialization: Serialization,
+    formats: Formats,
+    shouldWritePretty: ShouldWritePretty =
+    ShouldWritePretty.False): ToEntityMarshaller[A] =
+    shouldWritePretty match {
+      case ShouldWritePretty.False =>
+        jsonStringMarshaller.compose(serialization.write[A])
+      case ShouldWritePretty.True =>
+        jsonStringMarshaller.compose(serialization.writePretty[A])
+    }
+}
diff --git a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
index fa950aa..08ae09a 100644
--- a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
+++ b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
@@ -23,11 +23,10 @@ package org.apache.predictionio.authentication
   * It is highly recommended to implement a stonger authentication mechanism
   */
 
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.headers.HttpChallenge
+import akka.http.scaladsl.server.{AuthenticationFailedRejection, Rejection, RequestContext}
 import com.typesafe.config.ConfigFactory
-import spray.http.HttpRequest
-import spray.routing.authentication._
-import spray.routing.{AuthenticationFailedRejection, RequestContext}
-
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 
@@ -42,19 +41,18 @@ trait KeyAuthentication {
     val param = "accessKey"
   }
 
-  def withAccessKeyFromFile: RequestContext => Future[Authentication[HttpRequest]] = {
+  def withAccessKeyFromFile: RequestContext => Future[Either[Rejection, HttpRequest]] = {
     ctx: RequestContext =>
-      val accessKeyParamOpt = ctx.request.uri.query.get(ServerKey.param)
+      val accessKeyParamOpt = ctx.request.uri.query().get(ServerKey.param)
       Future {
-
         val passedKey = accessKeyParamOpt.getOrElse {
           Left(AuthenticationFailedRejection(
-            AuthenticationFailedRejection.CredentialsRejected, Nil))
+            AuthenticationFailedRejection.CredentialsRejected, HttpChallenge("", None)))
         }
 
         if (!ServerKey.authEnforced || passedKey.equals(ServerKey.get)) Right(ctx.request)
         else Left(AuthenticationFailedRejection(
-          AuthenticationFailedRejection.CredentialsRejected, Nil))
+          AuthenticationFailedRejection.CredentialsRejected, HttpChallenge("", None)))
 
       }
   }
diff --git a/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala b/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala
index 9292e21..7880b13 100644
--- a/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala
+++ b/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala
@@ -15,19 +15,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.configuration
 
-/**
-  * Created by ykhodorkovsky on 2/26/16.
-  */
-
 import java.io.FileInputStream
 import java.security.KeyStore
-import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
 
 import com.typesafe.config.ConfigFactory
-import spray.io.ServerSSLEngineProvider
+import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
 
 trait SSLConfiguration {
 
@@ -39,7 +33,6 @@ trait SSLConfiguration {
   private val keyAlias = serverConfig.getString("org.apache.predictionio.server.ssl-key-alias")
 
   private val keyStore = {
-
     // Loading keystore from specified file
     val clientStore = KeyStore.getInstance("JKS")
     val inputStream = new FileInputStream(
@@ -50,7 +43,7 @@ trait SSLConfiguration {
   }
 
   // Creating SSL context
-  implicit def sslContext: SSLContext = {
+  def sslContext: SSLContext = {
     val context = SSLContext.getInstance("TLS")
     val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
     val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
@@ -60,15 +53,4 @@ trait SSLConfiguration {
     context
   }
 
-  // provide implicit SSLEngine with some protocols
-  implicit def sslEngineProvider: ServerSSLEngineProvider = {
-    ServerSSLEngineProvider { engine =>
-      engine.setEnabledCipherSuites(Array(
-        "TLS_RSA_WITH_AES_256_CBC_SHA",
-        "TLS_ECDH_ECDSA_WITH_RC4_128_SHA",
-        "TLS_RSA_WITH_AES_128_CBC_SHA"))
-      engine.setEnabledProtocols(Array("TLSv1", "TLSv1.2", "TLSv1.1"))
-      engine
-    }
-  }
 }
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
index 2447682..5642114 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
@@ -21,11 +21,7 @@ package org.apache.predictionio.workflow
 import java.io.Serializable
 import java.util.concurrent.TimeUnit
 
-import akka.actor._
 import akka.event.Logging
-import akka.io.IO
-import akka.pattern.ask
-import akka.util.Timeout
 import com.github.nscala_time.time.Imports.DateTime
 import com.twitter.bijection.Injection
 import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
@@ -34,7 +30,6 @@ import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
 import grizzled.slf4j.Logging
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.predictionio.authentication.KeyAuthentication
-import org.apache.predictionio.configuration.SSLConfiguration
 import org.apache.predictionio.controller.{Engine, Params, Utils, WithPrId}
 import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
 import org.apache.predictionio.data.storage.{EngineInstance, Storage}
@@ -42,15 +37,23 @@ import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
 import org.json4s._
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization.write
-import spray.can.Http
-import spray.can.server.ServerSettings
-import spray.http.MediaTypes._
-import spray.http._
-import spray.httpx.Json4sSupport
-import spray.routing._
+import akka.actor._
+import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
+import akka.http.scaladsl.Http.ServerBinding
+import akka.http.scaladsl.model.ContentTypes._
+import akka.http.scaladsl.model.{HttpEntity, HttpResponse, StatusCodes}
+import akka.http.scaladsl.server.Directives.complete
+import akka.http.scaladsl.server.directives._
+import akka.http.scaladsl.server._
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.http.scaladsl.server.Directives._
+import akka.stream.ActorMaterializer
+import org.apache.predictionio.akkahttpjson4s.Json4sSupport._
+import org.apache.predictionio.configuration.SSLConfiguration
 
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.language.existentials
 import scala.util.{Failure, Random, Success}
@@ -177,18 +180,21 @@ object CreateServer extends Logging {
         "master")
         implicit val timeout = Timeout(5.seconds)
         master ? StartServer()
-        actorSystem.awaitTermination
+
+        val f = actorSystem.whenTerminated
+        Await.ready(f, Duration.Inf)
+
       } getOrElse {
         error(s"Invalid engine instance ID. Aborting server.")
       }
     }
   }
 
-  def createServerActorWithEngine[TD, EIN, PD, Q, P, A](
+  def createPredictionServerWithEngine[TD, EIN, PD, Q, P, A](
     sc: ServerConfig,
     engineInstance: EngineInstance,
     engine: Engine[TD, EIN, PD, Q, P, A],
-    engineLanguage: EngineLanguage.Value): ActorRef = {
+    engineLanguage: EngineLanguage.Value): PredictionServer[Q, P] = {
 
     val engineParams = engine.engineInstanceToEngineParams(
       engineInstance, sc.jsonExtractor)
@@ -228,36 +234,50 @@ object CreateServer extends Logging {
     val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
       servingParamsWithName._2)
 
-    actorSystem.actorOf(
-      Props(
-        classOf[ServerActor[Q, P]],
-        sc,
-        engineInstance,
-        engine,
-        engineLanguage,
-        engineParams.dataSourceParams._2,
-        engineParams.preparatorParams._2,
-        algorithms,
-        engineParams.algorithmParamsList.map(_._2),
-        models,
-        serving,
-        engineParams.servingParams._2))
+    new PredictionServer(
+      sc,
+      engineInstance,
+      engine,
+      engineLanguage,
+      engineParams.dataSourceParams._2,
+      engineParams.preparatorParams._2,
+      algorithms,
+      engineParams.algorithmParamsList.map(_._2),
+      models,
+      serving,
+      engineParams.servingParams._2,
+      actorSystem)
   }
 }
 
+
+object EngineServerJson4sSupport {
+  implicit val serialization = org.json4s.jackson.Serialization
+  implicit def json4sFormats: Formats = DefaultFormats
+}
+
 class MasterActor (
     sc: ServerConfig,
     engineInstance: EngineInstance,
-    engineFactoryName: String) extends Actor with SSLConfiguration with KeyAuthentication {
+    engineFactoryName: String) extends Actor with KeyAuthentication with SSLConfiguration {
+
   val log = Logging(context.system, this)
+
   implicit val system = context.system
-  var sprayHttpListener: Option[ActorRef] = None
-  var currentServerActor: Option[ActorRef] = None
+  implicit val materializer = ActorMaterializer()
+
+  var currentServerBinding: Option[Future[ServerBinding]] = None
   var retry = 3
   val serverConfig = ConfigFactory.load("server.conf")
   val sslEnforced = serverConfig.getBoolean("org.apache.predictionio.server.ssl-enforced")
   val protocol = if (sslEnforced) "https://" else "http://"
 
+  val https: Option[HttpsConnectionContext] = if(sslEnforced){
+    val https = ConnectionContext.https(sslContext)
+    Http().setDefaultServerHttpContext(https)
+    Some(https)
+  } else None
+
   def undeploy(ip: String, port: Int): Unit = {
     val serverUrl = s"${protocol}${ip}:${port}"
     log.info(
@@ -287,81 +307,74 @@ class MasterActor (
 
   def receive: Actor.Receive = {
     case x: StartServer =>
-      val actor = createServerActor(
-        sc,
-        engineInstance,
-        engineFactoryName)
-      currentServerActor = Some(actor)
       undeploy(sc.ip, sc.port)
       self ! BindServer()
     case x: BindServer =>
-      currentServerActor map { actor =>
-        val settings = ServerSettings(system)
-        IO(Http) ! Http.Bind(
-          actor,
-          interface = sc.ip,
-          port = sc.port,
-          settings = Some(settings.copy(sslEncryption = sslEnforced)))
-      } getOrElse {
-        log.error("Cannot bind a non-existing server backend.")
+      currentServerBinding match {
+        case Some(_) =>
+          log.error("Cannot bind a non-existing server backend.")
+        case None =>
+          val server = createServer(sc, engineInstance, engineFactoryName)
+          val route = server.createRoute()
+          val binding = https match {
+            case Some(https) =>
+              Http().bindAndHandle(route, sc.ip, sc.port, connectionContext = https)
+            case None =>
+              Http().bindAndHandle(route, sc.ip, sc.port)
+          }
+          currentServerBinding = Some(binding)
+
+          val serverUrl = s"${protocol}${sc.ip}:${sc.port}"
+          log.info(s"Engine is deployed and running. Engine API is live at ${serverUrl}.")
       }
     case x: StopServer =>
       log.info(s"Stop server command received.")
-      sprayHttpListener.map { l =>
-        log.info("Server is shutting down.")
-        l ! Http.Unbind(5.seconds)
-        system.shutdown()
-      } getOrElse {
-        log.warning("No active server is running.")
+      currentServerBinding match {
+        case Some(f) =>
+          f.flatMap { binding =>
+            binding.unbind()
+          }.foreach { _ =>
+            system.terminate()
+          }
+        case None =>
+          log.warning("No active server is running.")
       }
     case x: ReloadServer =>
       log.info("Reload server command received.")
-      val latestEngineInstance =
-        CreateServer.engineInstances.getLatestCompleted(
-          engineInstance.engineId,
-          engineInstance.engineVersion,
-          engineInstance.engineVariant)
-      latestEngineInstance map { lr =>
-        val actor = createServerActor(sc, lr, engineFactoryName)
-        sprayHttpListener.map { l =>
-          l ! Http.Unbind(5.seconds)
-          val settings = ServerSettings(system)
-          IO(Http) ! Http.Bind(
-            actor,
-            interface = sc.ip,
-            port = sc.port,
-            settings = Some(settings.copy(sslEncryption = sslEnforced)))
-          currentServerActor.get ! Kill
-          currentServerActor = Some(actor)
-        } getOrElse {
-          log.warning("No active server is running. Abort reloading.")
-        }
-      } getOrElse {
-        log.warning(
-          s"No latest completed engine instance for ${engineInstance.engineId} " +
-          s"${engineInstance.engineVersion}. Abort reloading.")
-      }
-    case x: Http.Bound =>
-      val serverUrl = s"${protocol}${sc.ip}:${sc.port}"
-      log.info(s"Engine is deployed and running. Engine API is live at ${serverUrl}.")
-      sprayHttpListener = Some(sender)
-    case x: Http.CommandFailed =>
-      if (retry > 0) {
-        retry -= 1
-        log.error(s"Bind failed. Retrying... ($retry more trial(s))")
-        context.system.scheduler.scheduleOnce(1.seconds) {
-          self ! BindServer()
+        currentServerBinding match {
+          case Some(f) =>
+            f.flatMap { binding =>
+              binding.unbind()
+            }
+            val latestEngineInstance =
+              CreateServer.engineInstances.getLatestCompleted(
+                engineInstance.engineId,
+                engineInstance.engineVersion,
+                engineInstance.engineVariant)
+            latestEngineInstance map { lr =>
+              val server = createServer(sc, lr, engineFactoryName)
+              val route = server.createRoute()
+              val binding = https match {
+                case Some(https) =>
+                  Http().bindAndHandle(route, sc.ip, sc.port, connectionContext = https)
+                case None =>
+                  Http().bindAndHandle(route, sc.ip, sc.port)
+              }
+              currentServerBinding = Some(binding)
+            } getOrElse {
+              log.warning(
+                s"No latest completed engine instance for ${engineInstance.engineId} " +
+                  s"${engineInstance.engineVersion}. Abort reloading.")
+            }
+          case None =>
+            log.warning("No active server is running. Abort reloading.")
         }
-      } else {
-        log.error("Bind failed. Shutting down.")
-        system.shutdown()
-      }
   }
 
-  def createServerActor(
+  def createServer(
       sc: ServerConfig,
       engineInstance: EngineInstance,
-      engineFactoryName: String): ActorRef = {
+      engineFactoryName: String): PredictionServer[_, _] = {
     val (engineLanguage, engineFactory) =
       WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)
     val engine = engineFactory()
@@ -373,7 +386,7 @@ class MasterActor (
 
     val deployableEngine = engine.asInstanceOf[Engine[_,_,_,_,_,_]]
 
-    CreateServer.createServerActorWithEngine(
+    CreateServer.createPredictionServerWithEngine(
       sc,
       engineInstance,
       // engine,
@@ -382,7 +395,7 @@ class MasterActor (
   }
 }
 
-class ServerActor[Q, P](
+class PredictionServer[Q, P](
     val args: ServerConfig,
     val engineInstance: EngineInstance,
     val engine: Engine[_, _, _, Q, P, _],
@@ -393,23 +406,22 @@ class ServerActor[Q, P](
     val algorithmsParams: Seq[Params],
     val models: Seq[Any],
     val serving: BaseServing[Q, P],
-    val servingParams: Params) extends Actor with HttpService with KeyAuthentication {
+    val servingParams: Params,
+    val system: ActorSystem) extends KeyAuthentication {
+
+  val log = Logging(system, getClass)
   val serverStartTime = DateTime.now
-  val log = Logging(context.system, this)
 
   var requestCount: Int = 0
   var avgServingSec: Double = 0.0
   var lastServingSec: Double = 0.0
 
-  /** The following is required by HttpService */
-  def actorRefFactory: ActorContext = context
-
   implicit val timeout = Timeout(5, TimeUnit.SECONDS)
+
   val pluginsActorRef =
-    context.actorOf(Props(classOf[PluginsActor], args.engineVariant), "PluginsActor")
-  val pluginContext = EngineServerPluginContext(log, args.engineVariant)
+    system.actorOf(Props(classOf[PluginsActor], args.engineVariant), "PluginsActor")
 
-  def receive: Actor.Receive = runRoute(myRoute)
+  val pluginContext = EngineServerPluginContext(log, args.engineVariant)
 
   val feedbackEnabled = if (args.feedback) {
     if (args.accessKey.isEmpty) {
@@ -433,37 +445,44 @@ class ServerActor[Q, P](
     }
   }
 
-  val myRoute =
-    path("") {
-      get {
-        respondWithMediaType(`text/html`) {
-          detach() {
-            complete {
-              html.index(
-                args,
-                engineInstance,
-                algorithms.map(_.toString),
-                algorithmsParams.map(_.toString),
-                models.map(_.toString),
-                dataSourceParams.toString,
-                preparatorParams.toString,
-                servingParams.toString,
-                serverStartTime,
-                feedbackEnabled,
-                args.eventServerIp,
-                args.eventServerPort,
-                requestCount,
-                avgServingSec,
-                lastServingSec
-              ).toString
-            }
-          }
-        }
+  def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]):
+      AuthenticationDirective[T] = {
+    extractRequestContext.flatMap { requestContext =>
+      onSuccess(authenticator(requestContext)).flatMap {
+        case Right(x) => provide(x)
+        case Left(x)  => reject(x): Directive1[T]
       }
-    } ~
-    path("queries.json") {
-      post {
-        detach() {
+    }
+  }
+
+  def createRoute(): Route = {
+    val myRoute =
+      path("") {
+        get {
+          complete(HttpResponse(entity = HttpEntity(
+            `text/html(UTF-8)`,
+            html.index(
+              args,
+              engineInstance,
+              algorithms.map(_.toString),
+              algorithmsParams.map(_.toString),
+              models.map(_.toString),
+              dataSourceParams.toString,
+              preparatorParams.toString,
+              servingParams.toString,
+              serverStartTime,
+              feedbackEnabled,
+              args.eventServerIp,
+              args.eventServerPort,
+              requestCount,
+              avgServingSec,
+              lastServingSec
+            ).toString
+          )))
+        }
+      } ~
+      path("queries.json") {
+        post {
           entity(as[String]) { queryString =>
             try {
               val servingStartTime = DateTime.now
@@ -584,9 +603,8 @@ class ServerActor[Q, P](
                 (requestCount + 1)
               requestCount += 1
 
-              respondWithMediaType(`application/json`) {
-                complete(compact(render(pluginResult)))
-              }
+              complete(compact(render(pluginResult)))
+
             } catch {
               case e: MappingException =>
                 val msg = s"Query:\n$queryString\n\nStack Trace:\n" +
@@ -613,83 +631,76 @@ class ServerActor[Q, P](
             }
           }
         }
-      }
-    } ~
-    path("reload") {
-      authenticate(withAccessKeyFromFile) { request =>
-        post {
-          complete {
-            context.actorSelection("/user/master") ! ReloadServer()
-            "Reloading..."
+      } ~
+      path("reload") {
+        authenticate(withAccessKeyFromFile) { request =>
+          post {
+            system.actorSelection("/user/master") ! ReloadServer()
+            complete("Reloading...")
           }
         }
-      }
-    } ~
-    path("stop") {
-      authenticate(withAccessKeyFromFile) { request =>
-        post {
-          complete {
-            context.system.scheduler.scheduleOnce(1.seconds) {
-              context.actorSelection("/user/master") ! StopServer()
+      } ~
+      path("stop") {
+        authenticate(withAccessKeyFromFile) { request =>
+          post {
+            system.scheduler.scheduleOnce(1.seconds) {
+              system.actorSelection("/user/master") ! StopServer()
             }
-            "Shutting down..."
+            complete("Shutting down...")
           }
         }
-      }
-    } ~
-    pathPrefix("assets") {
-      getFromResourceDirectory("assets")
-    } ~
-    path("plugins.json") {
-      import EngineServerJson4sSupport._
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete {
+      } ~
+      pathPrefix("assets") {
+        getFromResourceDirectory("assets")
+      } ~
+      path("plugins.json") {
+        import EngineServerJson4sSupport._
+        get {
+          complete(
             Map("plugins" -> Map(
               "outputblockers" -> pluginContext.outputBlockers.map { case (n, p) =>
                 n -> Map(
-                  "name" -> p.pluginName,
+                  "name"        -> p.pluginName,
                   "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName,
-                  "params" -> pluginContext.pluginParams(p.pluginName))
+                  "class"       -> p.getClass.getName,
+                  "params"      -> pluginContext.pluginParams(p.pluginName))
               },
               "outputsniffers" -> pluginContext.outputSniffers.map { case (n, p) =>
                 n -> Map(
-                  "name" -> p.pluginName,
+                  "name"        -> p.pluginName,
                   "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName,
-                  "params" -> pluginContext.pluginParams(p.pluginName))
+                  "class"       -> p.getClass.getName,
+                  "params"      -> pluginContext.pluginParams(p.pluginName))
               }
             ))
-          }
+          )
         }
-      }
-    } ~
-    path("plugins" / Segments) { segments =>
-      import EngineServerJson4sSupport._
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete {
-            val pluginArgs = segments.drop(2)
-            val pluginType = segments(0)
-            val pluginName = segments(1)
-            pluginType match {
-              case EngineServerPlugin.outputBlocker =>
-                pluginContext.outputBlockers(pluginName).handleREST(
-                  pluginArgs)
-              case EngineServerPlugin.outputSniffer =>
-                pluginsActorRef ? PluginsActor.HandleREST(
-                  pluginName = pluginName,
-                  pluginArgs = pluginArgs) map {
-                  _.asInstanceOf[String]
-                }
-            }
+      } ~
+      path("plugins" / Segments) { segments =>
+        import EngineServerJson4sSupport._
+        get {
+          val pluginArgs = segments.drop(2)
+          val pluginType = segments(0)
+          val pluginName = segments(1)
+          pluginType match {
+            case EngineServerPlugin.outputBlocker =>
+              complete(HttpResponse(entity = HttpEntity(
+                  `application/json`,
+                  pluginContext.outputBlockers(pluginName).handleREST(pluginArgs))))
+
+            case EngineServerPlugin.outputSniffer =>
+              complete(pluginsActorRef ? PluginsActor.HandleREST(
+                pluginName = pluginName,
+                pluginArgs = pluginArgs) map { json =>
+                HttpResponse(entity = HttpEntity(
+                  `application/json`,
+                  json.asInstanceOf[String]
+                ))
+              })
           }
         }
       }
-    }
-}
 
-object EngineServerJson4sSupport extends Json4sSupport {
-  implicit def json4sFormats: Formats = DefaultFormats
+    myRoute
+  }
 }
diff --git a/data/build.sbt b/data/build.sbt
index 87b0d96..23dff1d 100644
--- a/data/build.sbt
+++ b/data/build.sbt
@@ -22,12 +22,9 @@ name := "apache-predictionio-data"
 libraryDependencies ++= Seq(
   "com.github.nscala-time" %% "nscala-time"    % "2.6.0",
   "com.google.guava"        % "guava"          % "14.0.1",
-  "io.spray"               %% "spray-can"      % "1.3.3",
-  "io.spray"               %% "spray-routing"  % "1.3.3",
-  "io.spray"               %% "spray-testkit"  % "1.3.3" % "test",
+  "com.typesafe.akka"      %% "akka-http-testkit" % "10.1.5" % "test",
   "org.apache.spark"       %% "spark-sql"      % sparkVersion.value % "provided",
   "org.clapper"            %% "grizzled-slf4j" % "1.0.2",
-  "org.json4s"             %% "json4s-native"  % json4sVersion.value,
   "org.scalatest"          %% "scalatest"      % "2.1.7" % "test",
   "org.specs2"             %% "specs2"         % "3.3.1" % "test"
     exclude("org.scalaz.stream", s"scalaz-stream_${scalaBinaryVersion.value}"),
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
index 60efea2..02355ce 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
@@ -15,68 +15,58 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.api
 
-import org.apache.predictionio.data.webhooks.ConnectorException
+import akka.http.scaladsl.server._
 import org.apache.predictionio.data.storage.StorageException
-
-import spray.routing._
-import spray.routing.Directives._
-import spray.routing.Rejection
-import spray.http.StatusCodes
-import spray.httpx.Json4sSupport
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
+import org.apache.predictionio.data.webhooks.ConnectorException
+import org.json4s.{DefaultFormats, Formats}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import org.apache.predictionio.akkahttpjson4s.Json4sSupport._
 
 object Common {
 
-  object Json4sProtocol extends Json4sSupport {
+  object Json4sProtocol {
+    implicit val serialization = org.json4s.native.Serialization
     implicit def json4sFormats: Formats = DefaultFormats
   }
 
   import Json4sProtocol._
 
-  val rejectionHandler = RejectionHandler {
-    case MalformedRequestContentRejection(msg, _) :: _ =>
+  val exceptionHandler = ExceptionHandler {
+    case e: ConnectorException => {
+      complete(StatusCodes.BadRequest, Map("message" -> s"${e.getMessage()}"))
+    }
+    case e: StorageException => {
+      complete(StatusCodes.InternalServerError, Map("message" -> s"${e.getMessage()}"))
+    }
+    case e: Exception => {
+      complete(StatusCodes.InternalServerError, Map("message" -> s"${e.getMessage()}"))
+    }
+  }
+
+  val rejectionHandler = RejectionHandler.newBuilder().handle {
+    case MalformedRequestContentRejection(msg, _) =>
       complete(StatusCodes.BadRequest, Map("message" -> msg))
-    case MissingQueryParamRejection(msg) :: _ =>
+
+    case MissingQueryParamRejection(msg) =>
       complete(StatusCodes.NotFound,
         Map("message" -> s"missing required query parameter ${msg}."))
-    case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => {
+
+    case AuthenticationFailedRejection(cause, challengeHeaders) => {
       val msg = cause match {
         case AuthenticationFailedRejection.CredentialsRejected =>
           "Invalid accessKey."
         case AuthenticationFailedRejection.CredentialsMissing =>
           "Missing accessKey."
       }
-      complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> msg))
-    }
-    case ChannelRejection(msg) :: _ =>
       complete(StatusCodes.Unauthorized, Map("message" -> msg))
-    case NonExistentAppRejection(msg) :: _ =>
-      complete(StatusCodes.Unauthorized, Map("message" -> msg))
-  }
-
-  val exceptionHandler = ExceptionHandler {
-    case e: ConnectorException => {
-      val msg = s"${e.getMessage()}"
-      complete(StatusCodes.BadRequest, Map("message" -> msg))
     }
-    case e: StorageException => {
-      val msg = s"${e.getMessage()}"
-      complete(StatusCodes.InternalServerError, Map("message" -> msg))
-    }
-    case e: Exception => {
-      val msg = s"${e.getMessage()}"
-      complete(StatusCodes.InternalServerError, Map("message" -> msg))
-    }
-  }
+    case ChannelRejection(msg) =>
+      complete(StatusCodes.Unauthorized, Map("message" -> msg))
+  }.result()
 }
 
 /** invalid channel */
 case class ChannelRejection(msg: String) extends Rejection
-
-/** the app doesn't exist */
-case class NonExistentAppRejection(msg: String) extends Rejection
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
index 41dfefb..96ff4d0 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
@@ -18,646 +18,543 @@
 
 package org.apache.predictionio.data.api
 
-import akka.event.Logging
+import akka.event.{Logging, LoggingAdapter}
 import sun.misc.BASE64Decoder
-
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
-import akka.io.IO
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.{FormData, HttpEntity, HttpResponse, StatusCodes}
+import akka.http.scaladsl.model.ContentTypes._
+import akka.http.scaladsl.model.headers.HttpChallenge
+import akka.http.scaladsl.server.Directives.complete
+import akka.http.scaladsl.server.directives._
+import akka.http.scaladsl.server._
 import akka.pattern.ask
 import akka.util.Timeout
-import org.apache.predictionio.data.Utils
-import org.apache.predictionio.data.storage.AccessKeys
-import org.apache.predictionio.data.storage.Channels
-import org.apache.predictionio.data.storage.DateTimeJson4sSupport
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.EventJson4sSupport
-import org.apache.predictionio.data.storage.BatchEventsJson4sSupport
-import org.apache.predictionio.data.storage.LEvents
-import org.apache.predictionio.data.storage.Storage
-import org.json4s.DefaultFormats
-import org.json4s.Formats
-import org.json4s.JObject
-import org.json4s.native.JsonMethods.parse
-import spray.can.Http
-import spray.http.FormData
-import spray.http.MediaTypes
-import spray.http.StatusCodes
-import spray.httpx.Json4sSupport
-import spray.routing._
-import spray.routing.authentication.Authentication
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Try, Success, Failure}
-
-class  EventServiceActor(
-    val eventClient: LEvents,
-    val accessKeysClient: AccessKeys,
-    val channelsClient: Channels,
-    val config: EventServerConfig) extends HttpServiceActor {
-
-  object Json4sProtocol extends Json4sSupport {
-    implicit def json4sFormats: Formats = DefaultFormats +
-      new EventJson4sSupport.APISerializer +
-      new BatchEventsJson4sSupport.APISerializer +
-      // NOTE: don't use Json4s JodaTimeSerializers since it has issues,
-      // some format not converted, or timezone not correct
-      new DateTimeJson4sSupport.Serializer
-  }
-
-
-  val MaxNumberOfEventsPerBatchRequest = 50
-
-  val logger = Logging(context.system, this)
-
-  // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
-  // Futures
-  implicit def executionContext: ExecutionContext = context.dispatcher
-
-  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
-  val rejectionHandler = Common.rejectionHandler
+import akka.http.scaladsl.server.Directives._
+import akka.stream.ActorMaterializer
+import org.apache.predictionio.data.storage._
+import org.apache.predictionio.akkahttpjson4s.Json4sSupport._
+import org.json4s.{DefaultFormats, Formats, JObject}
+
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
+
+object Json4sProtocol {
+  implicit val serialization = org.json4s.native.Serialization
+  implicit def json4sFormats: Formats = DefaultFormats +
+    new EventJson4sSupport.APISerializer +
+    new BatchEventsJson4sSupport.APISerializer +
+    // NOTE: don't use Json4s JodaTimeSerializers since it has issues,
+    // some format not converted, or timezone not correct
+    new DateTimeJson4sSupport.Serializer
+}
 
-  val jsonPath = """(.+)\.json$""".r
-  val formPath = """(.+)\.form$""".r
+case class EventServerConfig(
+  ip: String = "localhost",
+  port: Int = 7070,
+  plugins: String = "plugins",
+  stats: Boolean = false)
 
-  val pluginContext = EventServerPluginContext(logger)
+object EventServer {
+  import Json4sProtocol._
+  import FutureDirectives._
+  import Common._
 
+  private val MaxNumberOfEventsPerBatchRequest = 50
   private lazy val base64Decoder = new BASE64Decoder
+  private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
+  private case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String])
 
-  case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String])
-
-  /* with accessKey in query/header, return appId if succeed */
-  def withAccessKey: RequestContext => Future[Authentication[AuthData]] = {
-    ctx: RequestContext =>
-      val accessKeyParamOpt = ctx.request.uri.query.get("accessKey")
-      val channelParamOpt = ctx.request.uri.query.get("channel")
-      Future {
-        // with accessKey in query, return appId if succeed
-        accessKeyParamOpt.map { accessKeyParam =>
-          accessKeysClient.get(accessKeyParam).map { k =>
-            channelParamOpt.map { ch =>
-              val channelMap =
-                channelsClient.getByAppid(k.appid)
-                .map(c => (c.name, c.id)).toMap
-              if (channelMap.contains(ch)) {
-                Right(AuthData(k.appid, Some(channelMap(ch)), k.events))
-              } else {
-                Left(ChannelRejection(s"Invalid channel '$ch'."))
-              }
-            }.getOrElse{
-              Right(AuthData(k.appid, None, k.events))
-            }
-          }.getOrElse(FailedAuth)
-        }.getOrElse {
-          // with accessKey in header, return appId if succeed
-          ctx.request.headers.find(_.name == "Authorization").map { authHeader =>
-            authHeader.value.split("Basic ") match {
-              case Array(_, value) =>
-                val appAccessKey =
-                  new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0)
-                accessKeysClient.get(appAccessKey) match {
-                  case Some(k) => Right(AuthData(k.appid, None, k.events))
-                  case None => FailedAuth
-                }
-
-              case _ => FailedAuth
-            }
-          }.getOrElse(MissedAuth)
-        }
-      }
-  }
-
-  private val FailedAuth = Left(
+  private def FailedAuth[T]: Either[Rejection, T] = Left(
     AuthenticationFailedRejection(
-      AuthenticationFailedRejection.CredentialsRejected, Nil
+      AuthenticationFailedRejection.CredentialsRejected, HttpChallenge("eventserver", None)
     )
   )
 
-  private val MissedAuth = Left(
+  private def MissedAuth[T]: Either[Rejection, T] = Left(
     AuthenticationFailedRejection(
-      AuthenticationFailedRejection.CredentialsMissing, Nil
+      AuthenticationFailedRejection.CredentialsMissing, HttpChallenge("eventserver", None)
     )
   )
 
-  lazy val statsActorRef = actorRefFactory.actorSelection("/user/StatsActor")
-  lazy val pluginsActorRef = actorRefFactory.actorSelection("/user/PluginsActor")
+  def createRoute(eventClient: LEvents,
+                  accessKeysClient: AccessKeys,
+                  channelsClient: Channels,
+                  logger: LoggingAdapter,
+                  statsActorRef: ActorSelection,
+                  pluginsActorRef: ActorSelection,
+                  config: EventServerConfig)(implicit executionContext: ExecutionContext): Route = {
+
+    /* with accessKey in query/header, return appId if succeed */
+    def withAccessKey: RequestContext => Future[Either[Rejection, AuthData]] = {
+      ctx: RequestContext =>
+        val accessKeyParamOpt = ctx.request.uri.query().get("accessKey")
+        val channelParamOpt = ctx.request.uri.query().get("channel")
+        Future {
+          // with accessKey in query, return appId if succeed
+          accessKeyParamOpt.map { accessKeyParam =>
+            accessKeysClient.get(accessKeyParam).map { k =>
+              channelParamOpt.map { ch =>
+                val channelMap =
+                  channelsClient.getByAppid(k.appid)
+                    .map(c => (c.name, c.id)).toMap
+                if (channelMap.contains(ch)) {
+                  Right(AuthData(k.appid, Some(channelMap(ch)), k.events))
+                } else {
+                  Left(ChannelRejection(s"Invalid channel '$ch'."))
+                }
+              }.getOrElse{
+                Right(AuthData(k.appid, None, k.events))
+              }
+            }.getOrElse(FailedAuth)
+          }.getOrElse {
+            // with accessKey in header, return appId if succeed
+            ctx.request.headers.find(_.name == "Authorization").map { authHeader =>
+              authHeader.value.split("Basic ") match {
+                case Array(_, value) =>
+                  val appAccessKey =
+                    new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0)
+                  accessKeysClient.get(appAccessKey) match {
+                    case Some(k) => Right(AuthData(k.appid, None, k.events))
+                    case None => FailedAuth
+                  }
 
-  val route: Route =
-    pathSingleSlash {
-      import Json4sProtocol._
+                case _ => FailedAuth
+              }
+            }.getOrElse(MissedAuth)
+          }
+        }
+    }
 
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete(Map("status" -> "alive"))
+    def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]):
+        AuthenticationDirective[T] = {
+      handleRejections(rejectionHandler).tflatMap { _ =>
+        extractRequestContext.flatMap { requestContext =>
+          onSuccess(authenticator(requestContext)).flatMap {
+            case Right(x) => provide(x)
+            case Left(x)  => reject(x): Directive1[T]
+          }
         }
       }
-    } ~
-    path("plugins.json") {
-      import Json4sProtocol._
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete {
+    }
+
+    val pluginContext = EventServerPluginContext(logger)
+    val jsonPath = """(.+)\.json$""".r
+    val formPath = """(.+)\.form$""".r
+
+    val route: Route =
+      pathSingleSlash {
+        get {
+          complete(Map("status" -> "alive"))
+        }
+      } ~
+      path("plugins.json") {
+        get {
+          complete(
             Map("plugins" -> Map(
               "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) =>
                 n -> Map(
-                  "name" -> p.pluginName,
+                  "name"        -> p.pluginName,
                   "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName)
+                  "class"       -> p.getClass.getName)
               },
               "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) =>
                 n -> Map(
-                  "name" -> p.pluginName,
+                  "name"        -> p.pluginName,
                   "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName)
+                  "class"       -> p.getClass.getName)
               }
             ))
-          }
+          )
         }
-      }
-    } ~
-    path("plugins" / Segments) { segments =>
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          authenticate(withAccessKey) { authData =>
-            respondWithMediaType(MediaTypes.`application/json`) {
-              complete {
-                val pluginArgs = segments.drop(2)
-                val pluginType = segments(0)
-                val pluginName = segments(1)
-                pluginType match {
-                  case EventServerPlugin.inputBlocker =>
+      } ~
+      path("plugins" / Segments) { segments =>
+        get {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val pluginArgs = segments.drop(2)
+              val pluginType = segments(0)
+              val pluginName = segments(1)
+              pluginType match {
+                case EventServerPlugin.inputBlocker =>
+                  complete(HttpResponse(entity = HttpEntity(
+                    `application/json`,
                     pluginContext.inputBlockers(pluginName).handleREST(
                       authData.appId,
                       authData.channelId,
                       pluginArgs)
-                  case EventServerPlugin.inputSniffer =>
-                    pluginsActorRef ? PluginsActor.HandleREST(
-                      appId = authData.appId,
-                      channelId = authData.channelId,
-                      pluginName = pluginName,
-                      pluginArgs = pluginArgs) map {
-                      _.asInstanceOf[String]
-                    }
-                }
+                  )))
+
+                case EventServerPlugin.inputSniffer =>
+                  complete(pluginsActorRef ? PluginsActor.HandleREST(
+                    appId = authData.appId,
+                    channelId = authData.channelId,
+                    pluginName = pluginName,
+                    pluginArgs = pluginArgs) map { json =>
+                      HttpResponse(entity = HttpEntity(
+                        `application/json`,
+                        json.asInstanceOf[String]
+                      ))
+                    })
               }
             }
           }
         }
-      }
-    } ~
-    path("events" / jsonPath ) { eventId =>
-
-      import Json4sProtocol._
-
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+      } ~
+      path("events" / jsonPath ) { eventId =>
+        get {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
               val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  logger.debug(s"GET event ${eventId}.")
-                  val data = eventClient.futureGet(eventId, appId, channelId).map { eventOpt =>
-                    eventOpt.map( event =>
-                      (StatusCodes.OK, event)
-                    ).getOrElse(
-                      (StatusCodes.NotFound, Map("message" -> "Not Found"))
-                    )
-                  }
-                  data
-                }
+              logger.debug(s"GET event ${eventId}.")
+              onSuccess(eventClient.futureGet(eventId, appId, channelId)){ eventOpt =>
+                  eventOpt.map { event =>
+                    complete(StatusCodes.OK, event)
+                  }.getOrElse(
+                    complete(StatusCodes.NotFound, Map("message" -> "Not Found"))
+                  )
               }
             }
           }
-        }
-      } ~
-      delete {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+        } ~
+        delete {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
               val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  logger.debug(s"DELETE event ${eventId}.")
-                  val data = eventClient.futureDelete(eventId, appId, channelId).map { found =>
-                    if (found) {
-                      (StatusCodes.OK, Map("message" -> "Found"))
-                    } else {
-                      (StatusCodes.NotFound, Map("message" -> "Not Found"))
-                    }
-                  }
-                  data
+              logger.debug(s"DELETE event ${eventId}.")
+              onSuccess(eventClient.futureDelete(eventId, appId, channelId)){ found =>
+                if (found) {
+                  complete(StatusCodes.OK, Map("message" -> "Found"))
+                } else {
+                  complete(StatusCodes.NotFound, Map("message" -> "Not Found"))
                 }
               }
             }
           }
         }
-      }
-    } ~
-    path("events.json") {
-
-      import Json4sProtocol._
-
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+      } ~
+      path("events.json") {
+        post {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
               val channelId = authData.channelId
               val events = authData.events
               entity(as[Event]) { event =>
-                complete {
-                  if (events.isEmpty || authData.events.contains(event.event)) {
-                    pluginContext.inputBlockers.values.foreach(
-                      _.process(EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event), pluginContext))
-                    val data = eventClient.futureInsert(event, appId, channelId).map { id =>
-                      pluginsActorRef ! EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event)
-                      val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
-                      if (config.stats) {
-                        statsActorRef ! Bookkeeping(appId, result._1, event)
-                      }
-                      result
+                if (events.isEmpty || authData.events.contains(event.event)) {
+                  pluginContext.inputBlockers.values.foreach(
+                    _.process(EventInfo(
+                      appId = appId,
+                      channelId = channelId,
+                      event = event), pluginContext))
+                  onSuccess(eventClient.futureInsert(event, appId, channelId)){ id =>
+                    pluginsActorRef ! EventInfo(
+                      appId = appId,
+                      channelId = channelId,
+                      event = event)
+                    val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
+                    if (config.stats) {
+                      statsActorRef ! Bookkeeping(appId, result._1, event)
                     }
-                    data
-                  } else {
-                    (StatusCodes.Forbidden,
-                      Map("message" -> s"${event.event} events are not allowed"))
+                    complete(result)
                   }
+                } else {
+                  complete(StatusCodes.Forbidden,
+                    Map("message" -> s"${event.event} events are not allowed"))
                 }
               }
             }
           }
-        }
-      } ~
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+        } ~
+        get {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
               val channelId = authData.channelId
               parameters(
-                'startTime.as[Option[String]],
-                'untilTime.as[Option[String]],
-                'entityType.as[Option[String]],
-                'entityId.as[Option[String]],
-                'event.as[Option[String]],
-                'targetEntityType.as[Option[String]],
-                'targetEntityId.as[Option[String]],
-                'limit.as[Option[Int]],
-                'reversed.as[Option[Boolean]]) {
+                'startTime.?,
+                'untilTime.?,
+                'entityType.?,
+                'entityId.?,
+                'event.?,
+                'targetEntityType.?,
+                'targetEntityId.?,
+                'limit.as[Int].?,
+                'reversed.as[Boolean].?) {
                 (startTimeStr, untilTimeStr, entityType, entityId,
-                  eventName,  // only support one event name
-                  targetEntityType, targetEntityId,
-                  limit, reversed) =>
-                respondWithMediaType(MediaTypes.`application/json`) {
-                  complete {
-                    logger.debug(
-                      s"GET events of appId=${appId} " +
-                      s"st=${startTimeStr} ut=${untilTimeStr} " +
-                      s"et=${entityType} eid=${entityId} " +
-                      s"li=${limit} rev=${reversed} ")
-
-                    require(!((reversed == Some(true))
-                      && (entityType.isEmpty || entityId.isEmpty)),
-                      "the parameter reversed can only be used with" +
+                eventName,  // only support one event name
+                targetEntityType, targetEntityId,
+                limit, reversed) =>
+                  logger.debug(
+                    s"GET events of appId=${appId} " +
+                    s"st=${startTimeStr} ut=${untilTimeStr} " +
+                    s"et=${entityType} eid=${entityId} " +
+                    s"li=${limit} rev=${reversed} ")
+
+                  require(!((reversed == Some(true))
+                    && (entityType.isEmpty || entityId.isEmpty)),
+                    "the parameter reversed can only be used with" +
                       " both entityType and entityId specified.")
 
-                    val parseTime = Future {
-                      val startTime = startTimeStr.map(Utils.stringToDateTime(_))
-                      val untilTime = untilTimeStr.map(Utils.stringToDateTime(_))
-                      (startTime, untilTime)
-                    }
+                  val parseTime = Future {
+                    val startTime = startTimeStr.map(Utils.stringToDateTime(_))
+                    val untilTime = untilTimeStr.map(Utils.stringToDateTime(_))
+                    (startTime, untilTime)
+                  }
 
 
-                    parseTime.flatMap { case (startTime, untilTime) =>
-                      val data = eventClient.futureFind(
-                        appId = appId,
-                        channelId = channelId,
-                        startTime = startTime,
-                        untilTime = untilTime,
-                        entityType = entityType,
-                        entityId = entityId,
-                        eventNames = eventName.map(List(_)),
-                        targetEntityType = targetEntityType.map(Some(_)),
-                        targetEntityId = targetEntityId.map(Some(_)),
-                        limit = limit.orElse(Some(20)),
-                        reversed = reversed)
-                        .map { eventIter =>
-                          if (eventIter.hasNext) {
-                            (StatusCodes.OK, eventIter.toArray)
-                          } else {
-                            (StatusCodes.NotFound,
-                              Map("message" -> "Not Found"))
-                          }
+                  val f = parseTime.flatMap { case (startTime, untilTime) =>
+                    val data = eventClient.futureFind(
+                      appId = appId,
+                      channelId = channelId,
+                      startTime = startTime,
+                      untilTime = untilTime,
+                      entityType = entityType,
+                      entityId = entityId,
+                      eventNames = eventName.map(List(_)),
+                      targetEntityType = targetEntityType.map(Some(_)),
+                      targetEntityId = targetEntityId.map(Some(_)),
+                      limit = limit.orElse(Some(20)),
+                      reversed = reversed)
+                      .map { eventIter =>
+                        if (eventIter.hasNext) {
+                          (StatusCodes.OK, eventIter.toArray)
+                        } else {
+                          (StatusCodes.NotFound, Map("message" -> "Not Found"))
                         }
-                      data
-                    }.recover {
-                      case e: Exception =>
-                        (StatusCodes.BadRequest, Map("message" -> s"${e}"))
-                    }
+                      }
+                    data
                   }
+
+                  onSuccess(f){ (status, body) => complete(status, body) }
                 }
-              }
             }
           }
         }
-      }
-    } ~
-    path("batch" / "events.json") {
-
-      import Json4sProtocol._
-
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+      } ~
+      path("batch" / "events.json") {
+        post {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
               val channelId = authData.channelId
               val allowedEvents = authData.events
 
               entity(as[Seq[Try[Event]]]) { events =>
-                complete {
-                  if (events.length <= MaxNumberOfEventsPerBatchRequest) {
-                    val eventWithIndex = events.zipWithIndex
-
-                    val taggedEvents = eventWithIndex.collect { case (Success(event), i) =>
-                      if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){
-                        (Right(event), i)
-                      } else {
-                        (Left(event), i)
-                      }
-                    }
+                if (events.length <= MaxNumberOfEventsPerBatchRequest) {
+                  val eventWithIndex = events.zipWithIndex
 
-                    val insertEvents = taggedEvents.collect { case (Right(event), i) =>
-                      (event, i)
+                  val taggedEvents = eventWithIndex.collect { case (Success(event), i) =>
+                    if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){
+                      (Right(event), i)
+                    } else {
+                      (Left(event), i)
                     }
+                  }
 
-                    insertEvents.foreach { case (event, i) =>
-                      pluginContext.inputBlockers.values.foreach(
-                        _.process(EventInfo(
-                          appId = appId,
-                          channelId = channelId,
-                          event = event), pluginContext))
-                    }
+                  val insertEvents = taggedEvents.collect { case (Right(event), i) =>
+                    (event, i)
+                  }
 
-                    val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch(
-                      insertEvents.map(_._1), appId, channelId).map { insertResults =>
-                      val results = insertResults.zip(insertEvents).map { case (id, (event, i)) =>
-                        pluginsActorRef ! EventInfo(
-                          appId = appId,
-                          channelId = channelId,
-                          event = event)
-                        val status = StatusCodes.Created
-                        if (config.stats) {
-                          statsActorRef ! Bookkeeping(appId, status, event)
-                        }
+                  insertEvents.foreach { case (event, i) =>
+                    pluginContext.inputBlockers.values.foreach(
+                      _.process(EventInfo(
+                        appId = appId,
+                        channelId = channelId,
+                        event = event), pluginContext))
+                  }
+
+                  val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch(
+                    insertEvents.map(_._1), appId, channelId).map { insertResults =>
+                    val results = insertResults.zip(insertEvents).map { case (id, (event, i)) =>
+                      pluginsActorRef ! EventInfo(
+                        appId = appId,
+                        channelId = channelId,
+                        event = event)
+                      val status = StatusCodes.Created
+                      if (config.stats) {
+                        statsActorRef ! Bookkeeping(appId, status, event)
+                      }
+                      (Map(
+                        "status"  -> status.intValue,
+                        "eventId" -> s"${id}"), i)
+                    } ++
+                      // Results of denied events
+                      taggedEvents.collect { case (Left(event), i) =>
                         (Map(
-                          "status" -> status.intValue,
-                          "eventId" -> s"${id}"), i)
+                          "status"  -> StatusCodes.Forbidden.intValue,
+                          "message" -> s"${event.event} events are not allowed"), i)
                       } ++
-                        // Results of denied events
-                        taggedEvents.collect { case (Left(event), i) =>
-                          (Map(
-                            "status" -> StatusCodes.Forbidden.intValue,
-                            "message" -> s"${event.event} events are not allowed"), i)
-                        } ++
-                        // Results of failed to deserialze events
-                        eventWithIndex.collect { case (Failure(exception), i) =>
-                          (Map(
-                            "status" -> StatusCodes.BadRequest.intValue,
-                            "message" -> s"${exception.getMessage()}"), i)
-                        }
+                      // Results of failed to deserialze events
+                      eventWithIndex.collect { case (Failure(exception), i) =>
+                        (Map(
+                          "status"  -> StatusCodes.BadRequest.intValue,
+                          "message" -> s"${exception.getMessage()}"), i)
+                      }
 
-                      // Restore original order
-                      results.sortBy { case (_, i) => i }.map { case (data, _) => data }
-                    }
+                    // Restore original order
+                    results.sortBy { case (_, i) => i }.map { case (data, _) => data }
+                  }
 
-                    f.recover { case exception =>
-                      Map(
-                        "status" -> StatusCodes.InternalServerError.intValue,
-                        "message" -> s"${exception.getMessage()}")
-                    }
+                  onSuccess(f.recover { case exception =>
+                    Map(
+                      "status" -> StatusCodes.InternalServerError.intValue,
+                      "message" -> s"${exception.getMessage()}"
+                    )
+                  }){ res => complete(res) }
 
-                  } else {
-                    (StatusCodes.BadRequest,
-                      Map("message" -> (s"Batch request must have less than or equal to " +
-                        s"${MaxNumberOfEventsPerBatchRequest} events")))
-                  }
+                } else {
+                  complete(StatusCodes.BadRequest,
+                    Map("message" -> (s"Batch request must have less than or equal to " +
+                      s"${MaxNumberOfEventsPerBatchRequest} events")))
                 }
               }
             }
           }
         }
-      }
-    } ~
-    path("stats.json") {
-
-      import Json4sProtocol._
-
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+      } ~
+      path("stats.json") {
+        get {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                if (config.stats) {
-                  complete {
-                    statsActorRef ? GetStats(appId) map {
-                      _.asInstanceOf[Map[String, StatsSnapshot]]
-                    }
+              if (config.stats) {
+                complete {
+                  statsActorRef ? GetStats(appId) map {
+                    _.asInstanceOf[Map[String, StatsSnapshot]]
                   }
-                } else {
-                  complete(
-                    StatusCodes.NotFound,
-                    parse("""{"message": "To see stats, launch Event Server """ +
-                      """with --stats argument."}"""))
                 }
+              } else {
+                complete(
+                  StatusCodes.NotFound,
+                  Map("message" -> "To see stats, launch Event Server with --stats argument.")
+                )
               }
             }
           }
-        }
-      }  // stats.json get
-    } ~
-    path("webhooks" / jsonPath ) { web =>
-      import Json4sProtocol._
-
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+        }  // stats.json get
+      } ~
+      path("webhooks" / jsonPath ) { web =>
+        post {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
               val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                entity(as[JObject]) { jObj =>
-                  complete {
-                    Webhooks.postJson(
-                      appId = appId,
-                      channelId = channelId,
-                      web = web,
-                      data = jObj,
-                      eventClient = eventClient,
-                      log = logger,
-                      stats = config.stats,
-                      statsActorRef = statsActorRef)
-                  }
+              entity(as[JObject]) { jObj =>
+                onSuccess(Webhooks.postJson(
+                  appId = appId,
+                  channelId = channelId,
+                  web = web,
+                  data = jObj,
+                  eventClient = eventClient,
+                  log = logger,
+                  stats = config.stats,
+                  statsActorRef = statsActorRef
+                )){
+                  (status, body) => complete(status, body)
                 }
               }
             }
           }
-        }
-      } ~
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+        } ~
+        get {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
               val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  Webhooks.getJson(
-                    appId = appId,
-                    channelId = channelId,
-                    web = web,
-                    log = logger)
-                }
+              onSuccess(
+                Webhooks.getJson(
+                appId = appId,
+                channelId = channelId,
+                web = web,
+                log = logger)
+              ){
+                (status, body) => complete(status, body)
               }
             }
           }
         }
-      }
-    } ~
-    path("webhooks" / formPath ) { web =>
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+      } ~
+      path("webhooks" / formPath ) { web =>
+        post {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
               val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                entity(as[FormData]){ formData =>
-                  // logger.debug(formData.toString)
-                  complete {
-                    // respond with JSON
-                    import Json4sProtocol._
-
-                    Webhooks.postForm(
-                      appId = appId,
-                      channelId = channelId,
-                      web = web,
-                      data = formData,
-                      eventClient = eventClient,
-                      log = logger,
-                      stats = config.stats,
-                      statsActorRef = statsActorRef)
-                  }
+              entity(as[FormData]){ formData =>
+                logger.debug(formData.toString)
+                onSuccess(Webhooks.postForm(
+                  appId = appId,
+                  channelId = channelId,
+                  web = web,
+                  data = formData,
+                  eventClient = eventClient,
+                  log = logger,
+                  stats = config.stats,
+                  statsActorRef = statsActorRef
+                )){
+                  (status, body) => complete(status, body)
                 }
               }
             }
           }
-        }
-      } ~
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
+        } ~
+        get {
+          handleExceptions(exceptionHandler) {
             authenticate(withAccessKey) { authData =>
               val appId = authData.appId
               val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  // respond with JSON
-                  import Json4sProtocol._
-
-                  Webhooks.getForm(
-                    appId = appId,
-                    channelId = channelId,
-                    web = web,
-                    log = logger)
-                }
+              onSuccess(Webhooks.getForm(
+                appId = appId,
+                channelId = channelId,
+                web = web,
+                log = logger
+              )){
+                (status, body) => complete(status, body)
               }
             }
           }
         }
       }
 
-    }
-
-  def receive: Actor.Receive = runRoute(route)
-}
-
-
-
-/* message */
-case class StartServer(host: String, port: Int)
-
-class EventServerActor(
-    val eventClient: LEvents,
-    val accessKeysClient: AccessKeys,
-    val channelsClient: Channels,
-    val config: EventServerConfig) extends Actor with ActorLogging {
-  val child = context.actorOf(
-    Props(classOf[EventServiceActor],
-      eventClient,
-      accessKeysClient,
-      channelsClient,
-      config),
-    "EventServiceActor")
-  implicit val system = context.system
-
-  def receive: Actor.Receive = {
-    case StartServer(host, portNum) => {
-      IO(Http) ! Http.Bind(child, interface = host, port = portNum)
-    }
-    case m: Http.Bound => log.info("Bound received. EventServer is ready.")
-    case m: Http.CommandFailed => log.error("Command failed.")
-    case _ => log.error("Unknown message.")
+    route
   }
-}
-
-case class EventServerConfig(
-  ip: String = "localhost",
-  port: Int = 7070,
-  plugins: String = "plugins",
-  stats: Boolean = false)
 
-object EventServer {
   def createEventServer(config: EventServerConfig): ActorSystem = {
     implicit val system = ActorSystem("EventServerSystem")
+    implicit val materializer = ActorMaterializer()
+    implicit val executionContext = system.dispatcher
 
     val eventClient = Storage.getLEvents()
     val accessKeysClient = Storage.getMetaDataAccessKeys()
     val channelsClient = Storage.getMetaDataChannels()
 
-    val serverActor = system.actorOf(
-      Props(
-        classOf[EventServerActor],
-        eventClient,
-        accessKeysClient,
-        channelsClient,
-        config),
-      "EventServerActor"
-    )
-    if (config.stats) system.actorOf(Props[StatsActor], "StatsActor")
-    system.actorOf(Props[PluginsActor], "PluginsActor")
-    serverActor ! StartServer(config.ip, config.port)
+    val statsActorRef = system.actorSelection("/user/StatsActor")
+    val pluginsActorRef = system.actorSelection("/user/PluginsActor")
+
+    val logger = Logging(system, getClass)
+
+    val route = createRoute(eventClient, accessKeysClient, channelsClient,
+      logger, statsActorRef, pluginsActorRef, config)
+
+    Http().bindAndHandle(route, config.ip, config.port)
+
     system
   }
 }
 
 object Run {
   def main(args: Array[String]): Unit = {
-    EventServer.createEventServer(EventServerConfig(
+    val f = EventServer.createEventServer(EventServerConfig(
       ip = "0.0.0.0",
       port = 7070))
-    .awaitTermination
+    .whenTerminated
+
+    Await.ready(f, Duration.Inf)
   }
 }
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
index 9bbbc2e..d544b1b 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
@@ -18,13 +18,11 @@
 
 package org.apache.predictionio.data.api
 
+import akka.http.scaladsl.model.StatusCode
 import org.apache.predictionio.data.storage.Event
 
-import spray.http.StatusCode
-
-import scala.collection.mutable.{ HashMap => MHashMap }
+import scala.collection.mutable.{HashMap => MHashMap}
 import scala.collection.mutable
-
 import com.github.nscala_time.time.Imports.DateTime
 
 case class EntityTypesEvent(
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
index aa9438b..627d046 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
@@ -18,10 +18,9 @@
 
 package org.apache.predictionio.data.api
 
+import akka.http.scaladsl.model.StatusCode
 import org.apache.predictionio.data.storage.Event
 
-import spray.http.StatusCode
-
 import akka.actor.Actor
 import akka.event.Logging
 
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
index 57be037..e9a9c53 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
@@ -18,13 +18,10 @@
 
 package org.apache.predictionio.data.api
 
+import akka.http.scaladsl.model.{FormData, StatusCode, StatusCodes}
 import org.apache.predictionio.data.webhooks.ConnectorUtil
 import org.apache.predictionio.data.storage.LEvents
 
-import spray.http.StatusCodes
-import spray.http.StatusCode
-import spray.http.FormData
-
 import org.json4s.JObject
 
 import akka.event.LoggingAdapter
diff --git a/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
index 7a45ca1..24cebc7 100644
--- a/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
+++ b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
@@ -18,55 +18,39 @@
 
 package org.apache.predictionio.data.api
 
-import org.apache.predictionio.data.storage.{Storage, StorageMockContext}
-import akka.testkit.TestProbe
-import akka.actor.{ActorRef, ActorSystem, Props}
-import spray.http.HttpEntity
-import spray.http.HttpResponse
-import spray.http.ContentTypes
-import spray.httpx.RequestBuilding.Get
+import akka.event.Logging
+import org.apache.predictionio.data.storage.Storage
 import org.specs2.mutable.Specification
+import akka.http.scaladsl.testkit.Specs2RouteTest
 
-class EventServiceSpec extends Specification {
 
-  val system = ActorSystem("EventServiceSpecSystem")
+class EventServiceSpec extends Specification with Specs2RouteTest {
+  val eventClient = Storage.getLEvents()
+  val accessKeysClient = Storage.getMetaDataAccessKeys()
+  val channelsClient = Storage.getMetaDataChannels()
 
-  def createEventServiceActor: ActorRef = {
-    val eventClient = Storage.getLEvents()
-    val accessKeysClient = Storage.getMetaDataAccessKeys()
-    val channelsClient = Storage.getMetaDataChannels()
+  val statsActorRef = system.actorSelection("/user/StatsActor")
+  val pluginsActorRef = system.actorSelection("/user/PluginsActor")
 
-    system.actorOf(
-      Props(
-        new EventServiceActor(
-          eventClient,
-          accessKeysClient,
-          channelsClient,
-          EventServerConfig()
-        )
-      )
-    )
-  }
+  val logger = Logging(system, getClass)
+  val config = EventServerConfig(ip = "0.0.0.0", port = 7070)
 
+  val route = EventServer.createRoute(
+    eventClient,
+    accessKeysClient,
+    channelsClient,
+    logger,
+    statsActorRef,
+    pluginsActorRef,
+    config
+  )
 
   "GET / request" should {
-    "properly produce OK HttpResponses" in new StorageMockContext {
-      Thread.sleep(2000)
-      val eventServiceActor = createEventServiceActor
-      val probe = TestProbe()(system)
-      probe.send(eventServiceActor, Get("/"))
-      probe.expectMsg(
-        HttpResponse(
-          200,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"status":"alive"}"""
-          )
-        )
-      )
-      success
+    "properly produce OK HttpResponses" in {
+      Get() ~> route ~> check {
+        status.intValue() shouldEqual 200
+        responseAs[String] shouldEqual """{"status":"alive"}"""
+      }
     }
   }
-
-  step(system.shutdown())
 }
diff --git a/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
index 5927824..297c25f 100644
--- a/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
+++ b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
@@ -17,22 +17,20 @@
 
 package org.apache.predictionio.data.api
 
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.TestProbe
+import akka.event.Logging
+import akka.http.scaladsl.model.ContentTypes
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.server.Route
 import org.apache.predictionio.data.storage._
 import org.joda.time.DateTime
-import org.scalamock.specs2.MockContext
 import org.specs2.mutable.Specification
-import spray.http.HttpHeaders.RawHeader
-import spray.http.{ContentTypes, HttpEntity, HttpResponse}
-import spray.httpx.RequestBuilding._
 import sun.misc.BASE64Encoder
+import akka.http.scaladsl.testkit.Specs2RouteTest
 
 import scala.concurrent.{ExecutionContext, Future}
 
-class SegmentIOAuthSpec extends Specification {
+class SegmentIOAuthSpec extends Specification with Specs2RouteTest {
 
-  val system = ActorSystem("EventServiceSpecSystem")
   sequential
   isolated
   val eventClient = new LEvents {
@@ -74,75 +72,51 @@ class SegmentIOAuthSpec extends Specification {
 
     override def get(k: String): Option[AccessKey] =
       k match {
-        case "abc" ⇒ Some(AccessKey(k, appId, Seq.empty))
-        case _ ⇒ None
+        case "abc" => Some(AccessKey(k, appId, Seq.empty))
+        case _ => None
       }
   }
 
-  val base64Encoder = new BASE64Encoder
+  val channelsClient = Storage.getMetaDataChannels()
 
-  def createEventServiceActor(): ActorRef = {
-    val channelsClient = Storage.getMetaDataChannels()
-    system.actorOf(
-      Props(
-        new EventServiceActor(
-          eventClient,
-          accessKeysClient,
-          channelsClient,
-          EventServerConfig()
-        )
-      )
-    )
-  }
+  val statsActorRef = system.actorSelection("/user/StatsActor")
+  val pluginsActorRef = system.actorSelection("/user/PluginsActor")
 
-  "Event Service" should {
+  val base64Encoder = new BASE64Encoder
+  val logger = Logging(system, getClass)
+  val config = EventServerConfig(ip = "0.0.0.0", port = 7070)
+
+  val route = EventServer.createRoute(
+    eventClient,
+    accessKeysClient,
+    channelsClient,
+    logger,
+    statsActorRef,
+    pluginsActorRef,
+    config
+  )
 
+  "Event Service" should {
     "reject with CredentialsRejected with invalid credentials" in new StorageMockContext {
-      val eventServiceActor = createEventServiceActor
       val accessKey = "abc123:"
-      val probe = TestProbe()(system)
-      probe.send(
-        eventServiceActor,
-        Post("/webhooks/segmentio.json")
-          .withHeaders(
-            List(
-              RawHeader("Authorization", s"Basic $accessKey")
-            )
-          )
-      )
-      probe.expectMsg(
-        HttpResponse(
-          401,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"message":"Invalid accessKey."}"""
-          )
-        )
-      )
+      Post("/webhooks/segmentio.json")
+          .withHeaders(RawHeader("Authorization", s"Basic $accessKey")) ~> Route.seal(route) ~> check {
+        status.intValue() shouldEqual 401
+        responseAs[String] shouldEqual """{"message":"Invalid accessKey."}"""
+      }
       success
     }
+  }
 
     "reject with CredentialsMissed without credentials" in {
-      val eventServiceActor = createEventServiceActor
-      val probe = TestProbe()(system)
-      probe.send(
-        eventServiceActor,
-        Post("/webhooks/segmentio.json")
-      )
-      probe.expectMsg(
-        HttpResponse(
-          401,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"message":"Missing accessKey."}"""
-          )
-        )
-      )
+      Post("/webhooks/segmentio.json") ~> Route.seal(route) ~> check {
+        status.intValue() shouldEqual 401
+        responseAs[String] shouldEqual """{"message":"Missing accessKey."}"""
+      }
       success
     }
 
     "process SegmentIO identity request properly" in {
-      val eventServiceActor = createEventServiceActor
       val jsonReq =
         """
           |{
@@ -169,32 +143,15 @@ class SegmentIOAuthSpec extends Specification {
 
       val accessKey = "abc:"
       val accessKeyEncoded = base64Encoder.encodeBuffer(accessKey.getBytes)
-      val probe = TestProbe()(system)
-      probe.send(
-        eventServiceActor,
-        Post(
-          "/webhooks/segmentio.json",
-          HttpEntity(ContentTypes.`application/json`, jsonReq.getBytes)
-        ).withHeaders(
-            List(
-              RawHeader("Authorization", s"Basic $accessKeyEncoded")
-            )
-          )
-      )
-      probe.expectMsg(
-        HttpResponse(
-          201,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"eventId":"event_id"}"""
-          )
-        )
-      )
+      Post("/webhooks/segmentio.json")
+          .withHeaders(RawHeader("Authorization", s"Basic $accessKeyEncoded"))
+          .withEntity(ContentTypes.`application/json`, jsonReq) ~> route ~> check {
+        println(responseAs[String])
+        status.intValue() shouldEqual 201
+        responseAs[String] shouldEqual """{"eventId":"event_id"}"""
+      }
       success
-    }
   }
-
-  step(system.shutdown())
 }
 
 
diff --git a/docs/manual/source/datacollection/eventapi.html.md b/docs/manual/source/datacollection/eventapi.html.md
index dea3782..be4b040 100644
--- a/docs/manual/source/datacollection/eventapi.html.md
+++ b/docs/manual/source/datacollection/eventapi.html.md
@@ -67,7 +67,7 @@ Sample response:
 
 ```
 HTTP/1.1 200 OK
-Server: spray-can/1.2.1
+Server: akka-http/10.1.5
 Date: Wed, 10 Sep 2014 22:37:30 GMT
 Content-Type: application/json; charset=UTF-8
 Content-Length: 18
@@ -284,7 +284,7 @@ Sample response:
 
 ```
 HTTP/1.1 201 Created
-Server: spray-can/1.2.1
+Server: akka-http/10.1.5
 Date: Wed, 10 Sep 2014 22:51:33 GMT
 Content-Type: application/json; charset=UTF-8
 Content-Length: 41
diff --git a/docs/manual/source/datacollection/eventmodel.html.md.erb b/docs/manual/source/datacollection/eventmodel.html.md.erb
index ec8e5a8..676de54 100644
--- a/docs/manual/source/datacollection/eventmodel.html.md.erb
+++ b/docs/manual/source/datacollection/eventmodel.html.md.erb
@@ -139,7 +139,7 @@ You should see something like the following, meaning the events are imported suc
 
 ```
 HTTP/1.1 201 Created
-Server: spray-can/1.3.2
+Server: akka-http/10.1.5
 Date: Tue, 02 Jun 2015 23:13:58 GMT
 Content-Type: application/json; charset=UTF-8
 Content-Length: 57
@@ -339,7 +339,7 @@ The order in the response array is corresponding to the order of the request arr
 ###Sample Response:
 
     HTTP/1.1 200 Successful
-    Server: spray-can/1.2.1
+    Server: akka-http/10.1.5
     Date: Wed, 10 Sep 2014 22:51:33 GMT
     Content-Type: application/json; charset=UTF-8
     Content-Length: 41
diff --git a/docs/manual/source/deploy/monitoring.html.md b/docs/manual/source/deploy/monitoring.html.md
index 191898b..886b001 100644
--- a/docs/manual/source/deploy/monitoring.html.md
+++ b/docs/manual/source/deploy/monitoring.html.md
@@ -119,7 +119,7 @@ Be sure to adjust your deploy command to your environment (driver-memry, postgre
  exit 0
 ```
 
-There can be  cases when the process is running but the engine is down however. If the spray REST API used by PredictionIO crashes, the engine process continues but the engine to fail when queried.
+There can be  cases when the process is running but the engine is down however. If the Akka HTTP REST API used by PredictionIO crashes, the engine process continues but the engine to fail when queried.
 
 This sort of crash can be taken care of by using monits `check program` capability.
 
diff --git a/docs/manual/source/index.html.md.erb b/docs/manual/source/index.html.md.erb
index fb715f6..4289278 100644
--- a/docs/manual/source/index.html.md.erb
+++ b/docs/manual/source/index.html.md.erb
@@ -41,7 +41,7 @@ scientists to create predictive engines for any machine learning task. It lets y
 * simplify data infrastructure management.
 
 Apache PredictionIO® can be [installed](/install/) as a full machine
-learning stack, bundled with **Apache Spark**, **MLlib**, **HBase**, **Spray**
+learning stack, bundled with **Apache Spark**, **MLlib**, **HBase**, **Akka HTTP**
 and **Elasticsearch**, which simplifies and accelerates scalable machine
 learning infrastructure management.
 
diff --git a/tools/build.sbt b/tools/build.sbt
index 9375f2a..acdb1fe 100644
--- a/tools/build.sbt
+++ b/tools/build.sbt
@@ -21,11 +21,11 @@ import sbtassembly.AssemblyPlugin.autoImport._
 name := "apache-predictionio-tools"
 
 libraryDependencies ++= Seq(
-  "com.github.zafarkhaja"  %  "java-semver"    % "0.9.0",
-  "org.apache.spark"       %% "spark-sql"      % sparkVersion.value % "provided",
-  "com.typesafe.akka"      %% "akka-slf4j"     % akkaVersion.value,
-  "io.spray"               %% "spray-testkit"  % "1.3.3" % "test",
-  "org.specs2"             %% "specs2"         % "2.3.13" % "test")
+  "com.github.zafarkhaja"  %  "java-semver"       % "0.9.0",
+  "org.apache.spark"       %% "spark-sql"         % sparkVersion.value % "provided",
+  "com.typesafe.akka"      %% "akka-slf4j"        % akkaVersion.value,
+  "com.typesafe.akka"      %% "akka-http-testkit" % "10.1.5" % "test",
+  "org.specs2"             %% "specs2-core"       % "4.2.0" % "test")
 
 assemblyMergeStrategy in assembly := {
   case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
index 7e8fd30..d8bb79f 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
@@ -14,148 +14,116 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package org.apache.predictionio.tools.admin
 
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.event.Logging
-import akka.io.IO
-import akka.util.Timeout
-import org.apache.predictionio.data.api.StartServer
-import org.apache.predictionio.data.storage.Storage
-import org.json4s.{Formats, DefaultFormats}
-
 import java.util.concurrent.TimeUnit
 
-import spray.can.Http
-import spray.http.{MediaTypes, StatusCodes}
-import spray.httpx.Json4sSupport
-import spray.routing._
+import akka.http.scaladsl.server._
+import org.apache.predictionio.data.storage._
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.util.Timeout
+import org.apache.predictionio.akkahttpjson4s.Json4sSupport._
+import org.json4s.{DefaultFormats, Formats}
 
-class AdminServiceActor(val commandClient: CommandClient)
-  extends HttpServiceActor {
+object Json4sProtocol {
+  implicit val serialization = org.json4s.jackson.Serialization
+  implicit def json4sFormats: Formats = DefaultFormats
+}
 
-  object Json4sProtocol extends Json4sSupport {
-    implicit def json4sFormats: Formats = DefaultFormats
-  }
+case class AdminServerConfig(
+  ip: String = "localhost",
+  port: Int = 7071
+)
 
+object AdminServer {
   import Json4sProtocol._
 
-  val log = Logging(context.system, this)
-
-  // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
-  // Futures
-  implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
-  implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)
+  private implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)
 
   // for better message response
-  val rejectionHandler = RejectionHandler {
-    case MalformedRequestContentRejection(msg, _) :: _ =>
+  private val rejectionHandler = RejectionHandler.newBuilder().handle {
+    case MalformedRequestContentRejection(msg, _) =>
       complete(StatusCodes.BadRequest, Map("message" -> msg))
-    case MissingQueryParamRejection(msg) :: _ =>
+    case MissingQueryParamRejection(msg) =>
       complete(StatusCodes.NotFound,
         Map("message" -> s"missing required query parameter ${msg}."))
-    case AuthenticationFailedRejection(cause, challengeHeaders) :: _ =>
+    case AuthenticationFailedRejection(cause, challengeHeaders) =>
       complete(StatusCodes.Unauthorized, challengeHeaders,
         Map("message" -> s"Invalid accessKey."))
-  }
+  }.result()
 
-  val jsonPath = """(.+)\.json$""".r
+  def createRoute()(implicit executionContext: ExecutionContext): Route = {
+
+    val commandClient = new CommandClient(
+      appClient = Storage.getMetaDataApps,
+      accessKeyClient = Storage.getMetaDataAccessKeys,
+      eventClient = Storage.getLEvents()
+    )
 
-  val route: Route =
-    pathSingleSlash {
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
+    val route =
+      pathSingleSlash {
+        get {
           complete(Map("status" -> "alive"))
         }
-      }
-    } ~
+      } ~
       path("cmd" / "app" / Segment / "data") {
         appName => {
           delete {
-            respondWithMediaType(MediaTypes.`application/json`) {
-              complete(commandClient.futureAppDataDelete(appName))
-            }
+            complete(commandClient.futureAppDataDelete(appName))
           }
         }
       } ~
       path("cmd" / "app" / Segment) {
         appName => {
           delete {
-            respondWithMediaType(MediaTypes.`application/json`) {
-              complete(commandClient.futureAppDelete(appName))
-            }
+            complete(commandClient.futureAppDelete(appName))
           }
         }
       } ~
       path("cmd" / "app") {
         get {
-          respondWithMediaType(MediaTypes.`application/json`) {
-            complete(commandClient.futureAppList())
-          }
+          complete(commandClient.futureAppList())
         } ~
-          post {
-            entity(as[AppRequest]) {
-              appArgs => respondWithMediaType(MediaTypes.`application/json`) {
-                complete(commandClient.futureAppNew(appArgs))
+        post {
+          entity(as[AppRequest]) {
+            appArgs =>
+              onSuccess(commandClient.futureAppNew(appArgs)){
+                case res: GeneralResponse => complete(res)
+                case res: AppNewResponse  => complete(res)
               }
-            }
           }
+        }
       }
-  def receive: Actor.Receive = runRoute(route)
-}
-
-class AdminServerActor(val commandClient: CommandClient) extends Actor {
-  val log = Logging(context.system, this)
-  val child = context.actorOf(
-    Props(classOf[AdminServiceActor], commandClient),
-    "AdminServiceActor")
 
-  implicit val system = context.system
-
-  def receive: PartialFunction[Any, Unit] = {
-    case StartServer(host, portNum) => {
-      IO(Http) ! Http.Bind(child, interface = host, port = portNum)
-
-    }
-    case m: Http.Bound => log.info("Bound received. AdminServer is ready.")
-    case m: Http.CommandFailed => log.error("Command failed.")
-    case _ => log.error("Unknown message.")
+    route
   }
-}
 
-case class AdminServerConfig(
-  ip: String = "localhost",
-  port: Int = 7071
-)
 
-object AdminServer {
   def createAdminServer(config: AdminServerConfig): ActorSystem = {
     implicit val system = ActorSystem("AdminServerSystem")
+    implicit val materializer = ActorMaterializer()
+    implicit val executionContext = system.dispatcher
 
-    val commandClient = new CommandClient(
-      appClient = Storage.getMetaDataApps,
-      accessKeyClient = Storage.getMetaDataAccessKeys,
-      eventClient = Storage.getLEvents()
-    )
-
-    val serverActor = system.actorOf(
-      Props(classOf[AdminServerActor], commandClient),
-      "AdminServerActor")
-    serverActor ! StartServer(config.ip, config.port)
+    val route = createRoute()
+    Http().bindAndHandle(route, config.ip, config.port)
     system
   }
 }
 
 object AdminRun {
   def main (args: Array[String]) : Unit = {
-    AdminServer.createAdminServer(AdminServerConfig(
+    val f = AdminServer.createAdminServer(AdminServerConfig(
       ip = "localhost",
       port = 7071))
-    .awaitTermination
+    .whenTerminated
+
+    Await.ready(f, Duration.Inf)
   }
 }
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
index 666b572..7944665 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
@@ -26,7 +26,7 @@ $ sbt/sbt "tools/compile"
 $ set -a
 $ source conf/pio-env.sh
 $ set +a
-$ sbt/sbt "tools/run-main org.apache.predictionio.tools.admin.AdminRun"
+$ sbt/sbt "tools/runMain org.apache.predictionio.tools.admin.AdminRun"
 ```
 
 ### Unit test (Very minimal)
@@ -35,7 +35,7 @@ $ sbt/sbt "tools/run-main org.apache.predictionio.tools.admin.AdminRun"
 $ set -a
 $ source conf/pio-env.sh
 $ set +a
-$ sbt/sbt "tools/test-only org.apache.predictionio.tools.admin.AdminAPISpec"
+$ sbt/sbt "tools/testOnly org.apache.predictionio.tools.admin.AdminAPISpec"
 ```
 
 ### Start with pio command adminserver
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
index 8c4c6ae..54f5d3f 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
@@ -25,7 +25,7 @@ import org.apache.predictionio.tools.EventServerArgs
 import org.apache.predictionio.tools.EitherLogging
 import org.apache.predictionio.tools.Common
 import org.apache.predictionio.tools.ReturnTypes._
-import org.apache.predictionio.tools.dashboard.Dashboard
+import org.apache.predictionio.tools.dashboard.DashboardServer
 import org.apache.predictionio.tools.dashboard.DashboardConfig
 import org.apache.predictionio.tools.admin.AdminServer
 import org.apache.predictionio.tools.admin.AdminServerConfig
@@ -62,7 +62,7 @@ object Management extends EitherLogging {
     */
   def dashboard(da: DashboardArgs): ActorSystem = {
     info(s"Creating dashboard at ${da.ip}:${da.port}")
-    Dashboard.createDashboard(DashboardConfig(
+    DashboardServer.createDashboard(DashboardConfig(
       ip = da.ip,
       port = da.port))
   }
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
index ef4581b..1b4c8a8 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
@@ -17,16 +17,13 @@
 
 package org.apache.predictionio.tools.console
 
-import org.apache.predictionio.tools.{
-  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs,
-  DeployArgs, BatchPredictArgs}
-import org.apache.predictionio.tools.commands.{
-  DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs,
-  BuildArgs, EngineArgs, Management, Engine, Import, Export,
-  App => AppCmd, AccessKey => AccessKeysCmd}
+import org.apache.predictionio.tools.{BatchPredictArgs, DeployArgs, EventServerArgs, ServerArgs, SparkArgs, WorkflowArgs}
+import org.apache.predictionio.tools.commands.{AdminServerArgs, BuildArgs, DashboardArgs, Engine, EngineArgs, Export, ExportArgs, Import, ImportArgs, Management, AccessKey => AccessKeysCmd, App => AppCmd}
 import org.apache.predictionio.tools.ReturnTypes._
-
 import grizzled.slf4j.Logging
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 import scala.language.implicitConversions
 import scala.sys.process._
 
@@ -116,17 +113,17 @@ object Pio extends Logging {
         ea, engineInstanceId, batchPredictArgs, sparkArgs, pioHome, verbose))
 
   def dashboard(da: DashboardArgs): Int = {
-    Management.dashboard(da).awaitTermination
+    Await.ready(Management.dashboard(da).whenTerminated, Duration.Inf)
     0
   }
 
   def eventserver(ea: EventServerArgs): Int = {
-    Management.eventserver(ea).awaitTermination
+    Await.ready(Management.eventserver(ea).whenTerminated, Duration.Inf)
     0
   }
 
   def adminserver(aa: AdminServerArgs): Int = {
-    Management.adminserver(aa).awaitTermination
+    Await.ready(Management.adminserver(aa).whenTerminated, Duration.Inf)
     0
   }
 
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala
index 1026996..0a1031d 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala
@@ -18,60 +18,33 @@
 
 package org.apache.predictionio.tools.dashboard
 
-// Reference from: https://gist.github.com/waymost/4b5598523c2c7361abea
-
-import spray.http.{HttpMethods, HttpMethod, HttpResponse, AllOrigins}
-import spray.http.HttpHeaders._
-import spray.http.HttpEntity
-import spray.routing._
-import spray.http.StatusCodes
-import spray.http.ContentTypes
-
-// see also https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS
-trait CORSSupport {
-  this: HttpService =>
-
-  private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins)
-  private val optionsCorsHeaders = List(
-    `Access-Control-Allow-Headers`("""Origin,
-                                      |X-Requested-With,
-                                      |Content-Type,
-                                      |Accept,
-                                      |Accept-Encoding,
-                                      |Accept-Language,
-                                      |Host,
-                                      |Referer,
-                                      |User-Agent""".stripMargin.replace("\n", " ")),
-    `Access-Control-Max-Age`(1728000)
-  )
+// Reference from: https://gist.github.com/jeroenr/5261fa041d592f37cd80
+
+import akka.http.scaladsl.model.HttpMethods._
+import akka.http.scaladsl.model.{StatusCodes, HttpResponse}
+import akka.http.scaladsl.model.headers._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.{Directive0, Route}
+import com.typesafe.config.ConfigFactory
+
+trait CorsSupport {
+
+  // this directive adds access control headers to normal responses
+  private def addAccessControlHeaders: Directive0 = {
+    respondWithHeaders(
+      `Access-Control-Allow-Origin`.forRange(HttpOriginRange.`*`),
+      `Access-Control-Allow-Credentials`(true),
+      `Access-Control-Allow-Headers`("Authorization", "Content-Type", "X-Requested-With")
+    )
+  }
 
-  def cors[T]: Directive0 = mapRequestContext { ctx =>
-    ctx.withRouteResponseHandling {
-      // OPTION request for a resource that responds to other methods
-      case Rejected(x) if (ctx.request.method.equals(HttpMethods.OPTIONS) &&
-          x.exists(_.isInstanceOf[MethodRejection])) => {
-        val allowedMethods: List[HttpMethod] = x.collect {
-          case rejection: MethodRejection => rejection.supported
-        }
-        ctx.complete {
-          HttpResponse().withHeaders(
-            `Access-Control-Allow-Methods`(HttpMethods.OPTIONS, allowedMethods :_*) ::
-            allowOriginHeader ::
-            optionsCorsHeaders
-          )
-        }
-      }
-    }.withHttpResponseHeadersMapped { headers =>
-      allowOriginHeader :: headers
-    }
+  // this handles preflight OPTIONS requests.
+  private def preflightRequestHandler: Route = options {
+    complete(HttpResponse(StatusCodes.OK)
+      .withHeaders(`Access-Control-Allow-Methods`(OPTIONS, POST, PUT, GET, DELETE)))
   }
 
-  override def timeoutRoute: StandardRoute = complete {
-    HttpResponse(
-      StatusCodes.InternalServerError,
-      HttpEntity(ContentTypes.`text/plain(UTF-8)`,
-          "The server was not able to produce a timely response to your request."),
-      List(allowOriginHeader)
-    )
+  def corsHandler(r: Route): Route = addAccessControlHeaders {
+    preflightRequestHandler ~ r
   }
 }
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala
index 7d651b1..ddbf715 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala
@@ -18,22 +18,23 @@
 
 package org.apache.predictionio.tools.dashboard
 
-import com.typesafe.config.ConfigFactory
 import org.apache.predictionio.authentication.KeyAuthentication
-import org.apache.predictionio.configuration.SSLConfiguration
 import org.apache.predictionio.data.storage.Storage
-import spray.can.server.ServerSettings
-import scala.concurrent.ExecutionContext
-import akka.actor.{ActorContext, Actor, ActorSystem, Props}
-import akka.io.IO
-import akka.pattern.ask
-import akka.util.Timeout
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import akka.actor.ActorSystem
+import akka.http.scaladsl.server.directives.FutureDirectives.onSuccess
 import com.github.nscala_time.time.Imports.DateTime
 import grizzled.slf4j.Logging
-import spray.can.Http
-import spray.http._
-import spray.http.MediaTypes._
-import spray.routing._
+import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.directives._
+import akka.http.scaladsl.server._
+import akka.stream.ActorMaterializer
+import akka.http.scaladsl.model.ContentTypes._
+import com.typesafe.config.ConfigFactory
+import org.apache.predictionio.configuration.SSLConfiguration
 
 import scala.concurrent.duration._
 
@@ -41,7 +42,8 @@ case class DashboardConfig(
   ip: String = "localhost",
   port: Int = 9000)
 
-object Dashboard extends Logging with SSLConfiguration {
+object Dashboard extends Logging {
+
   def main(args: Array[String]): Unit = {
     val parser = new scopt.OptionParser[DashboardConfig]("Dashboard") {
       opt[String]("ip") action { (x, c) =>
@@ -53,108 +55,108 @@ object Dashboard extends Logging with SSLConfiguration {
     }
 
     parser.parse(args, DashboardConfig()) map { dc =>
-      createDashboard(dc).awaitTermination
+      val f = DashboardServer.createDashboard(dc).whenTerminated
+      Await.result(f, Duration.Inf)
     }
   }
 
+}
+
+object DashboardServer extends KeyAuthentication with CorsSupport with SSLConfiguration {
+
   def createDashboard(dc: DashboardConfig): ActorSystem = {
     val systemName = "pio-dashboard"
     implicit val system = ActorSystem(systemName)
-    val service =
-      system.actorOf(Props(classOf[DashboardActor], dc), "dashboard")
-    implicit val timeout = Timeout(5.seconds)
-    val settings = ServerSettings(system)
+    implicit val materializer = ActorMaterializer()
+    implicit val executionContext = system.dispatcher
     val serverConfig = ConfigFactory.load("server.conf")
     val sslEnforced = serverConfig.getBoolean("org.apache.predictionio.server.ssl-enforced")
-    IO(Http) ? Http.Bind(
-      service,
-      interface = dc.ip,
-      port = dc.port,
-      settings = Some(settings.copy(sslEncryption = sslEnforced)))
+    val route = createRoute(DateTime.now, dc)
+    if(sslEnforced){
+      val https: HttpsConnectionContext = ConnectionContext.https(sslContext)
+      Http().setDefaultServerHttpContext(https)
+      Http().bindAndHandle(route, dc.ip, dc.port, connectionContext = https)
+    } else {
+      Http().bindAndHandle(route, dc.ip, dc.port)
+    }
     system
   }
-}
 
-class DashboardActor(
-    val dc: DashboardConfig)
-  extends Actor with DashboardService {
-  def actorRefFactory: ActorContext = context
-  def receive: Actor.Receive = runRoute(dashboardRoute)
-}
+  def createRoute(serverStartTime: DateTime, dc: DashboardConfig)
+                 (implicit executionContext: ExecutionContext): Route = {
+    val evaluationInstances = Storage.getMetaDataEvaluationInstances
+    val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_"))
 
-trait DashboardService extends HttpService with KeyAuthentication with CORSSupport {
-
-  implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
-  val dc: DashboardConfig
-  val evaluationInstances = Storage.getMetaDataEvaluationInstances
-  val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_"))
-  val serverStartTime = DateTime.now
-  val dashboardRoute =
-    path("") {
-      authenticate(withAccessKeyFromFile) { request =>
-        get {
-          respondWithMediaType(`text/html`) {
-            complete {
-              val completedInstances = evaluationInstances.getCompleted
-              html.index(
-                dc,
-                serverStartTime,
-                pioEnvVars,
-                completedInstances).toString
-            }
-          }
+    def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]):
+        AuthenticationDirective[T] = {
+      extractRequestContext.flatMap { requestContext =>
+        onSuccess(authenticator(requestContext)).flatMap {
+          case Right(x) => provide(x)
+          case Left(x)  => reject(x): Directive1[T]
         }
       }
-    } ~
-    pathPrefix("engine_instances" / Segment) { instanceId =>
-      path("evaluator_results.txt") {
-        get {
-          respondWithMediaType(`text/plain`) {
+    }
+
+    val route: Route =
+      path("") {
+        authenticate(withAccessKeyFromFile) { request =>
+          get {
+            val completedInstances = evaluationInstances.getCompleted
+            complete(HttpResponse(entity = HttpEntity(
+                `text/html(UTF-8)`,
+                 html.index(dc, serverStartTime, pioEnvVars, completedInstances).toString
+            )))
+          }
+        }
+      } ~
+      pathPrefix("engine_instances" / Segment) { instanceId =>
+        path("evaluator_results.txt") {
+          get {
             evaluationInstances.get(instanceId).map { i =>
               complete(i.evaluatorResults)
             } getOrElse {
               complete(StatusCodes.NotFound)
             }
           }
-        }
-      } ~
-      path("evaluator_results.html") {
-        get {
-          respondWithMediaType(`text/html`) {
+        } ~
+        path("evaluator_results.html") {
+          get {
             evaluationInstances.get(instanceId).map { i =>
-              complete(i.evaluatorResultsHTML)
+              complete(HttpResponse(
+                entity = HttpEntity(`text/html(UTF-8)`, i.evaluatorResultsHTML)))
             } getOrElse {
               complete(StatusCodes.NotFound)
             }
           }
-        }
-      } ~
-      path("evaluator_results.json") {
-        get {
-          respondWithMediaType(`application/json`) {
+        } ~
+        path("evaluator_results.json") {
+          get {
             evaluationInstances.get(instanceId).map { i =>
-              complete(i.evaluatorResultsJSON)
+              complete(HttpResponse(
+                entity = HttpEntity(`application/json`, i.evaluatorResultsJSON)))
             } getOrElse {
               complete(StatusCodes.NotFound)
             }
           }
-        }
-      } ~
-      cors {
-        path("local_evaluator_results.json") {
-          get {
-            respondWithMediaType(`application/json`) {
+        } ~
+        corsHandler {
+          path("local_evaluator_results.json") {
+            get {
               evaluationInstances.get(instanceId).map { i =>
-                complete(i.evaluatorResultsJSON)
+                complete(HttpResponse(
+                  entity = HttpEntity(`application/json`, i.evaluatorResultsJSON)))
               } getOrElse {
                 complete(StatusCodes.NotFound)
               }
             }
           }
+        } ~
+        pathPrefix("assets") {
+          getFromResourceDirectory("assets")
         }
       }
-    } ~
-    pathPrefix("assets") {
-      getFromResourceDirectory("assets")
-    }
+
+    route
+  }
+
 }
diff --git a/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala b/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala
index e6c8bd3..e554ebf 100644
--- a/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala
+++ b/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala
@@ -17,67 +17,19 @@
 
 package org.apache.predictionio.tools.admin
 
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.apache.predictionio.data.storage.Storage
 import org.specs2.mutable.Specification
-import spray.http._
-import spray.httpx.RequestBuilding._
-import spray.util._
+import akka.http.scaladsl.testkit.Specs2RouteTest
 
-
-class AdminAPISpec extends Specification{
-
-  val system = ActorSystem(Utils.actorSystemNameFrom(getClass))
-  val config = AdminServerConfig(
-    ip = "localhost",
-    port = 7071)
-
-  val commandClient = new CommandClient(
-    appClient = Storage.getMetaDataApps,
-    accessKeyClient = Storage.getMetaDataAccessKeys,
-    eventClient = Storage.getLEvents()
-  )
-
-  val adminActor= system.actorOf(Props(classOf[AdminServiceActor], commandClient))
+class AdminAPISpec extends Specification with Specs2RouteTest {
+  val route = AdminServer.createRoute()
 
   "GET / request" should {
     "properly produce OK HttpResponses" in {
-      val probe = TestProbe()(system)
-      probe.send(adminActor, Get("/"))
-
-      probe.expectMsg(
-        HttpResponse(
-          200,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"status":"alive"}"""
-          )
-        )
-      )
-      success
-    }
-  }
-
-  "GET /cmd/app request" should {
-    "properly produce OK HttpResponses" in {
-      /*
-      val probe = TestProbe()(system)
-      probe.send(adminActor,Get("/cmd/app"))
-
-      //TODO: Need to convert the response string to the corresponding case object to assert some properties on the object
-      probe.expectMsg(
-        HttpResponse(
-          200,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"status":1}"""
-          )
-        )
-      )*/
-      pending
+      Get() ~> route ~> check {
+        response.status.intValue() shouldEqual 200
+        responseAs[String] shouldEqual """{"status":"alive"}"""
+      }
     }
   }
 
-  step(system.shutdown())
 }