Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2020/09/24 23:15:59 UTC

[GitHub] [openwhisk] style95 opened a new pull request #4983: [New scheduler] Initial commit for the scheduler component

style95 opened a new pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983


   ## Description
   This is the first change to add the new scheduler component.
   My team members and I will open a series of follow-up PRs.
   We will add the "scheduler" label and the `[New scheduler]` prefix to the titles of those PRs.
   
   Please refer to the issue for more information and discussion history.
   You may find this useful as well: [new scheduler design](https://cwiki.apache.org/confluence/display/OPENWHISK/New+architecture+proposal)
   JFYI, this scheduler is running in production at Naver.
   
   There are many "TODO"s in this PR.
   As we agreed to merge PRs incrementally along with temporary commits, I left many parts of the original code as "TODO".
   I wanted to give reviewers some hints about how it will work.
   
   
   ## Related issue and scope
   - [x] I opened an issue to propose and discuss this change (#4922)
   
   ## My changes affect the following components
   - [ ] API
   - [ ] Controller
   - [ ] Message Bus (e.g., Kafka)
   - [ ] Loadbalancer
   - [ ] Invoker
   - [ ] Intrinsic actions (e.g., sequences, conductors)
   - [ ] Data stores (e.g., CouchDB)
   - [ ] Tests
   - [ ] Deployment
   - [ ] CLI
   - [ ] General tooling
   - [ ] Documentation
   
   ## Types of changes
   - [ ] Bug fix (generally a non-breaking change which closes an issue).
   - [x] Enhancement or new feature (adds new functionality).
   - [ ] Breaking change (a bug fix or enhancement which changes existing behavior).
   
   ## Checklist:
   <!--- Please review the points below which help you make sure you've covered all aspects of the change you're making. -->
   
   - [x] I signed an [Apache CLA](https://github.com/apache/openwhisk/blob/master/CONTRIBUTING.md).
   - [x] I reviewed the [style guides](https://github.com/apache/openwhisk/wiki/Contributing:-Git-guidelines#code-readiness) and followed the recommendations (Travis CI will check :).
   - [ ] I added tests to cover my changes.
   - [ ] My changes require further changes to the documentation.
   - [ ] I updated the documentation where necessary.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496444672



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the
+    require(args.length >= 1, "controller instance required")
+    val instanceId = SchedulerInstanceId(args(0))
+
+    initKamon(instanceId)
+
+    def abort(message: String) = {
+      logger.error(this, message)
+      actorSystem.terminate()
+      Await.result(actorSystem.whenTerminated, 30.seconds)
+      sys.exit(1)
+    }
+
+    if (!config.isValid) {
+      abort("Bad configuration, cannot start.")
+    }
+
+    val execManifest = ExecManifest.initialize(config)
+    if (execManifest.isFailure) {
+      logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
+      abort("Bad configuration, cannot start.")
+    }
+
+    val msgProvider = SpiLoader.get[MessagingProvider]
+
+    Seq(
+      ("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+      ("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))

Review comment:
       Correct.







[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496445861



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the
+    require(args.length >= 1, "controller instance required")
+    val instanceId = SchedulerInstanceId(args(0))
+
+    initKamon(instanceId)
+
+    def abort(message: String) = {

Review comment:
       nit: move abort higher up in main so that the `config.isValid` check can come right after loading the config
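
The reordering suggested above could look roughly like the sketch below. It is only an illustration under stated assumptions: `SketchConfig` is a hypothetical stand-in for `WhiskConfig`, and `abort` throws instead of terminating the actor system and calling `sys.exit(1)` so that the control flow can be exercised in isolation. One plausible motivation for the nit is that an invalid configuration is then rejected before any value is read from it.

```scala
// Hypothetical stand-in for WhiskConfig; only the two members used here are modelled.
final case class SketchConfig(isValid: Boolean, servicePort: String)

object StartupOrderSketch {
  // Defined first so that every later check can call it.
  // Assumption: the real abort also shuts down the actor system and exits the JVM;
  // this version throws so the ordering can be tested without killing the process.
  def abort(message: String): Nothing =
    throw new IllegalStateException(message)

  def start(config: SketchConfig): Int = {
    // Validate right after the configuration is loaded ...
    if (!config.isValid) abort("Bad configuration, cannot start.")
    // ... and only then read individual values from it.
    config.servicePort.toInt
  }
}
```

With this ordering, `StartupOrderSketch.start(SketchConfig(isValid = false, servicePort = null))` fails with the "Bad configuration" message rather than with a parsing error from `toInt`.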







[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r501442446



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -426,3 +426,16 @@ object EventMessage extends DefaultJsonProtocol {
 
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
+
+object StatusQuery
+case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)

Review comment:
       `waitingActivation` is the number of activations waiting in a queue.
   Generally, this value is small or zero, as most activation messages are handled in time.
   But if there is an issue (for example, containers are not provisioned in time, or the connection to another component is lost), many activations can be waiting for processing.
   
   Regarding the basic types, we just used strings because they are simple.
   If we need to change them to specific types with proper serde, how about adding those when we add a feature that uses this data class?
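
For reference, a minimal sketch of what a typed variant might look like if it is added later. Everything here is hypothetical and not part of this PR: the names `QueueStatus`, `TypedStatusData`, and `isBacklogged`, as well as the two example states, are assumptions chosen only for illustration, and JSON serde is left out so the snippet has no dependencies.

```scala
// Hypothetical typed replacement for the `status: String` field.
sealed trait QueueStatus { def name: String }

object QueueStatus {
  // Illustrative states only; the real scheduler may use different ones.
  case object Running extends QueueStatus { val name = "Running" }
  case object Idle extends QueueStatus { val name = "Idle" }

  private val all: Seq[QueueStatus] = Seq(Running, Idle)

  // Parse the string form back into a typed value; unknown strings yield None.
  def parse(s: String): Option[QueueStatus] = all.find(_.name == s)
}

// Same fields as StatusData in the diff, except that `status` is typed.
final case class TypedStatusData(invocationNamespace: String,
                                 fqn: String,
                                 waitingActivation: Int,
                                 status: QueueStatus,
                                 data: String) {
  // waitingActivation is normally small or zero, so a value above some
  // caller-chosen threshold can be treated as a sign of a backlog.
  def isBacklogged(threshold: Int): Boolean = waitingActivation > threshold
}
```

A caller could then write `TypedStatusData("ns", "ns/pkg/action", 0, QueueStatus.Running, "").isBacklogged(10)` and get a compile-time check on the status value instead of comparing strings.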







[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496445360



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the
+    require(args.length >= 1, "controller instance required")

Review comment:
       should this be `scheduler instance required`?







[GitHub] [openwhisk] codecov-io commented on pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#issuecomment-707116829


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=h1) Report
   > Merging [#4983](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=desc) into [master](https://codecov.io/gh/apache/openwhisk/commit/af161221244399e801b0217af554abd00e96c2b2?el=desc) will **decrease** coverage by `7.08%`.
   > The diff coverage is `46.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/4983/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #4983      +/-   ##
   ==========================================
   - Coverage   83.96%   76.87%   -7.09%     
   ==========================================
     Files         202      202              
     Lines        9795     9822      +27     
     Branches      423      416       -7     
   ==========================================
   - Hits         8224     7551     -673     
   - Misses       1571     2271     +700     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `78.90% <0.00%> (-0.46%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `80.70% <0.00%> (-9.50%)` | :arrow_down: |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `95.18% <100.00%> (+0.05%)` | :arrow_up: |
   | [.../scala/org/apache/openwhisk/core/WhiskConfig.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvV2hpc2tDb25maWcuc2NhbGE=) | `95.27% <100.00%> (+0.23%)` | :arrow_up: |
   | [...core/database/cosmosdb/RxObservableImplicits.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvUnhPYnNlcnZhYmxlSW1wbGljaXRzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/database/cosmosdb/cache/CacheInvalidator.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/database/cosmosdb/cache/ChangeFeedConsumer.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NoYW5nZUZlZWRDb25zdW1lci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...core/database/cosmosdb/CosmosDBArtifactStore.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJBcnRpZmFjdFN0b3JlLnNjYWxh) | `0.00% <0.00%> (-96.23%)` | :arrow_down: |
   | [...sk/core/database/cosmosdb/CosmosDBViewMapper.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJWaWV3TWFwcGVyLnNjYWxh) | `0.00% <0.00%> (-93.90%)` | :arrow_down: |
   | [...tabase/cosmosdb/cache/CacheInvalidatorConfig.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3JDb25maWcuc2NhbGE=) | `0.00% <0.00%> (-92.31%)` | :arrow_down: |
   | ... and [24 more](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=footer). Last update [af16122...21c5da6](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496448324



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("disable") & post) {

Review comment:
       I see the graceful shutdown is left as a TODO for now. In theory, could the scheduler hang forever if the queues don't drain, or will there be a hard kill after some timeout?
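
To make the timeout question concrete, here is a minimal sketch of a bounded drain, assuming the queue drain can be exposed as a `Future[Unit]`. `BoundedShutdown` and `awaitDrain` are hypothetical names for illustration and are not part of this PR. Akka's `CoordinatedShutdown` also applies a configurable timeout to each phase, which may be another way to get the same bound.

```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper, not part of the PR: bounds how long shutdown waits for a drain.
object BoundedShutdown {

  /** Waits for `drain` to complete, but never longer than `timeout`.
   *  Returns true if the drain completed in time (successfully or with a failure),
   *  false if the timeout elapsed and the caller should fall back to a hard stop.
   */
  def awaitDrain(drain: Future[Unit], timeout: FiniteDuration): Boolean =
    try {
      Await.ready(drain, timeout)
      true
    } catch {
      case _: TimeoutException => false
    }
}
```

With a helper like this, `shutdown()` could log a warning and continue terminating when `awaitDrain` returns false, instead of blocking indefinitely.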







[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r494661944



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD

Review comment:
       This is a factory to create memory queues.
   In the new architecture, each action is given its own dedicated queue.
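
As a rough illustration of "one dedicated queue per action", the sketch below keeps a registry that lazily creates a queue the first time an action is seen. `ActionKey`, `SimpleQueue`, and `QueueRegistry` are hypothetical stand-ins using only the Scala standard library. The real memory queue and its factory are not part of this PR and will likely look different.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Hypothetical stand-in for a fully qualified action name.
final case class ActionKey(namespace: String, name: String)

// Hypothetical in-memory queue of activation ids; not the real memory queue.
final class SimpleQueue {
  private val items = mutable.Queue.empty[String]

  def enqueue(activationId: String): Unit = synchronized { items.enqueue(activationId) }

  def dequeue(): Option[String] = synchronized {
    if (items.isEmpty) None else Some(items.dequeue())
  }

  def size: Int = synchronized(items.size)
}

// Creates a queue on first use and hands out the same queue for the same action afterwards.
final class QueueRegistry(factory: ActionKey => SimpleQueue) {
  private val queues = TrieMap.empty[ActionKey, SimpleQueue]

  def queueFor(action: ActionKey): SimpleQueue =
    queues.getOrElseUpdate(action, factory(action))

  def queueCount: Int = queues.size
}
```

Activations for different actions never share a queue in this sketch, so a backlog on one action does not delay another.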
   

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD

Review comment:
       This is one of the major components. It takes charge of managing queues and coordinating requests among the scheduler, controllers, and invokers.

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD

Review comment:
       This component is in charge of storing data in etcd.
   

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,

Review comment:
       The scheduler has two ports, one for akka-remote and the other for akka-grpc.
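
A small sketch of how the two ports might be carried together, assuming one host with a separate gRPC port and Akka remoting port. `EndpointsSketch`, its fields, and the address formats are assumptions for illustration. The actual `SchedulerEndpoints` class referenced in this PR may be shaped differently.

```scala
// Hypothetical shape; the real SchedulerEndpoints may differ.
final case class EndpointsSketch(host: String, rpcPort: Int, akkaPort: Int) {
  // Both listeners run on the same host, so they cannot share a port.
  require(rpcPort != akkaPort, "rpc and akka ports must differ")

  // Address a gRPC client would dial.
  def rpcAddress: String = s"$host:$rpcPort"

  // Address of the remote actor system; the protocol prefix is passed in
  // because it depends on the remoting transport in use.
  def akkaAddress(protocol: String, systemName: String): String =
    s"$protocol://$systemName@$host:$akkaPort"
}
```

Keeping both ports in one value makes it harder to pass the gRPC port where the remoting port is expected.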
   

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the
+    require(args.length >= 1, "controller instance required")
+    val instanceId = SchedulerInstanceId(args(0))
+
+    initKamon(instanceId)
+
+    def abort(message: String) = {
+      logger.error(this, message)
+      actorSystem.terminate()
+      Await.result(actorSystem.whenTerminated, 30.seconds)
+      sys.exit(1)
+    }
+
+    if (!config.isValid) {
+      abort("Bad configuration, cannot start.")
+    }
+
+    val execManifest = ExecManifest.initialize(config)
+    if (execManifest.isFailure) {
+      logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
+      abort("Bad configuration, cannot start.")
+    }
+
+    val msgProvider = SpiLoader.get[MessagingProvider]
+
+    Seq(
+      ("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+      ("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))

Review comment:
       The final goal is to remove Kafka from the critical path, but the scheduler still relies on Kafka for now.
   Currently, activation messages are sent to the scheduler via the `schedulerN` topic, and container creation messages are sent to the invoker via the `invokerN` topic.
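
As a rough illustration of the topic naming described here, the following sketch builds the per-instance topic names. The object and method names are hypothetical and not part of the PR; only the `schedulerN` / `invokerN` pattern comes from the comment.

```scala
object TopicNamingSketch {
  // Hypothetical helpers; only the `schedulerN` / `invokerN` naming pattern
  // is taken from the review comment.
  def schedulerTopic(instance: Int): String = s"scheduler$instance"
  def invokerTopic(instance: Int): String = s"invoker$instance"

  def main(args: Array[String]): Unit = {
    // Controller -> scheduler: activation messages.
    println(schedulerTopic(0))
    // Scheduler -> invoker: container creation messages.
    println(invokerTopic(2))
  }
}
```

Keeping the instance suffix in the topic name means each scheduler or invoker instance reads only the messages addressed to it.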
   

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD

Review comment:
       This component is responsible for creating containers for a given action.
   It relies on the `creationJobManager` to manage the container creation job.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r498978469



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("disable") & post) {

Review comment:
       Yes, I didn't realize the scheduler service would still be running on the disabled node. "Disabled" makes sense in that case.







[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r501440428



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -426,3 +426,16 @@ object EventMessage extends DefaultJsonProtocol {
 
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
+
+object StatusQuery
+case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)

Review comment:
       This is to query the queue status from the scheduler.
   
   The following is an example.
   When a `MemoryQueue(AkkaFSM)` is running, there can be different combinations of status and data.
   This is useful for diagnosing the state when an issue occurs in a queue or scheduler.
   
   ```
   [
   ...
     {
       "data": "RunningData",
       "fqn": "whisk.system/elasticsearch/status-alarm@0.0.2",
       "invocationNamespace": "style95",
       "status": "Running",
       "waitingActivation": 1
     }
   ...
   ]
   ```
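
To make the use of this data more concrete, here is a minimal sketch of how an operator-side tool might pick out queues with a large backlog. The `StatusData` case class mirrors the one in the diff; the `backlogged` helper, the threshold, and the second sample entry are assumptions for illustration only.

```scala
// Mirrors the case class added to Message.scala in this PR.
case class StatusData(invocationNamespace: String,
                      fqn: String,
                      waitingActivation: Int,
                      status: String,
                      data: String)

object QueueStatusSketch {
  // Hypothetical helper: keep only queues whose backlog exceeds `threshold`.
  def backlogged(all: Seq[StatusData], threshold: Int): Seq[StatusData] =
    all.filter(_.waitingActivation > threshold)

  val sample: Seq[StatusData] = Seq(
    // Taken from the example response above.
    StatusData("style95", "whisk.system/elasticsearch/status-alarm@0.0.2", 1, "Running", "RunningData"),
    // Made-up entry, only to show a queue with many waiting activations.
    StatusData("guest", "guest/hello@0.0.1", 250, "Running", "RunningData"))

  def main(args: Array[String]): Unit =
    backlogged(sample, threshold = 100).foreach(s => println(s"${s.fqn}: ${s.waitingActivation}"))
}
```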

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -426,3 +426,16 @@ object EventMessage extends DefaultJsonProtocol {
 
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
+
+object StatusQuery
+case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)

Review comment:
       `waitingActivation` is the number of activations waiting in a queue.
   Generally, this value is small or zero, since most activation messages are handled in time.
   But if something goes wrong (for example, containers are not provisioned in time, or the connection to other components is lost), many activations can be left waiting for processing.
   
   Regarding the basic types, we just used strings because they are simple.
   If we need to change them to specific types with proper serde, how about adding those when we add a feature that uses this data class?
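
For reference, a typed alternative to the plain `status` string could look roughly like the sketch below. This is not part of the PR: `QueueState` and its members are hypothetical names, and `Running` is the only state name taken from the example response. Unknown names are preserved rather than rejected, so no other state names need to be assumed.

```scala
// Hypothetical typed replacement for the `status: String` field.
sealed trait QueueState
object QueueState {
  case object Running extends QueueState
  // Any state name this sketch does not know about is kept as-is.
  final case class Other(name: String) extends QueueState

  def fromString(s: String): QueueState = s match {
    case "Running" => Running
    case other     => Other(other)
  }

  def asString(state: QueueState): String = state match {
    case Running     => "Running"
    case Other(name) => name
  }
}
```

A JSON format for such a type could be added at the same time as the feature that consumes it, as suggested above.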







[GitHub] [openwhisk] style95 commented on pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#issuecomment-706886842


   It seems there are some issues with the Travis job.
   





[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496440594



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the
+    require(args.length >= 1, "controller instance required")
+    val instanceId = SchedulerInstanceId(args(0))
+
+    initKamon(instanceId)
+
+    def abort(message: String) = {
+      logger.error(this, message)
+      actorSystem.terminate()
+      Await.result(actorSystem.whenTerminated, 30.seconds)
+      sys.exit(1)
+    }
+
+    if (!config.isValid) {
+      abort("Bad configuration, cannot start.")
+    }
+
+    val execManifest = ExecManifest.initialize(config)
+    if (execManifest.isFailure) {
+      logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
+      abort("Bad configuration, cannot start.")

Review comment:
       The reason should be specific to the runtime manifest being improperly formatted.
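
One possible shape for this suggestion is sketched below. The `abortReason` helper and the exact message wording are assumptions; the idea is only that the abort message carries the manifest failure instead of the generic "Bad configuration" text.

```scala
import scala.util.{Failure, Success, Try}

object ManifestAbortSketch {
  // Hypothetical helper: derive a manifest-specific abort reason from the
  // result of initializing the runtimes manifest.
  def abortReason[A](manifest: Try[A]): Option[String] = manifest match {
    case Failure(t) => Some(s"Invalid runtimes manifest: ${t.getMessage}, cannot start.")
    case Success(_) => None
  }
}
```

In `main`, the result could then be passed to `abort` directly, for example `ManifestAbortSketch.abortReason(execManifest).foreach(abort)`.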







[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496442604



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the
+    require(args.length >= 1, "controller instance required")
+    val instanceId = SchedulerInstanceId(args(0))
+
+    initKamon(instanceId)
+
+    def abort(message: String) = {
+      logger.error(this, message)
+      actorSystem.terminate()
+      Await.result(actorSystem.whenTerminated, 30.seconds)
+      sys.exit(1)
+    }
+
+    if (!config.isValid) {
+      abort("Bad configuration, cannot start.")
+    }
+
+    val execManifest = ExecManifest.initialize(config)
+    if (execManifest.isFailure) {
+      logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
+      abort("Bad configuration, cannot start.")
+    }
+
+    val msgProvider = SpiLoader.get[MessagingProvider]
+
+    Seq(
+      ("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+      ("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))

Review comment:
       The controller posts to `schedulerN` and the scheduler posts to `invokerN`, correct?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] bdoyle0182 commented on pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#issuecomment-700479672


   From what I understand so far, LGTM for an initial commit. I can always come back and add comments if I learn something later.





[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496494388



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("disable") & post) {

Review comment:
       I hope we can keep it, since the scheduler is still running and working as expected after it is disabled.
   (It just no longer receives any new messages.)
   I also believe we can easily add an `enable` feature to re-enable the scheduler without restarting the component itself.
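
   To illustrate the idea, here is a minimal sketch of a switch that can be turned off and on again without restarting anything. It is only an assumption about how such a feature could look: `AdmissionSwitch`, `offer`, and the in-memory queue are hypothetical names and are not part of this PR.

   ```scala
   import java.util.concurrent.atomic.AtomicBoolean
   import scala.collection.mutable

   // Hypothetical sketch; none of these names exist in the PR.
   object AdmissionSwitchSketch {
     final class AdmissionSwitch {
       // The component keeps running in both states; only admission changes.
       private val enabled = new AtomicBoolean(true)

       def disable(): Unit = enabled.set(false) // stop taking new messages
       def enable(): Unit = enabled.set(true) // resume without a restart
       def isEnabled: Boolean = enabled.get()

       // Returns true when the message was admitted, false when it was turned away.
       def offer(msg: String, queue: mutable.Queue[String]): Boolean =
         if (isEnabled) { queue.enqueue(msg); true } else false
     }
   }
   ```

   Messages that were admitted before `disable()` stay in the queue, so they can still be processed while the switch is off.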
   







[GitHub] [openwhisk] rabbah commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
rabbah commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r501101156



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("disable") & post) {

Review comment:
       Should `scheduler.shutdown()` then also be called `disable` (or pause)?
   

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -426,3 +426,16 @@ object EventMessage extends DefaultJsonProtocol {
 
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
+
+object StatusQuery
+case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)

Review comment:
       What is `waitingActivation`?
   What is `status`, and why is it a string?
   What is `data`?
   Are these basic (string and int) types used for performance reasons?
   
   Some comments or examples would be helpful here.

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("disable") & post) {

Review comment:
       It would be helpful to include this exchange as comments in the code.







[GitHub] [openwhisk] style95 merged pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 merged pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983


   





[GitHub] [openwhisk] style95 closed pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 closed pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983


   





[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496445167



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("disable") & post) {

Review comment:
       That is for graceful shutdown.
   Once we disable a scheduler, it no longer receives any new requests.
   After we confirm that no activations remain in any queue, we can finally stop the scheduler.
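
   The sequence can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the actual implementation: the `Drainable` trait, `drainAndStop`, and the synchronous `queueSize` are hypothetical (the real `SchedulerCore.getQueueSize` returns a `Future[Int]`).

   ```scala
   import scala.annotation.tailrec

   // Hypothetical sketch of the disable -> drain -> stop sequence.
   object GracefulShutdownSketch {
     // Only the calls needed for the drain are modelled here.
     trait Drainable {
       def disable(): Unit // stop accepting new requests
       def queueSize: Int // activations still waiting across all queues
       def stop(): Unit // final stop of the component
     }

     // Disables the scheduler, polls until the queues are empty (at most
     // maxPolls re-checks), and stops it only if it drained. Returns true if stopped.
     def drainAndStop(s: Drainable, maxPolls: Int, pollIntervalMs: Long = 0L): Boolean = {
       s.disable()

       @tailrec
       def waitEmpty(left: Int): Boolean =
         if (s.queueSize == 0) true
         else if (left <= 0) false
         else {
           if (pollIntervalMs > 0) Thread.sleep(pollIntervalMs)
           waitEmpty(left - 1)
         }

       val drained = waitEmpty(maxPolls)
       if (drained) s.stop()
       drained
     }
   }
   ```

   If the queues never drain within the polling budget, the sketch leaves the scheduler disabled but running, so an operator can decide what to do next.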
   







[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496439601



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the

Review comment:
       incomplete comment?







[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r501440428



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -426,3 +426,16 @@ object EventMessage extends DefaultJsonProtocol {
 
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
+
+object StatusQuery
+case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)

Review comment:
       This is used to query the queue status from the scheduler.
   
   The following is an example.
   While a `MemoryQueue(AkkaFSM)` is running, it can be in different combinations of status and data.
   This is useful for figuring out the state of a queue when an issue happens in a queue or in the scheduler.
   
   ```
   [
   ...
     {
       "data": "RunningData",
       "fqn": "whisk.system/elasticsearch/status-alarm@0.0.2",
       "invocationNamespace": "style95",
       "status": "Running",
       "waitingActivation": 1
     }
   ...
   ]
   ```
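
   As a rough illustration of how such entries could be used when debugging, here is a small sketch. The `StatusData` fields are the ones in this PR; the helpers `backlog` and `waitingByNamespace` are hypothetical and exist only for this example.

   ```scala
   // Hypothetical sketch; only the StatusData fields come from the PR.
   object QueueStatusSketch {
     case class StatusData(invocationNamespace: String,
                           fqn: String,
                           waitingActivation: Int,
                           status: String,
                           data: String)

     // Keeps only the queues that still hold waiting activations.
     def backlog(all: Seq[StatusData]): Seq[StatusData] =
       all.filter(_.waitingActivation > 0)

     // Sums the waiting activations per invocation namespace.
     def waitingByNamespace(all: Seq[StatusData]): Map[String, Int] =
       all.groupBy(_.invocationNamespace).map {
         case (ns, entries) => ns -> entries.map(_.waitingActivation).sum
       }
   }
   ```

   With the entry shown above, `backlog` would return that single queue, since its `waitingActivation` is 1.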







[GitHub] [openwhisk] style95 commented on pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#issuecomment-708074761


   Let's postpone merging this PR until we release OpenWhisk core 1.0.





[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496442005



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("disable") & post) {

Review comment:
       What's the point of having this `disable` route?







[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496459586



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the

Review comment:
       It seems this was carried over from the existing file:
   https://github.com/apache/openwhisk/blob/master/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala#L248
   
   Anyway, I have updated it accordingly.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496451072



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("disable") & post) {

Review comment:
       Can we rename the route to `shutdown`? I associate `disable` more with temporarily turning off a resource than with stopping the entire service. It's not a big deal if you want to leave it as is.







[GitHub] [openwhisk] codecov-io edited a comment on pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#issuecomment-707116829


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=h1) Report
   > Merging [#4983](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=desc) into [master](https://codecov.io/gh/apache/openwhisk/commit/9134a03c37f9be104cfc3523748ea6b8cfbfea38?el=desc) will **decrease** coverage by `6.85%`.
   > The diff coverage is `46.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/4983/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #4983      +/-   ##
   ==========================================
   - Coverage   83.70%   76.85%   -6.86%     
   ==========================================
     Files         202      202              
     Lines        9808     9823      +15     
     Branches      413      420       +7     
   ==========================================
   - Hits         8210     7549     -661     
   - Misses       1598     2274     +676     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `78.12% <0.00%> (-0.45%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `80.70% <0.00%> (-9.50%)` | :arrow_down: |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `95.18% <100.00%> (+0.05%)` | :arrow_up: |
   | [.../scala/org/apache/openwhisk/core/WhiskConfig.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvV2hpc2tDb25maWcuc2NhbGE=) | `95.27% <100.00%> (+0.19%)` | :arrow_up: |
   | [...core/database/cosmosdb/RxObservableImplicits.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvUnhPYnNlcnZhYmxlSW1wbGljaXRzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/database/cosmosdb/cache/CacheInvalidator.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/database/cosmosdb/cache/ChangeFeedConsumer.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NoYW5nZUZlZWRDb25zdW1lci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...core/database/cosmosdb/CosmosDBArtifactStore.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJBcnRpZmFjdFN0b3JlLnNjYWxh) | `0.00% <0.00%> (-95.85%)` | :arrow_down: |
   | [...sk/core/database/cosmosdb/CosmosDBViewMapper.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJWaWV3TWFwcGVyLnNjYWxh) | `0.00% <0.00%> (-93.90%)` | :arrow_down: |
   | [...tabase/cosmosdb/cache/CacheInvalidatorConfig.scala](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3JDb25maWcuc2NhbGE=) | `0.00% <0.00%> (-92.31%)` | :arrow_down: |
   | ... and [14 more](https://codecov.io/gh/apache/openwhisk/pull/4983/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=footer). Last update [9134a03...4bc1f21](https://codecov.io/gh/apache/openwhisk/pull/4983?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] style95 commented on pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#issuecomment-725291599


   I merged this now that core 1.0.0 has been released.





[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496457509



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("disable") & post) {

Review comment:
       Generally, even if a scheduler is disabled, existing containers can still fetch data from it.
   But if there are issues in the system and activations are not handled in time, we just drop them with a `whiskError`.
   The grace period for dropping activations is configurable; we use 10s internally.
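
   To make the grace-period idea concrete, here is a minimal sketch. It is not the scheduler's actual code, which is not part of this PR; `QueuedActivation`, `GracePeriodSketch`, and `partitionExpired` are hypothetical names chosen for illustration. It only shows how activations that have waited longer than a configurable grace period could be separated from those that are still valid.

   ```scala
   import scala.concurrent.duration._

   // Hypothetical stand-in for an activation waiting in a scheduler queue.
   final case class QueuedActivation(id: String, enqueuedAtMillis: Long)

   object GracePeriodSketch {
     // Splits the queue into (expired, stillValid).
     // An activation is expired once it has waited longer than the grace period.
     def partitionExpired(
       queue: Seq[QueuedActivation],
       nowMillis: Long,
       gracePeriod: FiniteDuration): (Seq[QueuedActivation], Seq[QueuedActivation]) =
       queue.partition(a => nowMillis - a.enqueuedAtMillis > gracePeriod.toMillis)
   }
   ```

   With a grace period of `10.seconds`, an activation enqueued 15 seconds ago would land in the expired group (the ones to be completed with a `whiskError`), while one enqueued 3 seconds ago would stay in the queue.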
   







[GitHub] [openwhisk] style95 commented on a change in pull request #4983: [New scheduler] Initial commit for the scheduler component

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r494661944



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD

Review comment:
       This is a factory to create memory queues.
   In the new architecture, each action is given its own dedicated queue.
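
   As a rough illustration of "one dedicated queue per action", the sketch below creates a queue the first time an action is seen and reuses it afterwards. All names here (`MemoryQueueSketch`, `MemoryQueueRegistry`, `queueFor`) are assumptions made for this example and do not reflect the real implementation, which will come in a later PR. The sketch is also not thread-safe; the real component would presumably be actor-based.

   ```scala
   import scala.collection.mutable

   // Hypothetical, simplified queue holding activation ids for a single action.
   final class MemoryQueueSketch(val action: String) {
     private val pending = mutable.Queue.empty[String]
     def enqueue(activationId: String): Unit = pending.enqueue(activationId)
     def dequeue(): Option[String] = if (pending.isEmpty) None else Some(pending.dequeue())
     def size: Int = pending.size
   }

   // Creates a queue on first use via the given factory and reuses it afterwards,
   // so every action ends up with exactly one queue.
   final class MemoryQueueRegistry(factory: String => MemoryQueueSketch) {
     private val queues = mutable.Map.empty[String, MemoryQueueSketch]
     def queueFor(action: String): MemoryQueueSketch = queues.getOrElseUpdate(action, factory(action))
     def queueCount: Int = queues.size
   }
   ```

   Passing the factory as a function keeps queue creation separate from queue lookup, which is one way a `memoryQueueFactory` value could be handed to whatever component manages the queues.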
   

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD

Review comment:
       This is one of the major components; it takes charge of managing queues and coordinating requests among the scheduler, controllers, and invokers.

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD

Review comment:
       This component is in charge of storing data in etcd.
   

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,

Review comment:
       The scheduler has two ports, one for akka-remote and the other for akka-grpc.
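
As a rough illustration of the two listeners, the sketch below models one host with two distinct ports. `EndpointsSketch` and the port numbers in the usage note are hypothetical and are not the `SchedulerEndpoints` class from this PR. The address prefix is passed in because it depends on the configured Akka remoting transport (`akka` for Artery, `akka.tcp` for classic remoting).

```scala
// Hypothetical model of a scheduler endpoint: one host, two ports.
final case class EndpointsSketch(host: String, rpcPort: Int, akkaPort: Int) {
  require(rpcPort != akkaPort, "the two listeners need distinct ports")

  // Address a gRPC client would dial.
  def rpcAddress: String = s"$host:$rpcPort"

  // Address prefix for remote actors, e.g. akka://system@host:port.
  def akkaAddress(protocol: String, system: String): String =
    s"$protocol://$system@$host:$akkaPort"
}
```

For example, `EndpointsSketch("10.0.0.1", 13001, 25520).akkaAddress("akka", "scheduler-actor-system")` yields `akka://scheduler-actor-system@10.0.0.1:25520`, while `rpcAddress` yields `10.0.0.1:13001`.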
   

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler with the assigned id of the scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the
+    require(args.length >= 1, "scheduler instance required")
+    val instanceId = SchedulerInstanceId(args(0))
+
+    initKamon(instanceId)
+
+    def abort(message: String) = {
+      logger.error(this, message)
+      actorSystem.terminate()
+      Await.result(actorSystem.whenTerminated, 30.seconds)
+      sys.exit(1)
+    }
+
+    if (!config.isValid) {
+      abort("Bad configuration, cannot start.")
+    }
+
+    val execManifest = ExecManifest.initialize(config)
+    if (execManifest.isFailure) {
+      logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
+      abort("Bad configuration, cannot start.")
+    }
+
+    val msgProvider = SpiLoader.get[MessagingProvider]
+
+    Seq(
+      ("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+      ("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))

Review comment:
       The final goal is to remove Kafka from the critical path, but the scheduler still relies on Kafka for now.
   Currently, activation messages are sent to the scheduler via the `schedulerN` topic, and container creation messages are sent to the invoker via the `invokerN` topic.
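
The topic naming described above can be sketched as follows. `TopicNamesSketch`, `Outgoing`, and its two message types are hypothetical helpers for illustration only; the topic prefixes (`scheduler`, `creationAck`, `invoker`) follow the names used in the diff and in this comment.

```scala
object TopicNamesSketch {
  // Per-instance topic names: a fixed prefix plus the instance number.
  def schedulerTopic(n: Int): String = s"scheduler$n"
  def creationAckTopic(n: Int): String = s"creationAck$n"
  def invokerTopic(n: Int): String = s"invoker$n"

  // Hypothetical message kinds, only to show which topic each would use.
  sealed trait Outgoing
  final case class ActivationFor(scheduler: Int) extends Outgoing
  final case class ContainerCreationFor(invoker: Int) extends Outgoing

  def topicOf(msg: Outgoing): String = msg match {
    case ActivationFor(s)        => schedulerTopic(s)
    case ContainerCreationFor(i) => invokerTopic(i)
  }
}
```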
   

##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD

Review comment:
       This component is responsible for creating containers for a given action.
   It relies on the `creationJobManager` to manage the container creation job.
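
A minimal sketch of how that split of responsibilities might look, under the assumption that the job manager only tracks pending creation jobs. `CreationId`, `CreationJobManagerSketch`, and `ContainerManagerSketch` are hypothetical names, and the sketch is single-threaded; the actual components would presumably be actors and are not shown in this PR.

```scala
import scala.collection.mutable

// Hypothetical identifier for one container creation job.
final case class CreationId(value: String)

// Tracks creation jobs that have been requested but not yet finished.
final class CreationJobManagerSketch {
  private val pending = mutable.Map.empty[CreationId, String] // id -> action name

  def register(id: CreationId, action: String): Unit = pending.update(id, action)

  // Removes the job and returns the action it belonged to, if it was pending.
  def finish(id: CreationId): Option[String] = pending.remove(id)

  def pendingCount: Int = pending.size
}

// Decides how many containers to create and delegates job tracking.
final class ContainerManagerSketch(jobs: CreationJobManagerSketch) {
  private var counter = 0

  def requestContainers(action: String, count: Int): Seq[CreationId] =
    (1 to count).map { _ =>
      counter += 1
      val id = CreationId(s"$action-$counter")
      jobs.register(id, action)
      id
    }
}
```

Keeping the job bookkeeping in its own class means the container manager only has to decide what to create, while completion or failure of each job can be handled in one place.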
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org