You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2021/04/13 10:34:20 UTC

[openwhisk] branch master updated: [New Scheduler] Add ActivationService (#5070)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new cd6fded  [New Scheduler] Add ActivationService (#5070)
cd6fded is described below

commit cd6fded8a6836756cbfbe4159064c85683b64cd7
Author: Seonghyun Oh <oh...@navercorp.com>
AuthorDate: Tue Apr 13 19:32:48 2021 +0900

    [New Scheduler] Add ActivationService (#5070)
    
    * Add ActivationService for scheduler
    
    * Add annotation
    
    * Add license header
    
    * Reformat activation.proto
    
    * Reduce request timeout
    
    * Add license header
    
    * Scan code before compiling the code
---
 .../org/apache/openwhisk/core/entity/DocInfo.scala |   4 +
 .../core/entity/FullyQualifiedEntityName.scala     |   3 +
 core/scheduler/build.gradle                        |  29 ++++
 core/scheduler/src/main/java/Empty.java            |  22 +++
 core/scheduler/src/main/protobuf/activation.proto  |  66 ++++++++
 .../scheduler/grpc/ActivationServiceImpl.scala     | 135 ++++++++++++++++
 .../core/scheduler/queue/QueueManager.scala        | 119 ++++++++++++++
 .../grpc/test/ActivationServiceImplTests.scala     | 173 +++++++++++++++++++++
 .../core/scheduler/grpc/test/CommonVariable.scala  |  40 +++++
 tools/travis/runUnitTests.sh                       |   2 -
 tools/travis/setup.sh                              |   3 +
 11 files changed, 594 insertions(+), 2 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
index f8849d1..77e2008 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
@@ -25,6 +25,7 @@ import spray.json.JsString
 import spray.json.JsValue
 import spray.json.RootJsonFormat
 import spray.json.deserializationError
+import spray.json._
 
 import org.apache.openwhisk.core.entity.ArgNormalizer.trim
 
@@ -59,6 +60,7 @@ protected[core] class DocRevision private (val rev: String) extends AnyVal {
   def asString = rev // to make explicit that this is a string conversion
   def empty = rev == null
   override def toString = rev
+  def serialize = DocRevision.serdes.write(this).compactPrint
 }
 
 /**
@@ -131,6 +133,8 @@ protected[core] object DocRevision {
 
   protected[core] val empty: DocRevision = new DocRevision(null)
 
+  protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
   implicit val serdes = new RootJsonFormat[DocRevision] {
     def write(d: DocRevision) = if (d.rev != null) JsString(d.rev) else JsNull
 
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala
index 44bb971..f766967 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala
@@ -56,6 +56,7 @@ protected[core] case class FullyQualifiedEntityName(path: EntityPath,
   def namespace: EntityName = path.root
   def qualifiedNameWithLeadingSlash: String = EntityPath.PATHSEP + qualifiedName
   def asString = path.addPath(name) + version.map("@" + _.toString).getOrElse("")
+  def serialize = FullyQualifiedEntityName.serdes.write(this).compactPrint
 
   override def size = qualifiedName.sizeInBytes
   override def toString = asString
@@ -101,6 +102,8 @@ protected[core] object FullyQualifiedEntityName extends DefaultJsonProtocol {
       }
   }
 
+  protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
   /**
    * Converts the name to a fully qualified name.
    * There are 3 cases:
diff --git a/core/scheduler/build.gradle b/core/scheduler/build.gradle
index 530c962..65d088a 100644
--- a/core/scheduler/build.gradle
+++ b/core/scheduler/build.gradle
@@ -20,6 +20,7 @@ apply plugin: 'application'
 apply plugin: 'eclipse'
 apply plugin: 'maven'
 apply plugin: 'org.scoverage'
+apply plugin: 'com.lightbend.akka.grpc.gradle'
 
 ext.dockerImageName = 'scheduler'
 apply from: '../../gradle/docker.gradle'
@@ -33,6 +34,20 @@ ext.coverageDirs = [
 ]
 distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses'
 
+buildscript {
+    repositories {
+        mavenLocal()
+        maven {
+            url "https://plugins.gradle.org/m2/"
+        }
+    }
+    dependencies {
+        // see https://plugins.gradle.org/plugin/com.lightbend.akka.grpc.gradle
+        // for the currently latest version.
+        classpath 'gradle.plugin.com.lightbend.akka.grpc:akka-grpc-gradle-plugin:0.7.2'
+    }
+}
+
 // Define a separate configuration for managing the dependency on Jetty ALPN agent.
 configurations {
     alpnagent
@@ -51,7 +66,21 @@ dependencies {
 
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
+}
 
+// workaround for akka-grpc
+// https://github.com/akka/akka-grpc/issues/786
+printProtocLogs.doFirst {
+    mkdir "$buildDir"
+    file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
+    mkdir "$project.rootDir/build"
+    file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
+}
+printProtocLogs.configure {
+    mkdir "$buildDir"
+    file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
+    mkdir "$project.rootDir/build"
+    file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
 }
 
 mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler"
diff --git a/core/scheduler/src/main/java/Empty.java b/core/scheduler/src/main/java/Empty.java
new file mode 100644
index 0000000..b982d8f
--- /dev/null
+++ b/core/scheduler/src/main/java/Empty.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class Empty {
+    // Workaround for this issue https://github.com/akka/akka-grpc/issues/289
+    // Gradle complains about no java sources.
+    // Note. Openwhisk is using a lower gradle version, so the latest akka-grpc version cannot be used.
+}
diff --git a/core/scheduler/src/main/protobuf/activation.proto b/core/scheduler/src/main/protobuf/activation.proto
new file mode 100644
index 0000000..fb16f48
--- /dev/null
+++ b/core/scheduler/src/main/protobuf/activation.proto
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+import "google/protobuf/wrappers.proto";
+
+//#options
+option java_multiple_files = true;
+option java_package = "org.apache.openwhisk.grpc";
+option java_outer_classname = "ActivationProto";
+
+package activation;
+//#options
+
+//#services
+service ActivationService {
+    rpc FetchActivation (FetchRequest) returns (FetchResponse) {}
+    rpc RescheduleActivation (RescheduleRequest) returns (RescheduleResponse) {}
+}
+//#services
+
+//#messages
+// The request message
+message FetchRequest {
+    string invocationNamespace = 1;
+    string fqn = 2;
+    string rev = 3;
+    string containerId = 4;
+    bool warmed = 5;
+    // This allows optional value
+    google.protobuf.Int64Value lastDuration = 6;
+    // to record alive containers
+    bool alive = 7;
+}
+
+// The response message
+message FetchResponse {
+    string activationMessage = 1;
+}
+
+message RescheduleRequest {
+    string invocationNamespace = 1;
+    string fqn = 2;
+    string rev = 3;
+    string activationMessage = 4;
+}
+
+message RescheduleResponse {
+    // if reschedule request is failed, then it will be `false`
+    bool isRescheduled = 1;
+}
+
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
new file mode 100644
index 0000000..d80cd42
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.grpc
+
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
+import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.scheduler.queue._
+import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
+import spray.json._
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.Try
+
+class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService {
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+
+  override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = {
+    logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}")
+    Future(for {
+      fqn <- FullyQualifiedEntityName.parse(request.fqn)
+      rev <- DocRevision.parse(request.rev)
+      msg <- ActivationMessage.parse(request.activationMessage)
+    } yield (fqn, rev, msg)).flatMap(Future.fromTry) flatMap { res =>
+      {
+        val key = res._1.toDocId.asDocInfo(res._2)
+        QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
+          case Some(queueValue) =>
+            // enqueue activation message to reschedule
+            logging.info(
+              this,
+              s"Enqueue activation message to reschedule ${request.invocationNamespace} ${request.fqn} ${request.rev}")
+            queueValue.queue ? res._3
+            Future.successful(RescheduleResponse(isRescheduled = true))
+          case None =>
+            logging.error(this, s"Queue not found for ${request.invocationNamespace} ${request.fqn} ${request.rev}")
+            Future.successful(RescheduleResponse())
+        }
+      }
+    }
+  }
+
+  override def fetchActivation(request: FetchRequest): Future[FetchResponse] = {
+    Future(for {
+      fqn <- FullyQualifiedEntityName.parse(request.fqn)
+      rev <- DocRevision.parse(request.rev)
+    } yield (fqn, rev)).flatMap(Future.fromTry) flatMap { res =>
+      val key = res._1.toDocId.asDocInfo(res._2)
+      QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
+        case Some(queueValue) =>
+          (queueValue.queue ? GetActivation(
+            res._1,
+            request.containerId,
+            request.warmed,
+            request.lastDuration,
+            request.alive))
+            .mapTo[ActivationResponse]
+            .map { response =>
+              FetchResponse(response.serialize)
+            }
+            .recover {
+              case t: Throwable =>
+                logging.error(this, s"Failed to get message from QueueManager, error: ${t.getMessage}")
+                FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
+            }
+        case None =>
+          if (QueuePool.keys.exists { mkey =>
+                mkey.invocationNamespace == request.invocationNamespace && mkey.docInfo.id == key.id
+              })
+            Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize))
+          else
+            Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize))
+      }
+    }
+  }
+}
+
+object ActivationServiceImpl {
+
+  def apply()(implicit actorSystem: ActorSystem, logging: Logging) =
+    new ActivationServiceImpl()
+}
+
+case class GetActivation(action: FullyQualifiedEntityName,
+                         containerId: String,
+                         warmed: Boolean,
+                         lastDuration: Option[Long],
+                         alive: Boolean = true)
+case class ActivationResponse(message: Either[MemoryQueueError, ActivationMessage]) extends Message {
+  override def serialize = ActivationResponse.serdes.write(this).compactPrint
+}
+
+object ActivationResponse extends DefaultJsonProtocol {
+
+  private implicit val noMessageSerdes = NoActivationMessage.serdes
+  private implicit val noQueueSerdes = NoMemoryQueue.serdes
+  private implicit val mismatchSerdes = ActionMismatch.serdes
+  private implicit val messageSerdes = ActivationMessage.serdes
+  private implicit val memoryqueueuErrorSerdes = MemoryQueueErrorSerdes.memoryQueueErrorFormat
+
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
+  implicit def rootEitherFormat[A: RootJsonFormat, B: RootJsonFormat] =
+    new RootJsonFormat[Either[A, B]] {
+      val format = DefaultJsonProtocol.eitherFormat[A, B]
+
+      def write(either: Either[A, B]) = format.write(either)
+
+      def read(value: JsValue) = format.read(value)
+    }
+
+  type ActivationResponse = Either[MemoryQueueError, ActivationMessage]
+  implicit val serdes = jsonFormat(ActivationResponse.apply _, "message")
+
+}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
new file mode 100644
index 0000000..1b6b818
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity._
+import spray.json.{DefaultJsonProtocol, _}
+import scala.collection.concurrent.TrieMap
+import scala.util.Try
+
+object QueueSize
+case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
+case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
+
+sealed trait MemoryQueueError extends Product {
+  val causedBy: String
+}
+
+object MemoryQueueErrorSerdes {
+
+  private implicit val noMessageSerdes = NoActivationMessage.serdes
+  private implicit val noQueueSerdes = NoMemoryQueue.serdes
+  private implicit val mismatchSerdes = ActionMismatch.serdes
+
+  // format that discriminates based on an additional
+  // field "type" that can either be "Cat" or "Dog"
+  implicit val memoryQueueErrorFormat = new RootJsonFormat[MemoryQueueError] {
+    def write(obj: MemoryQueueError): JsValue =
+      JsObject((obj match {
+        case msg: NoActivationMessage => msg.toJson
+        case msg: NoMemoryQueue       => msg.toJson
+        case msg: ActionMismatch      => msg.toJson
+      }).asJsObject.fields + ("type" -> JsString(obj.productPrefix)))
+
+    def read(json: JsValue): MemoryQueueError =
+      json.asJsObject.getFields("type") match {
+        case Seq(JsString("NoActivationMessage")) => json.convertTo[NoActivationMessage]
+        case Seq(JsString("NoMemoryQueue"))       => json.convertTo[NoMemoryQueue]
+        case Seq(JsString("ActionMismatch"))      => json.convertTo[ActionMismatch]
+      }
+  }
+}
+
+case class NoActivationMessage(noActivationMessage: String = NoActivationMessage.asString)
+    extends MemoryQueueError
+    with Message {
+  override val causedBy: String = noActivationMessage
+  override def serialize = NoActivationMessage.serdes.write(this).compactPrint
+}
+
+object NoActivationMessage extends DefaultJsonProtocol {
+  val asString: String = "no activation message exist"
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat(NoActivationMessage.apply _, "noActivationMessage")
+}
+
+case class NoMemoryQueue(noMemoryQueue: String = NoMemoryQueue.asString) extends MemoryQueueError with Message {
+  override val causedBy: String = noMemoryQueue
+  override def serialize = NoMemoryQueue.serdes.write(this).compactPrint
+}
+
+object NoMemoryQueue extends DefaultJsonProtocol {
+  val asString: String = "no memory queue exist"
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat(NoMemoryQueue.apply _, "noMemoryQueue")
+}
+
+case class ActionMismatch(actionMisMatch: String = ActionMismatch.asString) extends MemoryQueueError with Message {
+  override val causedBy: String = actionMisMatch
+  override def serialize = ActionMismatch.serdes.write(this).compactPrint
+}
+
+object ActionMismatch extends DefaultJsonProtocol {
+  val asString: String = "action version does not match"
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat(ActionMismatch.apply _, "actionMisMatch")
+}
+
+object QueuePool {
+  private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]()
+
+  private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key)
+
+  private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value)
+
+  private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key)
+
+  private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader)
+
+  private[scheduler] def clear(): Unit = _queuePool.clear()
+
+  private[scheduler] def size = _queuePool.size
+
+  private[scheduler] def values = _queuePool.values
+
+  private[scheduler] def keys = _queuePool.keys
+}
+
+case class CreateQueue(invocationNamespace: String,
+                       fqn: FullyQualifiedEntityName,
+                       revision: DocRevision,
+                       whiskActionMetaData: WhiskActionMetaData)
+case class CreateQueueResponse(invocationNamespace: String, fqn: FullyQualifiedEntityName, success: Boolean)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
new file mode 100644
index 0000000..75c913e
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.grpc.test
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.ActorMaterializer
+import akka.testkit.{ImplicitSender, TestKit}
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
+import org.apache.openwhisk.core.scheduler.queue.{
+  ActionMismatch,
+  MemoryQueueKey,
+  MemoryQueueValue,
+  NoMemoryQueue,
+  QueuePool
+}
+import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse, GetActivation}
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class ActivationServiceImplTests
+    extends TestKit(ActorSystem("ActivationService"))
+    with CommonVariable
+    with ImplicitSender
+    with FlatSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with ScalaFutures
+    with StreamLogging {
+
+  override def afterAll = {
+    QueuePool.clear()
+    TestKit.shutdownActorSystem(system)
+  }
+  override def beforeEach = QueuePool.clear()
+
+  behavior of "ActivationService"
+
+  implicit val mat = ActorMaterializer()
+  implicit val ec = system.dispatcher
+
+  val messageTransId = TransactionId(TransactionId.testing.meta.id)
+  val uuid = UUID()
+
+  val testDoc = testFQN.toDocId.asDocInfo(testDocRevision)
+  val message = ActivationMessage(
+    messageTransId,
+    FullyQualifiedEntityName(testEntityPath, testEntityName),
+    DocRevision.empty,
+    Identity(
+      Subject(),
+      Namespace(EntityName(testNamespace), uuid),
+      BasicAuthenticationAuthKey(uuid, Secret()),
+      Set.empty),
+    ActivationId.generate(),
+    ControllerInstanceId("0"),
+    blocking = false,
+    content = None)
+
+  it should "send GetActivation message to the MemoryQueue actor" in {
+
+    val mock = system.actorOf(Props(new Actor() {
+      override def receive: Receive = {
+        case getActivation: GetActivation =>
+          testActor ! getActivation
+          sender() ! ActivationResponse(Right(message))
+      }
+    }))
+
+    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(mock, true))
+    val activationServiceImpl = ActivationServiceImpl()
+
+    activationServiceImpl
+      .fetchActivation(
+        FetchRequest(
+          message.user.namespace.name.asString,
+          testFQN.serialize,
+          testDocRevision.serialize,
+          testContainerId,
+          false,
+          alive = true))
+      .futureValue shouldBe FetchResponse(ActivationResponse(Right(message)).serialize)
+
+    expectMsg(GetActivation(testFQN, testContainerId, false, None))
+  }
+
+  it should "return NoMemoryQueue if there is no queue" in {
+    val activationServiceImpl = ActivationServiceImpl()
+
+    activationServiceImpl
+      .fetchActivation(
+        FetchRequest(
+          message.user.namespace.name.asString,
+          testFQN.serialize,
+          testDocRevision.serialize,
+          testContainerId,
+          false,
+          alive = true))
+      .futureValue shouldBe FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize)
+
+    expectNoMessage(200.millis)
+  }
+
+  it should "return ActionMismatchError if get request for an old action" in {
+
+    val activationServiceImpl = ActivationServiceImpl()
+
+    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true))
+
+    activationServiceImpl
+      .fetchActivation(
+        FetchRequest( // same doc id but with a different doc revision
+          message.user.namespace.name.asString,
+          testFQN.serialize,
+          DocRevision("new-one").serialize,
+          testContainerId,
+          false,
+          alive = true))
+      .futureValue shouldBe FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize)
+
+    expectNoMessage(200.millis)
+  }
+
+  it should "reschedule activation message to the queue" in {
+
+    val mock = system.actorOf(Props(new Actor() {
+      override def receive: Receive = {
+        case message: ActivationMessage =>
+          testActor ! message
+      }
+    }))
+    val activationServiceImpl = ActivationServiceImpl()
+
+    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(mock, true))
+
+    activationServiceImpl
+      .rescheduleActivation(
+        RescheduleRequest( // same doc id but with a different doc revision
+          message.user.namespace.name.asString,
+          testFQN.serialize,
+          testDocRevision.serialize,
+          message.serialize))
+      .futureValue shouldBe RescheduleResponse(isRescheduled = true)
+
+    expectMsg(message)
+  }
+
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala
new file mode 100644
index 0000000..b3efe97
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.grpc.test
+
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+
+trait CommonVariable {
+  val testInvocationNamespace = "test-invocation-namespace"
+  val testInvocationEntityPath = EntityPath(testInvocationNamespace)
+  val testNamespace = "test-namespace"
+  val testEntityPath = EntityPath(testNamespace)
+  val testAction = "test-fqn"
+  val testEntityName = EntityName(testAction)
+  val testDocRevision = DocRevision("1-test-revision")
+  val testContainerId = "fakeContainerId"
+  val semVer = SemVer(0, 1, 1)
+  val testVersion = Some(semVer)
+  val testFQN = FullyQualifiedEntityName(testEntityPath, testEntityName, testVersion)
+  val testExec = CodeExecAsString(RuntimeManifest("nodejs:10", ImageName("testImage")), "testCode", None)
+  val testExecMetadata =
+    CodeExecMetaDataAsString(testExec.manifest, entryPoint = testExec.entryPoint)
+  val testActionMetaData =
+    WhiskActionMetaData(testEntityPath, testEntityName, testExecMetadata, version = semVer)
+}
diff --git a/tools/travis/runUnitTests.sh b/tools/travis/runUnitTests.sh
index e2ef4ac..3f66cae 100755
--- a/tools/travis/runUnitTests.sh
+++ b/tools/travis/runUnitTests.sh
@@ -26,8 +26,6 @@ cd $ROOTDIR/tools/travis
 export TESTCONTAINERS_RYUK_DISABLED="true"
 export ORG_GRADLE_PROJECT_testSetName="REQUIRE_ONLY_DB"
 
-./scan.sh
-
 ./setupPrereq.sh
 
 cat "$ROOTDIR/tests/src/test/resources/application.conf"
diff --git a/tools/travis/setup.sh b/tools/travis/setup.sh
index 758d5a0..6d67941 100755
--- a/tools/travis/setup.sh
+++ b/tools/travis/setup.sh
@@ -42,6 +42,9 @@ python -m pip install --user pydocumentdb
 # Support the revises log upload script
 python -m pip install --user humanize requests
 
+# Scan code before compiling the code
+./scan.sh
+
 # Basic check that all code compiles and depdendencies are downloaded correctly.
 # Compiling the tests will compile all components as well.
 #