You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2021/02/22 23:46:51 UTC

[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #5070: [New Scheduler] Add ActivationService

bdoyle0182 commented on a change in pull request #5070:
URL: https://github.com/apache/openwhisk/pull/5070#discussion_r580666115



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
##########
@@ -0,0 +1,118 @@
+package org.apache.openwhisk.core.scheduler.grpc
+
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
+import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.scheduler.queue._
+import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
+import spray.json._
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.Try
+
+class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService {
+  implicit val requestTimeout: Timeout = Timeout(50.seconds)

Review comment:
       should this not be hardcoded?

##########
File path: core/scheduler/src/main/java/Empty.java
##########
@@ -0,0 +1,5 @@
+public class Empty {
+    // Workaround for this issue https://github.com/akka/akka-grpc/issues/289
+    // Gradle complains about no java sources.
+    // Note. Openwhisk is using a lower gradle version, so the latest akka-grpc version cannot be used.

Review comment:
       Can we fix this? The issue has been fixed

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
##########
@@ -0,0 +1,118 @@
+package org.apache.openwhisk.core.scheduler.grpc
+
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
+import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.scheduler.queue._
+import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
+import spray.json._
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.Try
+
+class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService {
+  implicit val requestTimeout: Timeout = Timeout(50.seconds)
+  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+
+  override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = {
+    logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}")
+    Future(for {
+      fqn <- FullyQualifiedEntityName.parse(request.fqn)
+      rev <- DocRevision.parse(request.rev)
+      msg <- ActivationMessage.parse(request.activationMessage)
+    } yield (fqn, rev, msg)).flatMap(Future.fromTry) flatMap { res =>
+      {
+        val key = res._1.toDocId.asDocInfo(res._2)
+        QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
+          case Some(queueValue) =>
+            // enqueue activation message to reschedule
+            logging.info(

Review comment:
       Should this be `debug`

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
##########
@@ -0,0 +1,102 @@
+package org.apache.openwhisk.core.scheduler.queue
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity._
+import spray.json.{DefaultJsonProtocol, _}
+import scala.collection.concurrent.TrieMap
+import scala.util.Try
+
+object QueueSize
+case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
+case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
+
+sealed trait MemoryQueueError extends Product {
+  val causedBy: String
+}
+
+object MemoryQueueErrorSerdes {
+
+  private implicit val noMessageSerdes = NoActivationMessage.serdes
+  private implicit val noQueueSerdes = NoMemoryQueue.serdes
+  private implicit val mismatchSerdes = ActionMismatch.serdes
+
+  // format that discriminates based on an additional
+  // field "type" that can either be "Cat" or "Dog"
+  implicit val memoryQueueErrorFormat = new RootJsonFormat[MemoryQueueError] {
+    def write(obj: MemoryQueueError): JsValue =
+      JsObject((obj match {
+        case msg: NoActivationMessage => msg.toJson
+        case msg: NoMemoryQueue       => msg.toJson
+        case msg: ActionMismatch      => msg.toJson
+      }).asJsObject.fields + ("type" -> JsString(obj.productPrefix)))
+
+    def read(json: JsValue): MemoryQueueError =
+      json.asJsObject.getFields("type") match {
+        case Seq(JsString("NoActivationMessage")) => json.convertTo[NoActivationMessage]
+        case Seq(JsString("NoMemoryQueue"))       => json.convertTo[NoMemoryQueue]
+        case Seq(JsString("ActionMismatch"))      => json.convertTo[ActionMismatch]
+      }
+  }
+}
+
+case class NoActivationMessage(noActivationMessage: String = NoActivationMessage.asString)
+    extends MemoryQueueError
+    with Message {
+  override val causedBy: String = noActivationMessage
+  override def serialize = NoActivationMessage.serdes.write(this).compactPrint
+}
+
+object NoActivationMessage extends DefaultJsonProtocol {
+  val asString: String = "no activation message exist"
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat(NoActivationMessage.apply _, "noActivationMessage")
+}
+
+case class NoMemoryQueue(noMemoryQueue: String = NoMemoryQueue.asString) extends MemoryQueueError with Message {
+  override val causedBy: String = noMemoryQueue
+  override def serialize = NoMemoryQueue.serdes.write(this).compactPrint
+}
+
+object NoMemoryQueue extends DefaultJsonProtocol {
+  val asString: String = "no memory queue exist"
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat(NoMemoryQueue.apply _, "noMemoryQueue")
+}
+
+case class ActionMismatch(actionMisMatch: String = ActionMismatch.asString) extends MemoryQueueError with Message {
+  override val causedBy: String = actionMisMatch
+  override def serialize = ActionMismatch.serdes.write(this).compactPrint
+}
+
+object ActionMismatch extends DefaultJsonProtocol {
+  val asString: String = "action version does not match"
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat(ActionMismatch.apply _, "actionMisMatch")
+}
+
+object QueuePool {

Review comment:
       Is this safe as a global static object being accessed from futures?

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
##########
@@ -0,0 +1,118 @@
+package org.apache.openwhisk.core.scheduler.grpc
+
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
+import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.scheduler.queue._
+import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
+import spray.json._
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.Try
+
+class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService {
+  implicit val requestTimeout: Timeout = Timeout(50.seconds)
+  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+
+  override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = {
+    logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}")

Review comment:
       I think this should be `debug` statement

##########
File path: core/scheduler/src/main/protobuf/activation.proto
##########
@@ -0,0 +1,52 @@
+syntax = "proto3";
+import "google/protobuf/wrappers.proto";
+
+//#options
+option java_multiple_files = true;
+option java_package = "org.apache.openwhisk.grpc";
+option java_outer_classname = "ActivationProto";
+
+package activation;
+//#options
+
+//#services
+service ActivationService {
+

Review comment:
       nit: remove extra lines

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
##########
@@ -0,0 +1,118 @@
+package org.apache.openwhisk.core.scheduler.grpc
+
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
+import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.scheduler.queue._
+import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
+import spray.json._
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.Try
+
+class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService {
+  implicit val requestTimeout: Timeout = Timeout(50.seconds)

Review comment:
       should this not be hardcoded?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org