You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2021/04/13 10:34:20 UTC
[openwhisk] branch master updated: [New Scheduler] Add
ActivationService (#5070)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new cd6fded [New Scheduler] Add ActivationService (#5070)
cd6fded is described below
commit cd6fded8a6836756cbfbe4159064c85683b64cd7
Author: Seonghyun Oh <oh...@navercorp.com>
AuthorDate: Tue Apr 13 19:32:48 2021 +0900
[New Scheduler] Add ActivationService (#5070)
* Add ActivationService for scheduler
* Add annotation
* Add license header
* Reformat activation.proto
* Reduce request timeout
* Add license header
* Scan code before compiling the code
---
.../org/apache/openwhisk/core/entity/DocInfo.scala | 4 +
.../core/entity/FullyQualifiedEntityName.scala | 3 +
core/scheduler/build.gradle | 29 ++++
core/scheduler/src/main/java/Empty.java | 22 +++
core/scheduler/src/main/protobuf/activation.proto | 66 ++++++++
.../scheduler/grpc/ActivationServiceImpl.scala | 135 ++++++++++++++++
.../core/scheduler/queue/QueueManager.scala | 119 ++++++++++++++
.../grpc/test/ActivationServiceImplTests.scala | 173 +++++++++++++++++++++
.../core/scheduler/grpc/test/CommonVariable.scala | 40 +++++
tools/travis/runUnitTests.sh | 2 -
tools/travis/setup.sh | 3 +
11 files changed, 594 insertions(+), 2 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
index f8849d1..77e2008 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala
@@ -25,6 +25,7 @@ import spray.json.JsString
import spray.json.JsValue
import spray.json.RootJsonFormat
import spray.json.deserializationError
+import spray.json._
import org.apache.openwhisk.core.entity.ArgNormalizer.trim
@@ -59,6 +60,7 @@ protected[core] class DocRevision private (val rev: String) extends AnyVal {
def asString = rev // to make explicit that this is a string conversion
def empty = rev == null
override def toString = rev
+ def serialize = DocRevision.serdes.write(this).compactPrint
}
/**
@@ -131,6 +133,8 @@ protected[core] object DocRevision {
protected[core] val empty: DocRevision = new DocRevision(null)
+ protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
implicit val serdes = new RootJsonFormat[DocRevision] {
def write(d: DocRevision) = if (d.rev != null) JsString(d.rev) else JsNull
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala
index 44bb971..f766967 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala
@@ -56,6 +56,7 @@ protected[core] case class FullyQualifiedEntityName(path: EntityPath,
def namespace: EntityName = path.root
def qualifiedNameWithLeadingSlash: String = EntityPath.PATHSEP + qualifiedName
def asString = path.addPath(name) + version.map("@" + _.toString).getOrElse("")
+ def serialize = FullyQualifiedEntityName.serdes.write(this).compactPrint
override def size = qualifiedName.sizeInBytes
override def toString = asString
@@ -101,6 +102,8 @@ protected[core] object FullyQualifiedEntityName extends DefaultJsonProtocol {
}
}
+ protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
/**
* Converts the name to a fully qualified name.
* There are 3 cases:
diff --git a/core/scheduler/build.gradle b/core/scheduler/build.gradle
index 530c962..65d088a 100644
--- a/core/scheduler/build.gradle
+++ b/core/scheduler/build.gradle
@@ -20,6 +20,7 @@ apply plugin: 'application'
apply plugin: 'eclipse'
apply plugin: 'maven'
apply plugin: 'org.scoverage'
+apply plugin: 'com.lightbend.akka.grpc.gradle'
ext.dockerImageName = 'scheduler'
apply from: '../../gradle/docker.gradle'
@@ -33,6 +34,20 @@ ext.coverageDirs = [
]
distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses'
+buildscript {
+ repositories {
+ mavenLocal()
+ maven {
+ url "https://plugins.gradle.org/m2/"
+ }
+ }
+ dependencies {
+ // see https://plugins.gradle.org/plugin/com.lightbend.akka.grpc.gradle
+ // for the currently latest version.
+ classpath 'gradle.plugin.com.lightbend.akka.grpc:akka-grpc-gradle-plugin:0.7.2'
+ }
+}
+
// Define a separate configuration for managing the dependency on Jetty ALPN agent.
configurations {
alpnagent
@@ -51,7 +66,21 @@ dependencies {
compile "org.scala-lang:scala-library:${gradle.scala.version}"
compile project(':common:scala')
+}
+// workaround for akka-grpc
+// https://github.com/akka/akka-grpc/issues/786
+printProtocLogs.doFirst {
+ mkdir "$buildDir"
+ file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
+ mkdir "$project.rootDir/build"
+ file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
+}
+printProtocLogs.configure {
+ mkdir "$buildDir"
+ file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
+ mkdir "$project.rootDir/build"
+ file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
}
mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler"
diff --git a/core/scheduler/src/main/java/Empty.java b/core/scheduler/src/main/java/Empty.java
new file mode 100644
index 0000000..b982d8f
--- /dev/null
+++ b/core/scheduler/src/main/java/Empty.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class Empty {
+ // Workaround for this issue https://github.com/akka/akka-grpc/issues/289
+ // Gradle complains about no java sources.
+ // Note. Openwhisk is using a lower gradle version, so the latest akka-grpc version cannot be used.
+}
diff --git a/core/scheduler/src/main/protobuf/activation.proto b/core/scheduler/src/main/protobuf/activation.proto
new file mode 100644
index 0000000..fb16f48
--- /dev/null
+++ b/core/scheduler/src/main/protobuf/activation.proto
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+import "google/protobuf/wrappers.proto";
+
+//#options
+option java_multiple_files = true;
+option java_package = "org.apache.openwhisk.grpc";
+option java_outer_classname = "ActivationProto";
+
+package activation;
+//#options
+
+//#services
+service ActivationService {
+ rpc FetchActivation (FetchRequest) returns (FetchResponse) {}
+ rpc RescheduleActivation (RescheduleRequest) returns (RescheduleResponse) {}
+}
+//#services
+
+//#messages
+// The request message
+message FetchRequest {
+ string invocationNamespace = 1;
+ string fqn = 2;
+ string rev = 3;
+ string containerId = 4;
+ bool warmed = 5;
+ // This allows optional value
+ google.protobuf.Int64Value lastDuration = 6;
+ // to record alive containers
+ bool alive = 7;
+}
+
+// The response message
+message FetchResponse {
+ string activationMessage = 1;
+}
+
+message RescheduleRequest {
+ string invocationNamespace = 1;
+ string fqn = 2;
+ string rev = 3;
+ string activationMessage = 4;
+}
+
+message RescheduleResponse {
+ // if reschedule request is failed, then it will be `false`
+ bool isRescheduled = 1;
+}
+
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
new file mode 100644
index 0000000..d80cd42
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.grpc
+
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
+import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.scheduler.queue._
+import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
+import spray.json._
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.Try
+
+class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService {
+ implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+
+ override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = {
+ logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}")
+ Future(for {
+ fqn <- FullyQualifiedEntityName.parse(request.fqn)
+ rev <- DocRevision.parse(request.rev)
+ msg <- ActivationMessage.parse(request.activationMessage)
+ } yield (fqn, rev, msg)).flatMap(Future.fromTry) flatMap { res =>
+ {
+ val key = res._1.toDocId.asDocInfo(res._2)
+ QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
+ case Some(queueValue) =>
+ // enqueue activation message to reschedule
+ logging.info(
+ this,
+ s"Enqueue activation message to reschedule ${request.invocationNamespace} ${request.fqn} ${request.rev}")
+ queueValue.queue ? res._3
+ Future.successful(RescheduleResponse(isRescheduled = true))
+ case None =>
+ logging.error(this, s"Queue not found for ${request.invocationNamespace} ${request.fqn} ${request.rev}")
+ Future.successful(RescheduleResponse())
+ }
+ }
+ }
+ }
+
+ override def fetchActivation(request: FetchRequest): Future[FetchResponse] = {
+ Future(for {
+ fqn <- FullyQualifiedEntityName.parse(request.fqn)
+ rev <- DocRevision.parse(request.rev)
+ } yield (fqn, rev)).flatMap(Future.fromTry) flatMap { res =>
+ val key = res._1.toDocId.asDocInfo(res._2)
+ QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
+ case Some(queueValue) =>
+ (queueValue.queue ? GetActivation(
+ res._1,
+ request.containerId,
+ request.warmed,
+ request.lastDuration,
+ request.alive))
+ .mapTo[ActivationResponse]
+ .map { response =>
+ FetchResponse(response.serialize)
+ }
+ .recover {
+ case t: Throwable =>
+ logging.error(this, s"Failed to get message from QueueManager, error: ${t.getMessage}")
+ FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
+ }
+ case None =>
+ if (QueuePool.keys.exists { mkey =>
+ mkey.invocationNamespace == request.invocationNamespace && mkey.docInfo.id == key.id
+ })
+ Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize))
+ else
+ Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize))
+ }
+ }
+ }
+}
+
+object ActivationServiceImpl {
+
+ def apply()(implicit actorSystem: ActorSystem, logging: Logging) =
+ new ActivationServiceImpl()
+}
+
+case class GetActivation(action: FullyQualifiedEntityName,
+ containerId: String,
+ warmed: Boolean,
+ lastDuration: Option[Long],
+ alive: Boolean = true)
+case class ActivationResponse(message: Either[MemoryQueueError, ActivationMessage]) extends Message {
+ override def serialize = ActivationResponse.serdes.write(this).compactPrint
+}
+
+object ActivationResponse extends DefaultJsonProtocol {
+
+ private implicit val noMessageSerdes = NoActivationMessage.serdes
+ private implicit val noQueueSerdes = NoMemoryQueue.serdes
+ private implicit val mismatchSerdes = ActionMismatch.serdes
+ private implicit val messageSerdes = ActivationMessage.serdes
+ private implicit val memoryqueueuErrorSerdes = MemoryQueueErrorSerdes.memoryQueueErrorFormat
+
+ def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
+ implicit def rootEitherFormat[A: RootJsonFormat, B: RootJsonFormat] =
+ new RootJsonFormat[Either[A, B]] {
+ val format = DefaultJsonProtocol.eitherFormat[A, B]
+
+ def write(either: Either[A, B]) = format.write(either)
+
+ def read(value: JsValue) = format.read(value)
+ }
+
+ type ActivationResponse = Either[MemoryQueueError, ActivationMessage]
+ implicit val serdes = jsonFormat(ActivationResponse.apply _, "message")
+
+}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
new file mode 100644
index 0000000..1b6b818
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity._
+import spray.json.{DefaultJsonProtocol, _}
+import scala.collection.concurrent.TrieMap
+import scala.util.Try
+
+object QueueSize
+case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
+case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
+
+sealed trait MemoryQueueError extends Product {
+ val causedBy: String
+}
+
+object MemoryQueueErrorSerdes {
+
+ private implicit val noMessageSerdes = NoActivationMessage.serdes
+ private implicit val noQueueSerdes = NoMemoryQueue.serdes
+ private implicit val mismatchSerdes = ActionMismatch.serdes
+
+ // format that discriminates based on an additional
+ // field "type" that can either be "Cat" or "Dog"
+ implicit val memoryQueueErrorFormat = new RootJsonFormat[MemoryQueueError] {
+ def write(obj: MemoryQueueError): JsValue =
+ JsObject((obj match {
+ case msg: NoActivationMessage => msg.toJson
+ case msg: NoMemoryQueue => msg.toJson
+ case msg: ActionMismatch => msg.toJson
+ }).asJsObject.fields + ("type" -> JsString(obj.productPrefix)))
+
+ def read(json: JsValue): MemoryQueueError =
+ json.asJsObject.getFields("type") match {
+ case Seq(JsString("NoActivationMessage")) => json.convertTo[NoActivationMessage]
+ case Seq(JsString("NoMemoryQueue")) => json.convertTo[NoMemoryQueue]
+ case Seq(JsString("ActionMismatch")) => json.convertTo[ActionMismatch]
+ }
+ }
+}
+
+case class NoActivationMessage(noActivationMessage: String = NoActivationMessage.asString)
+ extends MemoryQueueError
+ with Message {
+ override val causedBy: String = noActivationMessage
+ override def serialize = NoActivationMessage.serdes.write(this).compactPrint
+}
+
+object NoActivationMessage extends DefaultJsonProtocol {
+ val asString: String = "no activation message exist"
+ def parse(msg: String) = Try(serdes.read(msg.parseJson))
+ implicit val serdes = jsonFormat(NoActivationMessage.apply _, "noActivationMessage")
+}
+
+case class NoMemoryQueue(noMemoryQueue: String = NoMemoryQueue.asString) extends MemoryQueueError with Message {
+ override val causedBy: String = noMemoryQueue
+ override def serialize = NoMemoryQueue.serdes.write(this).compactPrint
+}
+
+object NoMemoryQueue extends DefaultJsonProtocol {
+ val asString: String = "no memory queue exist"
+ def parse(msg: String) = Try(serdes.read(msg.parseJson))
+ implicit val serdes = jsonFormat(NoMemoryQueue.apply _, "noMemoryQueue")
+}
+
+case class ActionMismatch(actionMisMatch: String = ActionMismatch.asString) extends MemoryQueueError with Message {
+ override val causedBy: String = actionMisMatch
+ override def serialize = ActionMismatch.serdes.write(this).compactPrint
+}
+
+object ActionMismatch extends DefaultJsonProtocol {
+ val asString: String = "action version does not match"
+ def parse(msg: String) = Try(serdes.read(msg.parseJson))
+ implicit val serdes = jsonFormat(ActionMismatch.apply _, "actionMisMatch")
+}
+
+object QueuePool {
+ private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]()
+
+ private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key)
+
+ private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value)
+
+ private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key)
+
+ private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader)
+
+ private[scheduler] def clear(): Unit = _queuePool.clear()
+
+ private[scheduler] def size = _queuePool.size
+
+ private[scheduler] def values = _queuePool.values
+
+ private[scheduler] def keys = _queuePool.keys
+}
+
+case class CreateQueue(invocationNamespace: String,
+ fqn: FullyQualifiedEntityName,
+ revision: DocRevision,
+ whiskActionMetaData: WhiskActionMetaData)
+case class CreateQueueResponse(invocationNamespace: String, fqn: FullyQualifiedEntityName, success: Boolean)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
new file mode 100644
index 0000000..75c913e
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.grpc.test
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.ActorMaterializer
+import akka.testkit.{ImplicitSender, TestKit}
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
+import org.apache.openwhisk.core.scheduler.queue.{
+ ActionMismatch,
+ MemoryQueueKey,
+ MemoryQueueValue,
+ NoMemoryQueue,
+ QueuePool
+}
+import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse, GetActivation}
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class ActivationServiceImplTests
+ extends TestKit(ActorSystem("ActivationService"))
+ with CommonVariable
+ with ImplicitSender
+ with FlatSpecLike
+ with Matchers
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with ScalaFutures
+ with StreamLogging {
+
+ override def afterAll = {
+ QueuePool.clear()
+ TestKit.shutdownActorSystem(system)
+ }
+ override def beforeEach = QueuePool.clear()
+
+ behavior of "ActivationService"
+
+ implicit val mat = ActorMaterializer()
+ implicit val ec = system.dispatcher
+
+ val messageTransId = TransactionId(TransactionId.testing.meta.id)
+ val uuid = UUID()
+
+ val testDoc = testFQN.toDocId.asDocInfo(testDocRevision)
+ val message = ActivationMessage(
+ messageTransId,
+ FullyQualifiedEntityName(testEntityPath, testEntityName),
+ DocRevision.empty,
+ Identity(
+ Subject(),
+ Namespace(EntityName(testNamespace), uuid),
+ BasicAuthenticationAuthKey(uuid, Secret()),
+ Set.empty),
+ ActivationId.generate(),
+ ControllerInstanceId("0"),
+ blocking = false,
+ content = None)
+
+ it should "send GetActivation message to the MemoryQueue actor" in {
+
+ val mock = system.actorOf(Props(new Actor() {
+ override def receive: Receive = {
+ case getActivation: GetActivation =>
+ testActor ! getActivation
+ sender() ! ActivationResponse(Right(message))
+ }
+ }))
+
+ QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(mock, true))
+ val activationServiceImpl = ActivationServiceImpl()
+
+ activationServiceImpl
+ .fetchActivation(
+ FetchRequest(
+ message.user.namespace.name.asString,
+ testFQN.serialize,
+ testDocRevision.serialize,
+ testContainerId,
+ false,
+ alive = true))
+ .futureValue shouldBe FetchResponse(ActivationResponse(Right(message)).serialize)
+
+ expectMsg(GetActivation(testFQN, testContainerId, false, None))
+ }
+
+ it should "return NoMemoryQueue if there is no queue" in {
+ val activationServiceImpl = ActivationServiceImpl()
+
+ activationServiceImpl
+ .fetchActivation(
+ FetchRequest(
+ message.user.namespace.name.asString,
+ testFQN.serialize,
+ testDocRevision.serialize,
+ testContainerId,
+ false,
+ alive = true))
+ .futureValue shouldBe FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize)
+
+ expectNoMessage(200.millis)
+ }
+
+ it should "return ActionMismatchError if get request for an old action" in {
+
+ val activationServiceImpl = ActivationServiceImpl()
+
+ QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true))
+
+ activationServiceImpl
+ .fetchActivation(
+ FetchRequest( // same doc id but with a different doc revision
+ message.user.namespace.name.asString,
+ testFQN.serialize,
+ DocRevision("new-one").serialize,
+ testContainerId,
+ false,
+ alive = true))
+ .futureValue shouldBe FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize)
+
+ expectNoMessage(200.millis)
+ }
+
+ it should "reschedule activation message to the queue" in {
+
+ val mock = system.actorOf(Props(new Actor() {
+ override def receive: Receive = {
+ case message: ActivationMessage =>
+ testActor ! message
+ }
+ }))
+ val activationServiceImpl = ActivationServiceImpl()
+
+ QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(mock, true))
+
+ activationServiceImpl
+ .rescheduleActivation(
+ RescheduleRequest( // same doc id but with a different doc revision
+ message.user.namespace.name.asString,
+ testFQN.serialize,
+ testDocRevision.serialize,
+ message.serialize))
+ .futureValue shouldBe RescheduleResponse(isRescheduled = true)
+
+ expectMsg(message)
+ }
+
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala
new file mode 100644
index 0000000..b3efe97
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.grpc.test
+
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+
+trait CommonVariable {
+ val testInvocationNamespace = "test-invocation-namespace"
+ val testInvocationEntityPath = EntityPath(testInvocationNamespace)
+ val testNamespace = "test-namespace"
+ val testEntityPath = EntityPath(testNamespace)
+ val testAction = "test-fqn"
+ val testEntityName = EntityName(testAction)
+ val testDocRevision = DocRevision("1-test-revision")
+ val testContainerId = "fakeContainerId"
+ val semVer = SemVer(0, 1, 1)
+ val testVersion = Some(semVer)
+ val testFQN = FullyQualifiedEntityName(testEntityPath, testEntityName, testVersion)
+ val testExec = CodeExecAsString(RuntimeManifest("nodejs:10", ImageName("testImage")), "testCode", None)
+ val testExecMetadata =
+ CodeExecMetaDataAsString(testExec.manifest, entryPoint = testExec.entryPoint)
+ val testActionMetaData =
+ WhiskActionMetaData(testEntityPath, testEntityName, testExecMetadata, version = semVer)
+}
diff --git a/tools/travis/runUnitTests.sh b/tools/travis/runUnitTests.sh
index e2ef4ac..3f66cae 100755
--- a/tools/travis/runUnitTests.sh
+++ b/tools/travis/runUnitTests.sh
@@ -26,8 +26,6 @@ cd $ROOTDIR/tools/travis
export TESTCONTAINERS_RYUK_DISABLED="true"
export ORG_GRADLE_PROJECT_testSetName="REQUIRE_ONLY_DB"
-./scan.sh
-
./setupPrereq.sh
cat "$ROOTDIR/tests/src/test/resources/application.conf"
diff --git a/tools/travis/setup.sh b/tools/travis/setup.sh
index 758d5a0..6d67941 100755
--- a/tools/travis/setup.sh
+++ b/tools/travis/setup.sh
@@ -42,6 +42,9 @@ python -m pip install --user pydocumentdb
# Support the revises log upload script
python -m pip install --user humanize requests
+# Scan code before compiling the code
+./scan.sh
+
# Basic check that all code compiles and depdendencies are downloaded correctly.
# Compiling the tests will compile all components as well.
#