You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2020/09/29 06:11:20 UTC

[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #4983: [New Scheduler] Initial commit for the scheduler component

bdoyle0182 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r496439601



##########
File path: core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+/**
+ * Scheduler component; implements [[SchedulerCore]].
+ *
+ * Most collaborators are still empty-string placeholders marked TODO, and the [[SchedulerCore]]
+ * queries answer with fixed values. Members are initialized in declaration order, so the order
+ * below is significant (for example `producer` must exist before `ack` is built).
+ *
+ * @param schedulerId        id of this scheduler instance; passed to `MessagingActiveAck`, used to
+ *                           build the consumer name and reported by `getState`
+ * @param schedulerEndpoints endpoints of this scheduler; not referenced in the class body yet
+ */
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging)
+
+  /** Active-ack sender; a user-event sender is attached only when `UserEvents.enabled` is true. */
+  private val ack = {
+    val eventSender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, schedulerId, eventSender)
+  }
+
+  /**
+   * Persists an activation through the activation store and logs the outcome.
+   * The returned future is the store's result, completed after the log statement has run.
+   */
+  private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
+    // picked up implicitly by the logging calls below
+    implicit val transid: TransactionId = tid
+    val pendingSave = activationStore.store(activation, context)(tid, notifier = None)
+    pendingSave.andThen {
+      case Failure(t)   => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  /** Placeholder: reports this scheduler paired with 0, and 0 as the second component. */
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    val perScheduler = List(schedulerId -> 0) // TODO: TBD
+    Future.successful((perScheduler, 0))
+  }
+
+  /** Placeholder: always an already-completed future holding 0. */
+  override def getQueueSize: Future[Int] =
+    Future.successful(0) // TODO: TBD
+
+  /** Placeholder: always an already-completed future holding a single empty string. */
+  override def getQueueStatusData: Future[List[String]] =
+    Future.successful(List("")) // TODO: TBD
+
+  // other components don't need to shutdown gracefully
+  /** Currently only logs the shutdown; no component is stopped yet. */
+  override def shutdown(): Unit = {
+    logging.info(this, "Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  /**
+   * Looks up the concurrent-invocation limit of a namespace.
+   *
+   * @param invocationNamespace name of the namespace whose identity is fetched from the auth store
+   * @return the identity's own limit, or the configured `actionInvokeConcurrentLimit` when the
+   *         identity defines none; a failed lookup stays failed (two failure kinds are logged)
+   */
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    // `trasnid` is declared further down without a type annotation, hence passed explicitly
+    val identityLookup = Identity.get(authStore, EntityName(invocationNamespace))(trasnid)
+
+    val resolvedLimit = identityLookup.map { identity =>
+      val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+      logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid)
+      limit
+    }
+
+    // side-effect only: the outcome of the future is left untouched
+    resolvedLimit.andThen {
+      case Failure(_: NoDocumentException) =>
+        logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid)
+      case Failure(_: IllegalStateException) =>
+        logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid)
+    }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  /** Message consumer of this scheduler; the same `scheduler<id>` name is passed for both name arguments. */
+  val schedulerConsumer = {
+    val consumerName = s"scheduler${schedulerId.asString}"
+    msgProvider.getConsumer(
+      config,
+      consumerName,
+      consumerName,
+      500, // TODO: to be updated with maxPeek variable
+      maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+  }
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+/**
+ * Holder for three optional values; by its name these are parsed command-line arguments
+ * (the parsing itself is not part of this excerpt). Every field defaults to `None`.
+ *
+ * @param uniqueName    optional unique name
+ * @param id            optional numeric id
+ * @param displayedName optional displayed name
+ */
+case class CmdLineArgs(
+  uniqueName: Option[String] = None,
+  id: Option[Int] = None,
+  displayedName: Option[String] = None)
+
+trait SchedulerCore { // status queries plus a shutdown hook; the Scheduler class extends this trait
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] // Int meanings not shown here; TODO confirm
+  /** Asynchronously yields a single Int (by its name, the queue size). */
+  def getQueueSize: Future[Int]
+  /** Asynchronously yields status entries, for now as plain strings. */
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+  /** Side-effecting shutdown hook; returns nothing. */
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  /** Value of the `whisk.scheduler.protocol` setting, read once when this object is initialized. */
+  protected val protocol: String = loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  /**
+   * Configuration properties handed to `WhiskConfig` in `main`.
+   *
+   * Scheduler-specific entries come first; the shared property maps are appended left to right.
+   * `null` values presumably mark properties without a default — confirm against `WhiskConfig`.
+   */
+  def requiredProperties = {
+    val schedulerProperties = Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null)
+
+    schedulerProperties ++ kafkaHosts ++ zookeeperHosts ++ wskApiHost ++ ExecManifest.requiredProperties
+  }
+
+  /**
+   * Initializes Kamon with the current Kamon config, except that `kamon.environment.host`
+   * is overridden with `scheduler<id>` so the scheduler's assigned id replaces the hostname.
+   *
+   * @param instance the scheduler instance whose `asString` id becomes part of the host name
+   */
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    val reportedHost = ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}")
+    Kamon.init(Kamon.config.withValue("kamon.environment.host", reportedHost))
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance number as the

Review comment:
       incomplete comment?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org