Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2021/03/02 01:10:07 UTC

[GitHub] [openwhisk] ningyougang opened a new pull request #5061: [New Scheduler] Implement InvokerHealthyManager

ningyougang opened a new pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061


   
   ## Description
   Implement invoker health management for the new scheduler.
   Design document: https://cwiki.apache.org/confluence/display/OPENWHISK/InvokerHealthyManager
   
   ## Related issue and scope
   - [ ] I opened an issue to propose and discuss this change (#????)
   
   ## My changes affect the following components
   - [ ] API
   - [ ] Controller
   - [ ] Message Bus (e.g., Kafka)
   - [ ] Loadbalancer
   - [x] Invoker
   - [ ] Intrinsic actions (e.g., sequences, conductors)
   - [ ] Data stores (e.g., CouchDB)
   - [ ] Tests
   - [ ] Deployment
   - [ ] CLI
   - [ ] General tooling
   - [ ] Documentation
   
   ## Types of changes
   - [ ] Bug fix (generally a non-breaking change which closes an issue).
   - [x] Enhancement or new feature (adds new functionality).
   - [ ] Breaking change (a bug fix or enhancement which changes existing behavior).
   
   ## Checklist:
   
   - [x] I signed an [Apache CLA](https://github.com/apache/openwhisk/blob/master/CONTRIBUTING.md).
   - [x] I reviewed the [style guides](https://github.com/apache/openwhisk/wiki/Contributing:-Git-guidelines#code-readiness) and followed the recommendations (Travis CI will check :).
   - [x] I added tests to cover my changes.
   - [ ] My changes require further changes to the documentation.
   - [ ] I updated the documentation where necessary.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] codecov-io edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (77d68af) into [master](https://codecov.io/gh/apache/openwhisk/commit/4a13303fae4d9750da6662bb39b3ec92d6ccf56c?el=desc) (4a13303) will **decrease** coverage by `46.75%`.
   > The diff coverage is `73.45%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5061/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #5061       +/-   ##
   ===========================================
   - Coverage   81.95%   35.19%   -46.76%     
   ===========================================
     Files         210      205        -5     
     Lines       10167    10063      -104     
     Branches      440      426       -14     
   ===========================================
   - Hits         8332     3542     -4790     
   - Misses       1835     6521     +4686     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `91.76% <50.00%> (-3.42%)` | :arrow_down: |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `75.37% <75.37%> (ø)` | |
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `44.92% <80.00%> (-34.76%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `81.81% <90.00%> (-9.41%)` | :arrow_down: |
   | [...a/org/apache/openwhisk/common/ConfigMapValue.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9Db25maWdNYXBWYWx1ZS5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/openwhisk/core/controller/Namespaces.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb250cm9sbGVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udHJvbGxlci9OYW1lc3BhY2VzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/openwhisk/core/controller/CorsSettings.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb250cm9sbGVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udHJvbGxlci9Db3JzU2V0dGluZ3Muc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...che/openwhisk/core/entitlement/RateThrottler.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb250cm9sbGVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXRsZW1lbnQvUmF0ZVRocm90dGxlci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...he/openwhisk/core/entitlement/KindRestrictor.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb250cm9sbGVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXRsZW1lbnQvS2luZFJlc3RyaWN0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [146 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [4a13303...77d68af](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r572519467



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(UnHealthy)
+  }
+
+  when(UnHealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      goto(UnHealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManager
+      stay()
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManager
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
+
+      goto(Offline)
+
+  }
+
+  // It is important to note that stateName and stateData in the onTransition callback refer to the previous state.
+  // Access the next state's data with nextStateData.
+  onTransition {
+    case Offline -> UnHealthy =>
+      publishHealthStatusAndStay(UnHealthy, nextStateData)
+
+    case Healthy -> UnHealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(UnHealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(UnHealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>
+    // this is an initial transition due to startWith, do nothing
+
+    case _ -> newState =>
+      publishHealthStatusAndStay(newState, nextStateData)
+
+      unstashAll()
+
+  }
+
+  private def publishHealthStatusAndStay(state: InvokerState, stateData: InvokerHealthData) = {
+    stateData match {
+      case data: InvokerInfo =>
+        val invokerResourceMessage = InvokerResourceMessage(
+          state.asString,
+          data.memory.freeMemory,
+          data.memory.busyMemory,
+          data.memory.inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces)
+        InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory
+        dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
+
+        stay using data.copy(currentInvokerResource = Some(invokerResourceMessage))
+
+      case data =>
+        logging.error(this, s"unexpected data is found: $data")
+
+        stay
+    }
+  }
+
+  initialize()
+
+  private def startTestAction(manager: ActorRef): Unit = {
+    val namespace = InvokerHealthManager.healthActionIdentity.namespace.name.asString
+    val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+    WhiskAction.get(entityStore, docId).onComplete {
+      case Success(action) =>
+        val initialize = Initialize(namespace, action.toExecutableWhiskAction.get, "", 0, transid)
+        startHealthAction(initialize, manager)
+      case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
+    }
+  }
+
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): Unit = {
+    healthActionProxy match {
+      case Some(proxy) =>
+        // make healthContainerProxy's status is Running, then healthContainerProxy can fetch the activation using ActivationServiceClient
+        proxy ! initialize
+      case None =>
+        val proxy = healthContainerProxyFactory(context, manager)
+        proxy ! initialize
+        healthActionProxy = Some(proxy)
+    }
+  }
+
+  def stopTestAction(): Unit = {
+    healthActionProxy.foreach {
+      healthActionProxy = None
+      _ ! GracefulShutdown
+    }
+  }
+
+  /**
+   * This method is to handle health message from ContainerProxy.pub
+   * It can induce status change.
+   *
+   * @param state  activation result state
+   * @param buffer RingBuffer to track status
+   * @return
+   */
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State = {
+    buffer.add(state)
+    val falseStateCount = buffer.toList.count(_ == false)
+    if (falseStateCount < InvokerHealthManager.bufferErrorTolerance) {
+      gotoIfNotThere(Healthy)
+    } else {
+      logging.warn(
+        this,
+        s"become unhealthy because system error exceeded the error tolerance, falseStateCount $falseStateCount, errorTolerance ${InvokerHealthManager.bufferErrorTolerance}")
+      gotoIfNotThere(UnHealthy)
+    }
+  }
+
+  /**
+   * Decides whether to change to newState.
+   * If the current state is already newState, it will stay; otherwise it will change its state.
+   *
+   * @param newState the desired state to change.
+   * @return
+   */
+  private def gotoIfNotThere(newState: InvokerState) = {
+    if (stateName == newState) {
+      stay()
+    } else {
+      goto(newState)
+    }
+  }
+
+  /** Delays all incoming messages until unstashAll() is called */
+  def delay = {
+    stash()
+    stay
+  }
+
+}
+
+case class HealthActivationServiceClient() extends Actor {
+
+  private var closed: Boolean = false
+
+  override def receive: Receive = {
+    case StartClient => sender() ! ClientCreationCompleted()
+    case _: RequestActivation =>
+      InvokerHealthManager.healthActivation match {
+        case Some(activation) if !closed =>
+          sender() ! activation.copy(
+            transid = TransactionId.invokerHealthActivation,
+            activationId = ActivationId.generate())
+
+        case _ if closed =>
+          context.parent ! ClientClosed
+          context.stop(self)
+
+        case _ => // do nothing
+      }
+
+    case CloseClientProxy =>
+      closed = true
+
+  }
+}
+
+object InvokerHealthManager {
+  val healthActionNamePrefix = "invokerHealthTestAction"
+  val bufferSize = 10
+  val bufferErrorTolerance = 3
+
+  var useMemory = 0l

Review comment:
       I removed `var useMemory = 0l`
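For readers following the health logic in the diff above: `handleHealthMessage` appends each health-action result to a `RingBuffer` of `bufferSize = 10` and stays (or becomes) `Healthy` only while the number of failures in that buffer is below `bufferErrorTolerance = 3`. The following is a minimal, hypothetical sketch of that decision in plain Scala, assuming a queue-backed window in place of OpenWhisk's `RingBuffer`; `HealthWindowSketch`, `Window` and `next` are illustrative names, and actor messaging, `Enable`/`GracefulShutdown`, test-action startup and publishing to etcd are deliberately left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the health logic in the quoted diff; the names
// below are illustrative and are not part of OpenWhisk's API.
object HealthWindowSketch {
  sealed trait HealthState
  case object Offline extends HealthState
  case object UnHealthy extends HealthState
  case object Healthy extends HealthState

  val bufferSize = 10          // same value as InvokerHealthManager.bufferSize
  val bufferErrorTolerance = 3 // same value as InvokerHealthManager.bufferErrorTolerance

  /** Fixed-size window that keeps only the most recent `size` results. */
  final class Window(size: Int) {
    private val results = mutable.Queue.empty[Boolean]

    def add(ok: Boolean): Unit = {
      results.enqueue(ok)
      if (results.size > size) results.dequeue() // drop the oldest result
      ()
    }

    def failures: Int = results.count(ok => !ok)
  }

  /** Mirrors handleHealthMessage: healthy while failures stay below the tolerance. */
  def next(current: HealthState, ok: Boolean, window: Window): HealthState =
    if (current == Offline) current // health messages are ignored while Offline
    else {
      window.add(ok)
      if (window.failures < bufferErrorTolerance) Healthy else UnHealthy
    }
}
```

With this model, three failures among the last ten results flip the state to `UnHealthy`, and a failure keeps counting against the invoker until enough later results have pushed it out of the window.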







[GitHub] [openwhisk] codecov-io edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (c90f829) into [master](https://codecov.io/gh/apache/openwhisk/commit/4a13303fae4d9750da6662bb39b3ec92d6ccf56c?el=desc) (4a13303) will **decrease** coverage by `6.76%`.
   > The diff coverage is `73.45%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5061/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #5061      +/-   ##
   ==========================================
   - Coverage   81.95%   75.18%   -6.77%     
   ==========================================
     Files         210      213       +3     
     Lines       10167    10376     +209     
     Branches      440      433       -7     
   ==========================================
   - Hits         8332     7801     -531     
   - Misses       1835     2575     +740     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `94.11% <50.00%> (-1.07%)` | :arrow_down: |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `75.37% <75.37%> (ø)` | |
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `79.71% <80.00%> (+0.02%)` | :arrow_up: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `90.90% <90.00%> (-0.32%)` | :arrow_down: |
   | [...core/database/cosmosdb/RxObservableImplicits.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvUnhPYnNlcnZhYmxlSW1wbGljaXRzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/database/cosmosdb/cache/CacheInvalidator.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/database/cosmosdb/cache/ChangeFeedConsumer.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NoYW5nZUZlZWRDb25zdW1lci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...core/database/cosmosdb/CosmosDBArtifactStore.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJBcnRpZmFjdFN0b3JlLnNjYWxh) | `0.00% <0.00%> (-95.85%)` | :arrow_down: |
   | [...sk/core/database/cosmosdb/CosmosDBViewMapper.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJWaWV3TWFwcGVyLnNjYWxh) | `0.00% <0.00%> (-93.90%)` | :arrow_down: |
   | ... and [18 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [4a13303...c90f829](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] codecov-io edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (a440d86) into [master](https://codecov.io/gh/apache/openwhisk/commit/5eda22171a238e933121b3918c5940e37fb009c5?el=desc) (5eda221) will **decrease** coverage by `47.39%`.
   > The diff coverage is `6.17%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5061/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #5061       +/-   ##
   ===========================================
   - Coverage   81.26%   33.86%   -47.40%     
   ===========================================
     Files         203      205        +2     
     Lines        9902    10063      +161     
     Branches      437      426       -11     
   ===========================================
   - Hits         8047     3408     -4639     
   - Misses       1855     6655     +4800     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `38.40% <0.00%> (-35.04%)` | :arrow_down: |
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `91.76% <50.00%> (-3.42%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `72.72% <90.00%> (-18.51%)` | :arrow_down: |
   | [...a/org/apache/openwhisk/common/ConfigMapValue.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9Db25maWdNYXBWYWx1ZS5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/openwhisk/core/controller/Namespaces.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb250cm9sbGVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udHJvbGxlci9OYW1lc3BhY2VzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/openwhisk/core/controller/CorsSettings.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb250cm9sbGVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udHJvbGxlci9Db3JzU2V0dGluZ3Muc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...che/openwhisk/core/entitlement/RateThrottler.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb250cm9sbGVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXRsZW1lbnQvUmF0ZVRocm90dGxlci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...he/openwhisk/core/entitlement/KindRestrictor.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb250cm9sbGVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXRsZW1lbnQvS2luZFJlc3RyaWN0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [138 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [5eda221...a440d86](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570691563



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =

Review comment:
       Ah, yes. I tested locally: after removing the overridden `equals` and `hashCode`, both still work as expected. For two InvokerResourceMessage objects:
   * if all field values are the same, `object1.equals(object2)` is `true`;
   * if any field value differs, `object1.equals(object2)` is `false`.
   
   I have already removed the overridden `equals` and `hashCode` in this PR.
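The behavior described in this comment is what Scala case classes provide out of the box: the compiler generates structural `equals` and `hashCode` over all constructor fields. A minimal sketch of the same check, assuming a hypothetical `ResourceSnapshot` case class that only mirrors the field shape of `InvokerResourceMessage` (the `Message` trait, JSON serdes and the sample values are illustrative, not taken from the PR):

```scala
// Hypothetical stand-in mirroring the fields of InvokerResourceMessage; the Message
// trait and the JSON serdes from Message.scala are deliberately left out.
final case class ResourceSnapshot(status: String,
                                  freeMemory: Long,
                                  busyMemory: Long,
                                  inProgressMemory: Long,
                                  tags: Seq[String],
                                  dedicatedNamespaces: Seq[String])

object CaseClassEqualityDemo {
  val a = ResourceSnapshot("healthy", 1024L, 0L, 0L, Seq("tag1"), Seq("ns1"))
  val b = a.copy()                  // every field equal to a
  val c = a.copy(busyMemory = 256L) // exactly one field differs

  // Compiler-generated equals compares every constructor field.
  val sameFields: Boolean = a == b // true
  // Equal values always yield equal hash codes.
  val sameHash: Boolean = a.hashCode == b.hashCode // true
  val oneFieldDiffers: Boolean = a == c // false
}
```

Because `equals` and `hashCode` stay consistent automatically, instances remain safe to use as map keys or set elements without hand-written overrides.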







[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570695768



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       After removing `InvokerResourceMessage`'s `hashCode` and `equals`, I tested the following scenarios.
   
   For two `InvokerResourceMessage` objects:
   * if all field values are the same, `object1.equals(object2)` is `true`;
   * if any field value differs, `object1.equals(object2)` is `false`.
   
   Regarding why the overridden `hashCode` and `equals` were added to our code in the first place:
   there seems to be no special reason.
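If an order-insensitive comparison of `tags` and `dedicatedNamespaces` is still wanted, one option is a named helper rather than an `equals` override, which keeps the case class's generated `equals`/`hashCode` pair intact. Note also that in the removed override the line `this.tags.toSet == that.tags.toSet` was not followed by `&&`, so Scala's semicolon inference ended the expression there and the block returned only the `dedicatedNamespaces` comparison; the earlier checks were evaluated and discarded. The sketch below uses a hypothetical `ResourceSnapshot` stand-in and a hypothetical `sameResources` helper; neither is part of the PR.

```scala
// Hypothetical stand-in for InvokerResourceMessage (Message trait and serdes omitted).
case class ResourceSnapshot(status: String,
                            freeMemory: Long,
                            busyMemory: Long,
                            inProgressMemory: Long,
                            tags: Seq[String],
                            dedicatedNamespaces: Seq[String])

object ResourceSnapshot {
  // Order-insensitive comparison as a named helper instead of an equals override.
  // Every clause is joined with &&, so all fields contribute to the result.
  def sameResources(x: ResourceSnapshot, y: ResourceSnapshot): Boolean =
    x.status == y.status &&
      x.freeMemory == y.freeMemory &&
      x.busyMemory == y.busyMemory &&
      x.inProgressMemory == y.inProgressMemory &&
      x.tags.toSet == y.tags.toSet &&
      x.dedicatedNamespaces.toSet == y.dedicatedNamespaces.toSet
}

val base = ResourceSnapshot("up", 10240, 0, 0, Seq("gpu", "ssd"), Seq("ns1"))
val reordered = base.copy(tags = Seq("ssd", "gpu")) // same tags, other order
val busier = base.copy(busyMemory = 256)            // one numeric field differs
```

With this split, `base == reordered` stays `false` under the default case-class equality, while `ResourceSnapshot.sameResources(base, reordered)` is `true`.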







[GitHub] [openwhisk] ningyougang closed pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang closed pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061


   





[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570470462



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(UnHealthy)
+  }
+
+  when(UnHealthy) {

Review comment:
       ```suggestion
     when(Unhealthy) {
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       What are tags used for?

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
##########
@@ -29,11 +29,18 @@ import scala.util.Try
  * @param instance a numeric value used for the load balancing and Kafka topic creation
  * @param uniqueName an identifier required for dynamic instance assignment by Zookeeper
  * @param displayedName an identifier that is required for the health protocol to correlate Kafka topics with invoker container names
+ * @param userMemory invoker user memory
+ * @param busyMemory invoker busy memory
+ * @param tags actions which included specified annotation tags can be run on this invoker
+ * @param dedicatedNamespaces only dedicatedNamespaces's actions can be run on this invoker
  */
 case class InvokerInstanceId(val instance: Int,
                              uniqueName: Option[String] = None,
                              displayedName: Option[String] = None,
-                             val userMemory: ByteSize)
+                             val userMemory: ByteSize,

Review comment:
       Is this going to affect the message bus? As always, I have to check how this will affect rolling restarts of the controllers and invokers: will one component be unhealthy while the other is being upgraded?

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: InvokerResourceMessage => {
+        that.canEqual(this) &&
+        this.status == that.status &&
+        this.freeMemory == that.freeMemory &&
+        this.busyMemory == that.busyMemory &&
+        this.inProgressMemory == that.inProgressMemory &&
+        this.tags.toSet == that.tags.toSet
+        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
+      }
+      case _ => false
+    }
+
+  override def hashCode: Int = {
+    var result = 1;

Review comment:
       nit: you can map over each one to make this functional rather than use a var. i.e.
   
   `1.map(start => prime * start + status.hashCode()).map(next => prime * next + freeMemory.hashCode())...`
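`Int` has no `map` method, so the chain above is pseudocode; a compilable way to express the same `prime * acc + fieldHash` recurrence without a `var` is a `foldLeft` over the field hashes. In the sketch below `ResourceSnapshot` is a hypothetical stand-in for `InvokerResourceMessage`, `functionalHash` is a hypothetical name (it would be the body of `hashCode` if the override were kept), and `prime = 31` is an assumed value. The collection fields are hashed as sets so the result agrees with the order-insensitive comparison the original override intended.

```scala
// Hypothetical stand-in for InvokerResourceMessage.
case class ResourceSnapshot(status: String,
                            freeMemory: Long,
                            busyMemory: Long,
                            inProgressMemory: Long,
                            tags: Seq[String],
                            dedicatedNamespaces: Seq[String]) {

  // Same recurrence as a var-based loop (result = prime * result + field.hashCode),
  // expressed as a fold starting from 1. Tags and namespaces are hashed as sets so
  // that two snapshots differing only in element order get the same hash.
  def functionalHash: Int = {
    val prime = 31 // assumed value
    Seq[Any](status, freeMemory, busyMemory, inProgressMemory, tags.toSet, dedicatedNamespaces.toSet)
      .foldLeft(1)((acc, field) => prime * acc + field.hashCode)
  }
}
```

Since the override was eventually dropped, the case class's generated `hashCode` (derived from all constructor fields) makes this unnecessary; the fold is only the functional form of the style the nit suggests.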

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,89 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState
+case object ClientCreated extends ProxyState
+case object Running extends ProxyState
+case object Pausing extends ProxyState
+case object Paused extends ProxyState
+case object Removing extends ProxyState
+case object Rescheduling extends ProxyState
+
+// Data
+sealed abstract class Data(val memoryLimit: ByteSize) {
+  def getContainer: Option[Container]
+}
+case class NoExistData() extends Data(0.B) {

Review comment:
       ```suggestion
   case class NonexistentData() extends Data(0.B) {
   ```







[GitHub] [openwhisk] rabbah commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
rabbah commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570578014



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =

Review comment:
       Is this method needed? A case class's `equals` already provides this implementation by definition.







[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570713341



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       This is used for special actions, e.g. ones that need a lot of memory or a powerful GPU.
   We can make such an action's invocations run only on the corresponding invokers:
   
   * First, add an annotation to the action, e.g.
   
   ```
   wsk action create hello-gpu ~/hello-gpu.js --annotation invoker-resources '["gpu"]'
   ```
   * Second, when deploying the invoker, add the matching tag to the invoker as well:
   ```
   whisk/invokers/0/0
   {"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":["gpu"]}
   whisk/invokers/1/1
   {"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":[]}
   ```
   * Finally, when the action is run, its activations will run on invoker0 above, which has the `gpu` tag.
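The selection step described above can be sketched as a filter over the invoker records stored under `whisk/invokers/...`. Everything here is an assumption for illustration: `InvokerResources`, `TagRouting` and `eligibleInvokers` are hypothetical names, and the rule "an invoker qualifies if it is up, carries every requested tag and has enough free memory" is a simplification; the actual scheduler may treat untagged actions, `dedicatedNamespaces` and memory differently.

```scala
// Hypothetical, simplified view of the per-invoker data stored under whisk/invokers/... in ETCD.
case class InvokerResources(invokerId: Int, status: String, freeMemoryMB: Long, tags: Seq[String])

object TagRouting {
  // Assumed rule: keep invokers that are "up", carry every tag requested by the
  // action's invoker-resources annotation, and have enough free memory.
  // An action with no required tags matches every up invoker in this sketch.
  def eligibleInvokers(requiredTags: Set[String],
                       memoryMB: Long,
                       invokers: Seq[InvokerResources]): Seq[Int] =
    invokers
      .filter(_.status == "up")
      .filter(i => requiredTags.subsetOf(i.tags.toSet))
      .filter(_.freeMemoryMB >= memoryMB)
      .map(_.invokerId)
}

// The two invokers from the ETCD example above.
val invokers = Seq(
  InvokerResources(0, "up", 10240, Seq("gpu")),
  InvokerResources(1, "up", 10240, Seq.empty))
```

Under these assumptions, an action annotated with `["gpu"]` is only eligible for invoker0, while an unannotated action may run on either invoker.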







[GitHub] [openwhisk] style95 commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570686880



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       @ningyougang 
   IIRC, we introduced this so that we don't store data to ETCD when it is the same as the previous value.
   Can we make sure this requirement is still met by the default comparison provided by the case class?
   
   I am not quite sure, but we were using this class without these methods and then introduced them at some point for some reason.
   
   Could you confirm?
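The requirement mentioned here (skip the ETCD write when the data equals the previous value) depends only on `==`, which is why the case class's default `equals` can satisfy it. A minimal sketch, assuming a hypothetical `WriteOnChange` helper that remembers the last value per key; this is not the PR's `UpdateDataOnChange`, and the `write` callback stands in for whatever actually puts the value into ETCD.

```scala
// Hypothetical helper: remembers the last value written per key and skips the
// write when the new value is == to it (for a case class, the generated equals).
final class WriteOnChange[V](write: (String, V) => Unit) {
  private val lastWritten = scala.collection.mutable.Map.empty[String, V]

  /** Returns true if the value was written, false if it was skipped as unchanged. */
  def update(key: String, value: V): Boolean =
    if (lastWritten.get(key).contains(value)) false
    else {
      write(key, value)
      lastWritten.update(key, value)
      true
    }
}

// Hypothetical, trimmed-down resource snapshot using default case-class equality.
case class Snapshot(status: String, freeMemory: Long, tags: Seq[String])

val writes = scala.collection.mutable.ArrayBuffer.empty[String]
val store = new WriteOnChange[Snapshot]((key, _) => { writes += key; () })

val first = store.update("whisk/invokers/0/0", Snapshot("up", 10240, Seq("gpu")))  // written
val repeat = store.update("whisk/invokers/0/0", Snapshot("up", 10240, Seq("gpu"))) // skipped
val change = store.update("whisk/invokers/0/0", Snapshot("up", 9216, Seq("gpu")))  // written
```

Because two separately constructed but field-identical `Snapshot` values compare equal, the second update is skipped without any custom `equals`.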







[GitHub] [openwhisk] codecov-io edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (c90f829) into [master](https://codecov.io/gh/apache/openwhisk/commit/4a13303fae4d9750da6662bb39b3ec92d6ccf56c?el=desc) (4a13303) will **decrease** coverage by `6.76%`.
   > The diff coverage is `73.45%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5061/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #5061      +/-   ##
   ==========================================
   - Coverage   81.95%   75.18%   -6.77%     
   ==========================================
     Files         210      213       +3     
     Lines       10167    10376     +209     
     Branches      440      433       -7     
   ==========================================
   - Hits         8332     7801     -531     
   - Misses       1835     2575     +740     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `94.11% <50.00%> (-1.07%)` | :arrow_down: |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `75.37% <75.37%> (ø)` | |
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `79.71% <80.00%> (+0.02%)` | :arrow_up: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `90.90% <90.00%> (-0.32%)` | :arrow_down: |
   | [...core/database/cosmosdb/RxObservableImplicits.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvUnhPYnNlcnZhYmxlSW1wbGljaXRzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/database/cosmosdb/cache/CacheInvalidator.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/database/cosmosdb/cache/ChangeFeedConsumer.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NoYW5nZUZlZWRDb25zdW1lci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...core/database/cosmosdb/CosmosDBArtifactStore.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJBcnRpZmFjdFN0b3JlLnNjYWxh) | `0.00% <0.00%> (-95.85%)` | :arrow_down: |
   | [...sk/core/database/cosmosdb/CosmosDBViewMapper.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJWaWV3TWFwcGVyLnNjYWxh) | `0.00% <0.00%> (-93.90%)` | :arrow_down: |
   | ... and [18 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [4a13303...c90f829](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] codecov-io edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (c90f829) into [master](https://codecov.io/gh/apache/openwhisk/commit/4a13303fae4d9750da6662bb39b3ec92d6ccf56c?el=desc) (4a13303) will **decrease** coverage by `7.63%`.
   > The diff coverage is `73.45%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5061/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #5061      +/-   ##
   ==========================================
   - Coverage   81.95%   74.31%   -7.64%     
   ==========================================
     Files         210      213       +3     
     Lines       10167    10376     +209     
     Branches      440      433       -7     
   ==========================================
   - Hits         8332     7711     -621     
   - Misses       1835     2665     +830     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `94.11% <50.00%> (-1.07%)` | :arrow_down: |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `75.37% <75.37%> (ø)` | |
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `78.98% <80.00%> (-0.71%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `90.90% <90.00%> (-0.32%)` | :arrow_down: |
   | [...core/database/cosmosdb/RxObservableImplicits.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvUnhPYnNlcnZhYmxlSW1wbGljaXRzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/database/cosmosdb/cache/CacheInvalidator.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/database/cosmosdb/cache/ChangeFeedConsumer.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NoYW5nZUZlZWRDb25zdW1lci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...core/database/cosmosdb/CosmosDBArtifactStore.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJBcnRpZmFjdFN0b3JlLnNjYWxh) | `0.00% <0.00%> (-95.85%)` | :arrow_down: |
   | [.../openwhisk/core/scheduler/FPCSchedulerServer.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9zY2hlZHVsZXIvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9vcGVud2hpc2svY29yZS9zY2hlZHVsZXIvRlBDU2NoZWR1bGVyU2VydmVyLnNjYWxh) | `0.00% <0.00%> (-95.84%)` | :arrow_down: |
   | ... and [26 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [4a13303...c90f829](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] ningyougang closed pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang closed pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061


   





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r572519649



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {

Review comment:
       Already cleaned up.







[GitHub] [openwhisk] style95 commented on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-803790696


   I suppose Brendan has already approved this, so can we merge it and move forward?
   





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570695768



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       After removing the `hashCode` and `equals` overrides from InvokerResourceMessage, I tested the following scenarios
   for two InvokerResourceMessage objects:
   
   * If all field values are the same, `object1.equals(object2)` is `true`.
   * If any field value differs, `object1.equals(object2)` is `false`.
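   
   A minimal sketch of the behaviour described above, relying on Scala's compiler-generated case-class `equals` and `hashCode`. `ResourceMsg` and `CaseClassEqualityDemo` are hypothetical names; `ResourceMsg` only copies the fields of `InvokerResourceMessage` (the real class also extends `Message`). One difference from the removed override is worth keeping in mind: the generated equality compares `tags` and `dedicatedNamespaces` as ordered `Seq`s, while the override compared their `.toSet`, so the same tags in a different order are no longer equal.
   
   ```scala
   // Hypothetical stand-in with the same fields as InvokerResourceMessage.
   final case class ResourceMsg(status: String,
                                freeMemory: Long,
                                busyMemory: Long,
                                inProgressMemory: Long,
                                tags: Seq[String],
                                dedicatedNamespaces: Seq[String])
   
   object CaseClassEqualityDemo {
     val a = ResourceMsg("up", 1024L, 0L, 0L, Seq("t1", "t2"), Seq("ns1"))
   
     // All fields equal: the generated equals/hashCode agree.
     val sameFields = a.copy()
     // One field differs: not equal.
     val differentMemory = a.copy(freeMemory = 512L)
     // Same tags, different order: Seq equality is order-sensitive,
     // unlike the removed override, which compared tags.toSet.
     val reorderedTags = a.copy(tags = Seq("t2", "t1"))
   
     def main(args: Array[String]): Unit = {
       println(a == sameFields)                   // true
       println(a.hashCode == sameFields.hashCode) // true
       println(a == differentMemory)              // false
       println(a == reorderedTags)                // false
     }
   }
   ```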







[GitHub] [openwhisk] codecov-io edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (2dde4f3) into [master](https://codecov.io/gh/apache/openwhisk/commit/ecb15098caded058ddb6976c630f5b6dcd656177?el=desc) (ecb1509) will **decrease** coverage by `30.18%`.
   > The diff coverage is `0.61%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5061/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #5061       +/-   ##
   ==========================================
   - Coverage   35.93%   5.74%   -30.19%     
   ==========================================
     Files         207     216        +9     
     Lines       10183   10609      +426     
     Branches      467     453       -14     
   ==========================================
   - Hits         3659     610     -3049     
   - Misses       6524    9999     +3475     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `21.73% <0.00%> (-20.45%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `0.00% <0.00%> (-80.71%)` | :arrow_down: |
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...k/core/containerpool/v2/InvokerHealthManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoTWFuYWdlci5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `32.94% <50.00%> (-59.83%)` | :arrow_down: |
   | [...ain/scala/org/apache/openwhisk/spi/SpiLoader.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL3NwaS9TcGlMb2FkZXIuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...scala/org/apache/openwhisk/core/FeatureFlags.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvRmVhdHVyZUZsYWdzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...la/org/apache/openwhisk/http/BasicRasService.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2h0dHAvQmFzaWNSYXNTZXJ2aWNlLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/openwhisk/http/LenientSprayJsonSupport.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2h0dHAvTGVuaWVudFNwcmF5SnNvblN1cHBvcnQuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ache/openwhisk/core/database/DocumentFactory.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvRG9jdW1lbnRGYWN0b3J5LnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [111 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [ecb1509...2dde4f3](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570695768



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       After removing the `hashCode` and `equals` overrides from InvokerResourceMessage, I tested the following scenarios
   for two InvokerResourceMessage objects:
   
   * If all field values are the same, `object1.equals(object2)` is `true`.
   * If any field value differs, `object1.equals(object2)` is `false`.
   
   As for why the `hashCode` and `equals` overrides were added in the first place,
   there seems to be no special reason.
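   
   The removed override, as quoted in the diff above, also had a latent bug: there is no `&&` after `this.tags.toSet == that.tags.toSet`, so Scala's semicolon inference ends the statement at that line break, the combined comparison is discarded, and the block returns only the `dedicatedNamespaces` comparison. A reduced sketch; `ReducedMsg` and `MissingAndDemo` are hypothetical names, only the field names come from the PR.
   
   ```scala
   final class ReducedMsg(val status: String,
                          val tags: Seq[String],
                          val dedicatedNamespaces: Seq[String]) {
   
     // Mirrors the quoted override: without `&&` after the tags comparison,
     // the first two lines form a statement whose Boolean result is dropped,
     // and only the last line decides the return value.
     def buggyEquals(that: ReducedMsg): Boolean = {
       this.status == that.status &&
       this.tags.toSet == that.tags.toSet
       this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
     }
   
     // With the missing `&&` restored, every comparison participates.
     def fixedEquals(that: ReducedMsg): Boolean =
       this.status == that.status &&
         this.tags.toSet == that.tags.toSet &&
         this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
   }
   
   object MissingAndDemo {
     val x = new ReducedMsg("up", Seq("gpu"), Seq("ns1"))
     // Differs in status and tags, same dedicated namespaces.
     val y = new ReducedMsg("down", Seq("cpu"), Seq("ns1"))
   
     def main(args: Array[String]): Unit = {
       println(x.buggyEquals(y)) // true: only dedicatedNamespaces were compared
       println(x.fixedEquals(y)) // false
     }
   }
   ```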







[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r572519985



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(Unhealthy)
+  }
+
+  when(Unhealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      goto(Unhealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManger
+      stay()
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManger
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
+
+      goto(Offline)
+
+  }
+
+  // It is important to note that stateName and the stateData in onTransition callback refer to the previous one.
+  // We should access to the next data with nextStateData
+  onTransition {
+    case Offline -> Unhealthy =>
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case Healthy -> Unhealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>
+    // this is an initial transition due to startWith, do nothing
+
+    case _ -> newState =>
+      publishHealthStatusAndStay(newState, nextStateData)
+
+      unstashAll()
+
+  }
+
+  private def publishHealthStatusAndStay(state: InvokerState, stateData: InvokerHealthData) = {
+    stateData match {
+      case data: InvokerInfo =>
+        val invokerResourceMessage = InvokerResourceMessage(
+          state.asString,
+          data.memory.freeMemory,
+          data.memory.busyMemory,
+          data.memory.inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces)
+        InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory
+        dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
+
+        stay using data.copy(currentInvokerResource = Some(invokerResourceMessage))
+
+      case data =>
+        logging.error(this, s"unexpected data is found: $data")
+
+        stay
+    }
+  }
+
+  initialize()
+
+  private def startTestAction(manager: ActorRef): Unit = {
+    val namespace = InvokerHealthManager.healthActionIdentity.namespace.name.asString
+    val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+    WhiskAction.get(entityStore, docId).onComplete {
+      case Success(action) =>
+        val initialize = Initialize(namespace, action.toExecutableWhiskAction.get, "", 0, transid)
+        startHealthAction(initialize, manager)
+      case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
+    }
+  }
+
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): Unit = {
+    healthActionProxy match {
+      case Some(proxy) =>
+        // make healthContainerProxy's status is Running, then healthContainerProxy can fetch the activation using ActivationServiceClient
+        proxy ! initialize
+      case None =>
+        val proxy = healthContainerProxyFactory(context, manager)
+        proxy ! initialize
+        healthActionProxy = Some(proxy)
+    }
+  }
+
+  def stopTestAction(): Unit = {
+    healthActionProxy.foreach {
+      healthActionProxy = None
+      _ ! GracefulShutdown
+    }
+  }
+
+  /**
+   * This method is to handle health message from ContainerProxy.pub
+   * It can induce status change.
+   *
+   * @param state  activation result state
+   * @param buffer RingBuffer to track status
+   * @return
+   */
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State = {

Review comment:
       Currently, I use a simple method to check the invoker's health status;
   it doesn't check for system errors.
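   
   `handleHealthMessage` records each health-action result in a `RingBuffer[Boolean]`. A minimal sketch of that kind of check, assuming the invoker counts as healthy while the number of failures among the last `bufferSize` results stays below a tolerance. `BoolRingBuffer`, `HealthCheck`, `bufferSize = 10` and `errorTolerance = 3` are hypothetical; OpenWhisk's own `RingBuffer` API and the PR's actual thresholds may differ.
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical fixed-size ring buffer that keeps the most recent results.
   final class BoolRingBuffer(capacity: Int) {
     require(capacity > 0, "capacity must be positive")
     private val buf = mutable.Queue.empty[Boolean]
   
     def add(result: Boolean): Unit = {
       if (buf.size == capacity) buf.dequeue() // drop the oldest result
       buf.enqueue(result)
     }
   
     def toList: List[Boolean] = buf.toList
   }
   
   sealed trait HealthState
   case object HealthyState extends HealthState
   case object UnhealthyState extends HealthState
   
   object HealthCheck {
     // Assumed values, for illustration only.
     val bufferSize = 10
     val errorTolerance = 3
   
     // Record the latest health-action result, then decide the state from
     // the number of failures among the buffered results.
     def handle(result: Boolean, buffer: BoolRingBuffer): HealthState = {
       buffer.add(result)
       val failures = buffer.toList.count(ok => !ok)
       if (failures < errorTolerance) HealthyState else UnhealthyState
     }
   }
   ```
   
   With these assumed values, two failures among the last ten results keep the invoker healthy, a third marks it unhealthy, and it recovers once enough successes push the failures out of the buffer.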







[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570695584



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,89 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState
+case object ClientCreated extends ProxyState
+case object Running extends ProxyState
+case object Pausing extends ProxyState
+case object Paused extends ProxyState
+case object Removing extends ProxyState
+case object Rescheduling extends ProxyState
+
+// Data
+sealed abstract class Data(val memoryLimit: ByteSize) {
+  def getContainer: Option[Container]
+}
+case class NoExistData() extends Data(0.B) {

Review comment:
       Updated accordingly

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(UnHealthy)
+  }
+
+  when(UnHealthy) {

Review comment:
       Updated accordingly
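   
   A minimal, Akka-free sketch of the state transitions in the `InvokerHealthManager` FSM quoted above (its `when(...)` and `whenUnhandled` blocks). `InvokerStateSketch`, `HealthEvent`, `ActorFailure` and `HealthVerdict` are hypothetical names: `ActorFailure` stands in for `akka.actor.Status.Failure`, and `HealthVerdict` for the outcome of `handleHealthMessage`. Side effects such as starting the health test action and publishing `InvokerResourceMessage` are omitted.
   
   ```scala
   sealed trait InvokerStateSketch
   case object Offline extends InvokerStateSketch
   case object Unhealthy extends InvokerStateSketch
   case object Healthy extends InvokerStateSketch
   
   sealed trait HealthEvent
   case object Enable extends HealthEvent
   case object GracefulShutdown extends HealthEvent
   case object ActorFailure extends HealthEvent
   // Result of the ring-buffer check (true = within the error tolerance).
   final case class HealthVerdict(healthy: Boolean) extends HealthEvent
   
   object InvokerTransitions {
     def next(state: InvokerStateSketch, event: HealthEvent): InvokerStateSketch =
       (state, event) match {
         case (Offline, Enable)         => Unhealthy // enabling starts the health test action
         case (Offline, _)              => Offline   // other events leave an offline invoker offline
         case (_, GracefulShutdown)     => Offline
         case (Healthy, ActorFailure)   => Unhealthy
         case (Unhealthy, ActorFailure) => Unhealthy // the failure is only logged
         case (_, HealthVerdict(ok))    => if (ok) Healthy else Unhealthy
         case (current, Enable)         => current   // not handled outside Offline: stay
       }
   }
   ```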







[GitHub] [openwhisk] codecov-io edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (77d68af) into [master](https://codecov.io/gh/apache/openwhisk/commit/4a13303fae4d9750da6662bb39b3ec92d6ccf56c?el=desc) (4a13303) will **decrease** coverage by `53.11%`.
   > The diff coverage is `73.45%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5061/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #5061       +/-   ##
   ===========================================
   - Coverage   81.95%   28.83%   -53.12%     
   ===========================================
     Files         210      205        -5     
     Lines       10167    10063      -104     
     Branches      440      426       -14     
   ===========================================
   - Hits         8332     2902     -5430     
   - Misses       1835     7161     +5326     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `75.29% <50.00%> (-19.89%)` | :arrow_down: |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `75.37% <75.37%> (ø)` | |
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `26.81% <80.00%> (-52.88%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `81.81% <90.00%> (-9.41%)` | :arrow_down: |
   | [.../main/scala/org/apache/openwhisk/utils/Retry.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL3V0aWxzL1JldHJ5LnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...n/scala/org/apache/openwhisk/utils/JsHelpers.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL3V0aWxzL0pzSGVscGVycy5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/openwhisk/common/ConfigMapValue.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9Db25maWdNYXBWYWx1ZS5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/openwhisk/common/ResizableSemaphore.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9SZXNpemFibGVTZW1hcGhvcmUuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/openwhisk/http/LenientSprayJsonSupport.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2h0dHAvTGVuaWVudFNwcmF5SnNvblN1cHBvcnQuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [159 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [4a13303...77d68af](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] bdoyle0182 merged pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
bdoyle0182 merged pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061


   





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570691563



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =

Review comment:
       Ah, yes. I tested locally: after removing the `equals` and `hashCode` overrides, both still work, e.g.
   for two InvokerResourceMessage objects:
   * If all field values are the same, `object1.equals(object2)` is `true`.
   * If any field value differs, `object1.equals(object2)` is `false`.

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =

Review comment:
       Ah, yes. I tested locally: after removing the `equals` and `hashCode` overrides, both still work, e.g.
   for two InvokerResourceMessage objects:
   * If all field values are the same, `object1.equals(object2)` is `true`.
   * If any field value differs, `object1.equals(object2)` is `false`.
   
   I have already removed the `equals` and `hashCode` overrides in this PR.

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
##########
@@ -29,11 +29,18 @@ import scala.util.Try
  * @param instance a numeric value used for the load balancing and Kafka topic creation
  * @param uniqueName an identifier required for dynamic instance assignment by Zookeeper
  * @param displayedName an identifier that is required for the health protocol to correlate Kafka topics with invoker container names
+ * @param userMemory invoker user memory
+ * @param busyMemory invoker busy memory
+ * @param tags actions which included specified annotation tags can be run on this invoker
+ * @param dedicatedNamespaces only dedicatedNamespaces's actions can be run on this invoker
  */
 case class InvokerInstanceId(val instance: Int,
                              uniqueName: Option[String] = None,
                              displayedName: Option[String] = None,
-                             val userMemory: ByteSize)
+                             val userMemory: ByteSize,

Review comment:
       Good question.
   
   I tested locally and it doesn't affect the message bus: an invoker built from this PR reports a healthy status of `up` in a controller built from upstream master.
   However, I have run into the problem you describe before. At that time the `PingMessage` had changed, so a controller running the old code reported invokers running the new code as `unhealthy`. When that happens, we work around it with a different deployment method:
   * Remove half of the controllers from nginx.
   * Disable half of the invokers.
   * Deploy the new code to that half of the controllers/invokers.
   * Add those controllers back to nginx.
   * Repeat the steps above for the other half of the components.
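The staged deployment above can be modeled as an ordered plan. This is an illustrative sketch only: the `Step` types, the node names, and the way nodes are split into halves are hypothetical and not part of OpenWhisk's deployment tooling; the steps simply mirror the list above, and re-enabling an invoker is assumed to happen as part of deploying it.

```scala
// Hypothetical model of the half-and-half rollout described above.
sealed trait Step
final case class RemoveFromNginx(controller: String) extends Step
final case class DisableInvoker(invoker: String) extends Step
final case class Deploy(node: String) extends Step
final case class AddToNginx(controller: String) extends Step

// Split nodes into two halves; the first half gets the extra node if the count is odd.
def halves[A](xs: Seq[A]): (Seq[A], Seq[A]) = xs.splitAt((xs.size + 1) / 2)

// One phase: drain, disable, deploy, then put the controllers back behind nginx.
def phase(controllers: Seq[String], invokers: Seq[String]): Seq[Step] =
  controllers.map(RemoveFromNginx(_)) ++
    invokers.map(DisableInvoker(_)) ++
    (controllers ++ invokers).map(Deploy(_)) ++
    controllers.map(AddToNginx(_))

def rolloutPlan(controllers: Seq[String], invokers: Seq[String]): Seq[Step] = {
  val (c1, c2) = halves(controllers)
  val (i1, i2) = halves(invokers)
  // Upgrade the first half, then repeat the same steps for the second half.
  phase(c1, i1) ++ phase(c2, i2)
}

val plan = rolloutPlan(Seq("controller0", "controller1"), Seq("invoker0", "invoker1"))
plan.foreach(println)
```

Keeping at least half of each component type on one code version at all times means old and new controllers each always have invokers running the same code.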

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: InvokerResourceMessage => {
+        that.canEqual(this) &&
+        this.status == that.status &&
+        this.freeMemory == that.freeMemory &&
+        this.busyMemory == that.busyMemory &&
+        this.inProgressMemory == that.inProgressMemory &&
+        this.tags.toSet == that.tags.toSet
+        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
+      }
+      case _ => false
+    }
+
+  override def hashCode: Int = {
+    var result = 1;

Review comment:
       I have already removed `equals` and `hashCode`.

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,89 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState
+case object ClientCreated extends ProxyState
+case object Running extends ProxyState
+case object Pausing extends ProxyState
+case object Paused extends ProxyState
+case object Removing extends ProxyState
+case object Rescheduling extends ProxyState
+
+// Data
+sealed abstract class Data(val memoryLimit: ByteSize) {
+  def getContainer: Option[Container]
+}
+case class NoExistData() extends Data(0.B) {

Review comment:
       Updated accordingly

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(UnHealthy)
+  }
+
+  when(UnHealthy) {

Review comment:
       Updated accordingly

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       OK, I will check.

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       After removing `InvokerResourceMessage`'s `hashCode` and `equals`, I tested the following scenarios with two `InvokerResourceMessage` objects:
   
   * if all field values are the same, `object1.equals(object2)` is `true`;
   * if any field value differs, `object1.equals(object2)` is `false`.

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       After removing `InvokerResourceMessage`'s `hashCode` and `equals`, I tested the following scenarios with two `InvokerResourceMessage` objects:
   
   * if all field values are the same, `object1.equals(object2)` is `true`;
   * if any field value differs, `object1.equals(object2)` is `false`.
   
   As for why the `hashCode` and `equals` overrides were added in the first place, there seems to be no special reason.

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       After removing `InvokerResourceMessage`'s `hashCode` and `equals`, I tested the following scenarios with two `InvokerResourceMessage` objects:
   
   * if all field values are the same, `object1.equals(object2)` is `true`;
   * if any field value differs, `object1.equals(object2)` is `false`.
   
   Regarding why we added the `hashCode` and `equals` overrides in our code: there seems to be no special reason.

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       This is used for special actions, e.g. ones that need a lot of memory or a powerful GPU.
   It lets us make such an action's invocations run on the corresponding invokers.
   
   * First, add an annotation to the action, e.g.
   
   ```
   wsk action create hello-gpu ~/hello-gpu.js --annotation invoker-resources '["gpu"]'
   ```
   * Second, when deploying the invokers, add the corresponding tag to the invoker as well:
   ```
   whisk/invokers/0/0
   {"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":["gpu"]}
   whisk/invokers/1/1
   {"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":[]}
   ```
   * Finally, when the action is invoked, its activations run on invoker0 above, which has the `gpu` tag.
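A minimal sketch of how tag matching could work when choosing invokers. Assumptions not stated in this thread: the action's `invoker-resources` annotation lists the required tags, an invoker is eligible only if its status is `up` and its `tags` contain every required tag, and an empty requirement list means no constraint. `InvokerEntry` and `eligibleInvokers` are hypothetical names; the real scheduler may apply a different policy.

```scala
// Hypothetical, simplified view of an invoker's published resource entry.
final case class InvokerEntry(id: Int, status: String, freeMemory: Long, tags: Seq[String])

// An invoker qualifies if it is up and carries every tag the action requires.
// Assumption: an empty requirement list places no constraint on the invoker.
def eligibleInvokers(required: Seq[String], invokers: Seq[InvokerEntry]): Seq[InvokerEntry] =
  invokers.filter(inv => inv.status == "up" && required.forall(tag => inv.tags.contains(tag)))

val invokers = Seq(
  InvokerEntry(0, "up", 10240, Seq("gpu")),
  InvokerEntry(1, "up", 10240, Seq.empty),
  InvokerEntry(2, "down", 10240, Seq("gpu")))

println(eligibleInvokers(Seq("gpu"), invokers).map(_.id))
println(eligibleInvokers(Seq.empty, invokers).map(_.id))
```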

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       BTW, `dedicatedNamespaces` means that all actions in a dedicated namespace run on the corresponding invoker.
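A hedged sketch of the `dedicatedNamespaces` rule, combining this comment with the `InvokerInstanceId` doc comment ("only dedicatedNamespaces's actions can be run on this invoker"). Assumed policy, not confirmed by the thread: a namespace listed as dedicated on some invoker runs only on those invokers, and every other namespace runs only on invokers with no dedicated namespaces. `Invoker` and `candidates` are hypothetical names.

```scala
// Hypothetical, simplified invoker view with only the field relevant here.
final case class Invoker(id: Int, dedicatedNamespaces: Seq[String])

// Assumed policy: dedicated namespaces are pinned to their invokers;
// all other namespaces share the invokers that have no dedicated namespaces.
def candidates(namespace: String, invokers: Seq[Invoker]): Seq[Invoker] = {
  val dedicated = invokers.filter(_.dedicatedNamespaces.contains(namespace))
  if (dedicated.nonEmpty) dedicated
  else invokers.filter(_.dedicatedNamespaces.isEmpty)
}

val pool = Seq(Invoker(0, Seq("teamA")), Invoker(1, Seq.empty), Invoker(2, Seq.empty))
println(candidates("teamA", pool).map(_.id))
println(candidates("teamB", pool).map(_.id))
```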

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       This is used for special actions, e.g. ones that need a lot of memory or a powerful GPU.
   It lets us make such an action's invocations run on the corresponding invokers. (BTW, this requirement came from users in our company.)
   
   * First, add an annotation to the action, e.g.
   
   ```
   wsk action create hello-gpu ~/hello-gpu.js --annotation invoker-resources '["gpu"]'
   ```
   * Second, when deploying the invokers, add the corresponding tag to the invoker as well:
   ```
   whisk/invokers/0/0
   {"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":["gpu"]}
   whisk/invokers/1/1
   {"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":[]}
   ```
   * Finally, when the action is invoked, its activations run on invoker0 above, which has the `gpu` tag.

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       BTW, `dedicatedNamespaces` means that all actions in a dedicated namespace run on the corresponding invoker.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570714082



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       BTW, `dedicatedNamespaces` means that all actions in a dedicated namespace run on the corresponding invoker.







[GitHub] [openwhisk] codecov-io commented on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (1ecd1c0) into [master](https://codecov.io/gh/apache/openwhisk/commit/1753946ac16b91b2d2a3fc55ab215b14e71c2b39?el=desc) (1753946) will **decrease** coverage by `54.11%`.
   > The diff coverage is `5.91%`.
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #5061       +/-   ##
   ===========================================
   - Coverage   82.80%   28.69%   -54.12%     
   ===========================================
     Files         207      203        -4     
     Lines       10034     9937       -97     
     Branches      444      435        -9     
   ===========================================
   - Hits         8309     2851     -5458     
   - Misses       1725     7086     +5361     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `28.98% <0.00%> (-50.71%)` | :arrow_down: |
   | [...ala/org/apache/openwhisk/core/etcd/EtcdUtils.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZXRjZC9FdGNkVXRpbHMuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `82.35% <50.00%> (-12.83%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `72.72% <90.00%> (-18.51%)` | :arrow_down: |
   | [...n/scala/org/apache/openwhisk/utils/JsHelpers.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL3V0aWxzL0pzSGVscGVycy5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/openwhisk/common/ConfigMapValue.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9Db25maWdNYXBWYWx1ZS5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/openwhisk/common/ResizableSemaphore.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9SZXNpemFibGVTZW1hcGhvcmUuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/openwhisk/http/LenientSprayJsonSupport.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2h0dHAvTGVuaWVudFNwcmF5SnNvblN1cHBvcnQuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [164 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [1753946...1ecd1c0](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570470462



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(UnHealthy)
+  }
+
+  when(UnHealthy) {

Review comment:
       ```suggestion
     when(Unhealthy) {
   ```
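For orientation, a stdlib-only sketch of the state handling quoted above (`Offline` stays put on `GracefulShutdown` and moves to the unhealthy state on `Enable`), using the suggested `Unhealthy` spelling. Everything beyond those two transitions is an assumption for illustration: the `Healthy` state, the `TestResult` event, and the rule that the invoker becomes healthy once the last `size` health-action results are all successful are hypothetical. `Window` is a stand-in for the `RingBuffer[Boolean](InvokerHealthManager.bufferSize)` in the diff.

```scala
import scala.collection.immutable.Queue

sealed trait InvokerState
case object Offline extends InvokerState
case object Unhealthy extends InvokerState
case object Healthy extends InvokerState // assumed state

sealed trait Event
case object Enable extends Event
case object GracefulShutdown extends Event
final case class TestResult(success: Boolean) extends Event // hypothetical event

// Fixed-size window of recent health-action results (stand-in for RingBuffer[Boolean]).
final case class Window(size: Int, results: Queue[Boolean] = Queue.empty) {
  def add(r: Boolean): Window = {
    val q = results.enqueue(r)
    copy(results = if (q.size > size) q.dequeue._2 else q)
  }
  // Healthy only when the window is full and every result succeeded (assumed rule).
  def allOk: Boolean = results.size == size && results.forall(identity)
}

def next(state: InvokerState, window: Window, event: Event): (InvokerState, Window) =
  (state, event) match {
    case (Offline, GracefulShutdown) => (Offline, window) // mirrors `stay`
    case (Offline, Enable)           => (Unhealthy, window)
    case (Offline, _)                => (Offline, window)
    case (_, TestResult(ok)) =>
      val w = window.add(ok)
      (if (w.allOk) Healthy else Unhealthy, w)
    case (s, _) => (s, window) // other events are ignored in this sketch
  }

val events = Seq(Enable, TestResult(true), TestResult(true), TestResult(true))
val (finalState, _) = events.foldLeft[(InvokerState, Window)]((Offline, Window(3))) {
  case ((s, w), e) => next(s, w, e)
}
println(finalState) // Healthy
```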

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       What are tags used for?

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
##########
@@ -29,11 +29,18 @@ import scala.util.Try
  * @param instance a numeric value used for the load balancing and Kafka topic creation
  * @param uniqueName an identifier required for dynamic instance assignment by Zookeeper
  * @param displayedName an identifier that is required for the health protocol to correlate Kafka topics with invoker container names
+ * @param userMemory invoker user memory
+ * @param busyMemory invoker busy memory
+ * @param tags actions which included specified annotation tags can be run on this invoker
+ * @param dedicatedNamespaces only dedicatedNamespaces's actions can be run on this invoker
  */
 case class InvokerInstanceId(val instance: Int,
                              uniqueName: Option[String] = None,
                              displayedName: Option[String] = None,
-                             val userMemory: ByteSize)
+                             val userMemory: ByteSize,

Review comment:
       Is this going to affect the message bus? As always, I have to check how this will affect rolling restarts of the controllers and invokers: will one component be unhealthy while the other is being upgraded?

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: InvokerResourceMessage => {
+        that.canEqual(this) &&
+        this.status == that.status &&
+        this.freeMemory == that.freeMemory &&
+        this.busyMemory == that.busyMemory &&
+        this.inProgressMemory == that.inProgressMemory &&
+        this.tags.toSet == that.tags.toSet
+        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
+      }
+      case _ => false
+    }
+
+  override def hashCode: Int = {
+    var result = 1;

Review comment:
       nit: you can map over each field to make this functional rather than using a `var`, e.g.
   
   `1.map(start => prime * start + status.hashCode()).map(next => prime * next + freeMemory.hashCode())...`
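The `1.map(...)` chain above is shorthand (an `Int` has no `map` method). A compilable way to express the same idea without a `var` is a `foldLeft` over the field hashes. This is a sketch: `Msg` is a hypothetical stand-in for `InvokerResourceMessage`, and `prime = 31` is an assumed constant. Tags and namespaces are hashed as sets so the hash stays consistent with the `toSet`-based `equals` being replaced.

```scala
// Hypothetical stand-in for InvokerResourceMessage, used only to show the technique.
final case class Msg(status: String,
                     freeMemory: Long,
                     busyMemory: Long,
                     inProgressMemory: Long,
                     tags: Seq[String],
                     dedicatedNamespaces: Seq[String]) {

  // Functional equivalent of `var result = 1; result = prime * result + ...`:
  // fold the per-field hashes into an accumulator instead of mutating a var.
  override def hashCode: Int = {
    val prime = 31
    Seq[Any](status, freeMemory, busyMemory, inProgressMemory, tags.toSet, dedicatedNamespaces.toSet)
      .foldLeft(1)((acc, field) => prime * acc + field.hashCode)
  }
}

val m1 = Msg("up", 10240, 0, 0, Seq("gpu", "highmem"), Seq.empty)
val m2 = m1.copy(tags = Seq("highmem", "gpu"))
println(m1.hashCode == m2.hashCode) // true: tags are hashed as a set
```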

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,89 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState
+case object ClientCreated extends ProxyState
+case object Running extends ProxyState
+case object Pausing extends ProxyState
+case object Paused extends ProxyState
+case object Removing extends ProxyState
+case object Rescheduling extends ProxyState
+
+// Data
+sealed abstract class Data(val memoryLimit: ByteSize) {
+  def getContainer: Option[Container]
+}
+case class NoExistData() extends Data(0.B) {

Review comment:
       ```suggestion
   case class NonexistentData() extends Data(0.B) {
   ```







[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r597423259



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,384 @@
+/*

Review comment:
       Already changed the file name

##########
File path: tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthyManagerTests.scala
##########
@@ -0,0 +1,452 @@
+/*

Review comment:
       Already changed the file name







[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r572518297



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {

Review comment:
       cleaned up







[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r597423151



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+      goto(Unhealthy)
+  }
+
+  when(Unhealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      goto(Unhealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManager
+      stay()
+
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManager
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
+      goto(Offline)
+  }
+
+  // It is important to note that stateName and stateData in the onTransition callback refer to the previous state.
+  // We should access the next state's data with nextStateData.
+  onTransition {
+    case Offline -> Unhealthy =>
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case Healthy -> Unhealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>

Review comment:
       Already fixed







[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r586756142



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+      goto(Unhealthy)
+  }
+
+  when(Unhealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      goto(Unhealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManager
+      stay()
+
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManager
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
+      goto(Offline)
+  }
+
+  // It is important to note that stateName and stateData in the onTransition callback refer to the previous state.
+  // We should access the next state's data with nextStateData.
+  onTransition {
+    case Offline -> Unhealthy =>
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case Healthy -> Unhealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>

Review comment:
       Nit: can't this be removed? You can just leave out state transitions that don't do anything from this partial function.
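A small plain-Scala sketch of the reviewer's point, without Akka. Akka's FSM runs an `onTransition` handler only when the partial function `isDefinedAt` the `(from, to)` pair, so a transition with no matching case is silently skipped. The sketch also shows the caveat visible in a later revision of this file, where `case Offline -> Offline` sits above a catch-all `case _ -> newState`: once a catch-all exists, the explicit no-op case is what keeps the initial transition from publishing. `TransitionDemo`, `fire` and `published` are hypothetical names, and tuple patterns stand in for the FSM's `->` extractor.

```scala
import scala.collection.mutable.ListBuffer

sealed trait InvokerState
case object Offline extends InvokerState
case object Unhealthy extends InvokerState
case object Healthy extends InvokerState

object TransitionDemo {
  type Handler = PartialFunction[(InvokerState, InvokerState), Unit]

  // Records which states a handler "published", standing in for publishHealthStatusAndStay.
  val published: ListBuffer[InvokerState] = ListBuffer.empty

  // Run the handler only if it has a case for (from, to); otherwise skip it.
  def fire(handler: Handler, from: InvokerState, to: InvokerState): Unit =
    if (handler.isDefinedAt((from, to))) handler((from, to))

  // No Offline -> Offline case and no catch-all: the initial transition matches nothing.
  val withoutCatchAll: Handler = {
    case (Offline, Unhealthy) => published += Unhealthy
    case (_, Healthy)         => published += Healthy
  }

  // With a catch-all, the explicit no-op case must come first.
  val withCatchAll: Handler = {
    case (Offline, Offline) => () // initial transition from startWith: do nothing
    case (_, newState)      => published += newState
  }
}
```

Either form is fine; the no-op case only earns its place once a catch-all follows it.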







[GitHub] [openwhisk] ningyougang edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-803774283


   Any comments? Once this PR is merged, I can submit the subsequent features.





[GitHub] [openwhisk] codecov-io commented on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (1ecd1c0) into [master](https://codecov.io/gh/apache/openwhisk/commit/1753946ac16b91b2d2a3fc55ab215b14e71c2b39?el=desc) (1753946) will **decrease** coverage by `54.11%`.
   > The diff coverage is `5.91%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5061/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #5061       +/-   ##
   ===========================================
   - Coverage   82.80%   28.69%   -54.12%     
   ===========================================
     Files         207      203        -4     
     Lines       10034     9937       -97     
     Branches      444      435        -9     
   ===========================================
   - Hits         8309     2851     -5458     
   - Misses       1725     7086     +5361     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `28.98% <0.00%> (-50.71%)` | :arrow_down: |
   | [...ala/org/apache/openwhisk/core/etcd/EtcdUtils.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZXRjZC9FdGNkVXRpbHMuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `82.35% <50.00%> (-12.83%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `72.72% <90.00%> (-18.51%)` | :arrow_down: |
   | [...n/scala/org/apache/openwhisk/utils/JsHelpers.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL3V0aWxzL0pzSGVscGVycy5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/openwhisk/common/ConfigMapValue.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9Db25maWdNYXBWYWx1ZS5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/openwhisk/common/ResizableSemaphore.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9SZXNpemFibGVTZW1hcGhvcmUuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/openwhisk/http/LenientSprayJsonSupport.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2h0dHAvTGVuaWVudFNwcmF5SnNvblN1cHBvcnQuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [164 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [1753946...1ecd1c0](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] style95 commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570661010



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,6 @@
+package org.apache.openwhisk.core.service
+
+case class UpdateDataOnChange(key: String, value: String)
+
+// TODO, all operations for etcd via DataManagementService

Review comment:
       I hope this PR can be rebased after this kind of module has landed, according to this plan:
   https://cwiki.apache.org/confluence/display/OPENWHISK/Component+Design
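The name `UpdateDataOnChange` suggests that the data management service writes a key only when its value differs from the last one it stored. Below is a minimal sketch of that assumed behavior, with an in-memory map and a `write` callback standing in for the etcd client. `OnChangeWriter` is a hypothetical name and not part of the planned module.

```scala
import scala.collection.mutable

// Same shape as the message introduced in this diff.
final case class UpdateDataOnChange(key: String, value: String)

// Hypothetical stand-in for the planned DataManagementService; `write` plays the role of an etcd put.
final class OnChangeWriter(write: (String, String) => Unit) {
  // Last value written per key, used to skip unchanged updates.
  private val lastWritten = mutable.Map.empty[String, String]

  /** Writes only when the value differs from the last one written; returns whether a write happened. */
  def handle(msg: UpdateDataOnChange): Boolean =
    if (lastWritten.get(msg.key).contains(msg.value)) false
    else {
      write(msg.key, msg.value)
      lastWritten.update(msg.key, msg.value)
      true
    }
}
```

Under this assumption, a health status republished with identical content does not turn into a redundant etcd write.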







[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r572519129



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(Unhealthy)
+  }
+
+  when(Unhealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      goto(Unhealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManager
+      stay()
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManager
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
+
+      goto(Offline)
+
+  }
+
+  // It is important to note that stateName and stateData in the onTransition callback refer to the previous state.
+  // We should access the next state's data with nextStateData.
+  onTransition {
+    case Offline -> Unhealthy =>
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case Healthy -> Unhealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>
+    // this is an initial transition due to startWith, do nothing
+
+    case _ -> newState =>
+      publishHealthStatusAndStay(newState, nextStateData)
+
+      unstashAll()
+
+  }
+
+  private def publishHealthStatusAndStay(state: InvokerState, stateData: InvokerHealthData) = {
+    stateData match {
+      case data: InvokerInfo =>
+        val invokerResourceMessage = InvokerResourceMessage(
+          state.asString,
+          data.memory.freeMemory,
+          data.memory.busyMemory,
+          data.memory.inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces)
+        InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory
+        dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
+
+        stay using data.copy(currentInvokerResource = Some(invokerResourceMessage))
+
+      case data =>
+        logging.error(this, s"unexpected data is found: $data")
+
+        stay
+    }
+  }
+
+  initialize()
+
+  private def startTestAction(manager: ActorRef): Unit = {
+    val namespace = InvokerHealthManager.healthActionIdentity.namespace.name.asString
+    val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+    WhiskAction.get(entityStore, docId).onComplete {
+      case Success(action) =>
+        val initialize = Initialize(namespace, action.toExecutableWhiskAction.get, "", 0, transid)
+        startHealthAction(initialize, manager)
+      case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
+    }
+  }
+
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): Unit = {
+    healthActionProxy match {
+      case Some(proxy) =>
+        // make healthContainerProxy's status Running, so that healthContainerProxy can fetch the activation using ActivationServiceClient
+        proxy ! initialize
+      case None =>
+        val proxy = healthContainerProxyFactory(context, manager)
+        proxy ! initialize
+        healthActionProxy = Some(proxy)
+    }
+  }
+
+  def stopTestAction(): Unit = {
+    healthActionProxy.foreach {
+      healthActionProxy = None
+      _ ! GracefulShutdown
+    }
+  }
+
+  /**
+   * This method is to handle health message from ContainerProxy.pub
+   * It can induce status change.
+   *
+   * @param state  activation result state
+   * @param buffer RingBuffer to track status
+   * @return
+   */
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State = {

Review comment:
       Currently, I use a simple method to check the invoker's health status; it doesn't check for system errors.
   
   Maybe after this PR is merged, we can submit another PR to optimize it. (I don't have a better idea for it yet.)
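One way to picture the simple approach described above: each test-action result is appended to a bounded window, and the invoker counts as healthy while the number of failures in that window stays within a tolerance. This is a hypothetical sketch. `BoundedResults`, `HealthCheck.record`, the window of 10 and the tolerance of 3 are illustrative assumptions, not values taken from this PR (OpenWhisk has its own `RingBuffer` class).

```scala
import scala.collection.mutable

// Hypothetical bounded window of test-action results (true = success).
final class BoundedResults(capacity: Int) {
  private val window = mutable.Queue.empty[Boolean]

  def add(ok: Boolean): Unit = {
    window.enqueue(ok)
    if (window.size > capacity) window.dequeue() // drop the oldest result
  }

  def failures: Int = window.count(ok => !ok)
  def size: Int = window.size
}

object HealthCheck {
  // Illustrative thresholds only.
  val bufferSize = 10
  val errorTolerance = 3

  /** Records one result and reports whether the invoker should be treated as healthy. */
  def record(buffer: BoundedResults, ok: Boolean): Boolean = {
    buffer.add(ok)
    buffer.failures <= errorTolerance
  }
}
```

A window like this tolerates occasional failed test actions without flapping between states, while a run of failures still flips the invoker to unhealthy. The rules the PR actually applies may differ.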

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(Unhealthy)
+  }
+
+  when(Unhealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      goto(Unhealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManger
+      stay()
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManger
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
+
+      goto(Offline)
+
+  }
+
+  // It is important to note that stateName and the stateData in onTransition callback refer to the previous one.
+  // We should access to the next data with nextStateData
+  onTransition {
+    case Offline -> Unhealthy =>
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case Healthy -> Unhealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>
+    // this is an initial transition due to startWith, do nothing
+
+    case _ -> newState =>
+      publishHealthStatusAndStay(newState, nextStateData)
+
+      unstashAll()
+
+  }
+
+  private def publishHealthStatusAndStay(state: InvokerState, stateData: InvokerHealthData) = {
+    stateData match {
+      case data: InvokerInfo =>
+        val invokerResourceMessage = InvokerResourceMessage(
+          state.asString,
+          data.memory.freeMemory,
+          data.memory.busyMemory,
+          data.memory.inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces)
+        InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory
+        dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
+
+        stay using data.copy(currentInvokerResource = Some(invokerResourceMessage))
+
+      case data =>
+        logging.error(this, s"unexpected data is found: $data")
+
+        stay
+    }
+  }
+
+  initialize()
+
+  private def startTestAction(manager: ActorRef): Unit = {
+    val namespace = InvokerHealthManager.healthActionIdentity.namespace.name.asString
+    val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+    WhiskAction.get(entityStore, docId).onComplete {
+      case Success(action) =>
+        val initialize = Initialize(namespace, action.toExecutableWhiskAction.get, "", 0, transid)
+        startHealthAction(initialize, manager)
+      case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
+    }
+  }
+
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): Unit = {
+    healthActionProxy match {
+      case Some(proxy) =>
+        // make healthContainerProxy's status is Running, then healthContainerProxy can fetch the activation using ActivationServiceClient
+        proxy ! initialize
+      case None =>
+        val proxy = healthContainerProxyFactory(context, manager)
+        proxy ! initialize
+        healthActionProxy = Some(proxy)
+    }
+  }
+
+  def stopTestAction(): Unit = {
+    healthActionProxy.foreach {
+      healthActionProxy = None
+      _ ! GracefulShutdown
+    }
+  }
+
+  /**
+   * This method is to handle health message from ContainerProxy.pub
+   * It can induce status change.
+   *
+   * @param state  activation result state
+   * @param buffer RingBuffer to track status
+   * @return
+   */
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State = {

Review comment:
       Currently, I use a simple method to check the invoker's health status; it doesn't check for system errors.
   
   Maybe after this PR is merged, we can submit another PR to optimize it. (I don't have a better idea for it yet.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570695549



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: InvokerResourceMessage => {
+        that.canEqual(this) &&
+        this.status == that.status &&
+        this.freeMemory == that.freeMemory &&
+        this.busyMemory == that.busyMemory &&
+        this.inProgressMemory == that.inProgressMemory &&
+        this.tags.toSet == that.tags.toSet
+        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
+      }
+      case _ => false
+    }
+
+  override def hashCode: Int = {
+    var result = 1;

Review comment:
       I have already removed `equals` and `hashCode`.







[GitHub] [openwhisk] style95 commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r597303170



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,384 @@
+/*

Review comment:
       I think the filename should be `InvokerHealthManager.scala`, to match the class name.

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+      goto(Unhealthy)
+  }
+
+  when(Unhealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      goto(Unhealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManger
+      stay()
+
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManger
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
+      goto(Offline)
+  }
+
+  // It is important to note that stateName and the stateData in onTransition callback refer to the previous one.
+  // We should access to the next data with nextStateData
+  onTransition {
+    case Offline -> Unhealthy =>
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case Healthy -> Unhealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>

Review comment:
       +1

##########
File path: tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthyManagerTests.scala
##########
@@ -0,0 +1,452 @@
+/*

Review comment:
       Should the filename be `InvokerHealthManagerTests.scala`?

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State = {

Review comment:
       We just preserved the existing health-check logic, but I think we can make it configurable.
   Each container proxy is supposed to send a message indicating a failed result to `InvokerHealthManager`.
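One way the check could be made configurable, as suggested here, is to take the window size and error tolerance from a settings object and have each container proxy report its results to a single tracker. This is a sketch under assumptions: `HealthCheckConfig`, `HealthReport` and `HealthTracker` are hypothetical names, and the real settings would presumably come from the invoker's configuration rather than a hand-built case class.

```scala
// Hypothetical settings; a real deployment would load these from configuration.
final case class HealthCheckConfig(windowSize: Int, errorTolerance: Int) {
  require(windowSize > 0 && errorTolerance >= 0, "invalid health-check settings")
}

// Hypothetical message a container proxy could send after each activation.
final case class HealthReport(source: String, success: Boolean)

// Keeps the most recent `windowSize` results, regardless of which proxy sent them.
final class HealthTracker(config: HealthCheckConfig) {
  private var window = Vector.empty[Boolean]

  // Records a report and returns the resulting health verdict.
  def record(report: HealthReport): Boolean = {
    window = (window :+ report.success).takeRight(config.windowSize)
    healthy
  }

  def healthy: Boolean = window.count(ok => !ok) <= config.errorTolerance
}
```

With this shape, operators could trade sensitivity for stability by changing only the two numbers, without touching the state machine itself.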







[GitHub] [openwhisk] style95 commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570661010



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,6 @@
+package org.apache.openwhisk.core.service
+
+case class UpdateDataOnChange(key: String, value: String)
+
+// TODO, all operations for etcd via DataManagementService

Review comment:
       I hope this PR will be rebased once this kind of module has landed, according to this plan:
   https://cwiki.apache.org/confluence/display/OPENWHISK/Component+Design

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       @ningyougang 
   IIRC, we introduced these methods to avoid storing data in etcd when it is the same as the previous value.
   Can we make sure this requirement is still met by the default comparison provided by the case class?
   
   I am not quite sure, but we were using this class without these methods and then introduced them at some point for some reason.
   
   Could you confirm?
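A minimal sketch of the requirement under discussion: remember the last value written for each key and skip the write when the new value is equal to it. In the quoted `publishHealthStatusAndStay`, `UpdateDataOnChange` carries `invokerResourceMessage.serialize`, so if the comparison is done on that value, it compares JSON strings rather than `InvokerResourceMessage` instances. `OnChangeWriter` and its `write` callback are hypothetical stand-ins for the etcd path, not OpenWhisk APIs.

```scala
import scala.collection.mutable

// Hypothetical helper: forwards (key, value) to `write` only when the value
// differs from the last one written for that key.
final class OnChangeWriter(write: (String, String) => Unit) {
  private val lastWritten = mutable.Map.empty[String, String]

  // Returns true if the value was written, false if skipped as unchanged.
  def update(key: String, value: String): Boolean =
    if (lastWritten.get(key).contains(value)) false
    else {
      write(key, value)
      lastWritten.update(key, value)
      true
    }
}
```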







[GitHub] [openwhisk] rabbah commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
rabbah commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570578014



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =

Review comment:
       Is this method needed? A case class already provides this `equals` implementation by definition.
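To illustrate: the compiler-generated `equals` and `hashCode` of a case class compare every constructor field in order, so `Seq` fields are compared order-sensitively. The removed override differed in two ways: it compared `tags` and `dedicatedNamespaces` as sets, and, as quoted, the line comparing `tags` has no trailing `&&`, so Scala infers a statement break there and the override effectively returned only the `dedicatedNamespaces` comparison. The sketch uses a hypothetical, trimmed-down `Resource` class rather than `InvokerResourceMessage`.

```scala
// Hypothetical, trimmed-down stand-in for InvokerResourceMessage.
final case class Resource(status: String, freeMemory: Long, tags: Seq[String])

// Order-insensitive comparison of `tags`, the semantics the removed override aimed for.
def setwiseEquals(a: Resource, b: Resource): Boolean =
  a.status == b.status &&
    a.freeMemory == b.freeMemory &&
    a.tags.toSet == b.tags.toSet

// Same shape as the removed override: with no `&&` after the `tags` line, that
// line is a separate, discarded statement and only the last line decides.
def missingAndEquals(a: Resource, b: Resource): Boolean = {
  a.status == b.status &&
    a.tags.toSet == b.tags.toSet
  a.freeMemory == b.freeMemory
}
```

The compiler typically warns about the discarded expression in `missingAndEquals`, which is one way such a slip can be caught.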







[GitHub] [openwhisk] codecov-io edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (c90f829) into [master](https://codecov.io/gh/apache/openwhisk/commit/4a13303fae4d9750da6662bb39b3ec92d6ccf56c?el=desc) (4a13303) will **decrease** coverage by `7.58%`.
   > The diff coverage is `73.45%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #5061      +/-   ##
   ==========================================
   - Coverage   81.95%   74.36%   -7.59%     
   ==========================================
     Files         210      213       +3     
     Lines       10167    10376     +209     
     Branches      440      433       -7     
   ==========================================
   - Hits         8332     7716     -616     
   - Misses       1835     2660     +825     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `94.11% <50.00%> (-1.07%)` | :arrow_down: |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `75.37% <75.37%> (ø)` | |
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `78.98% <80.00%> (-0.71%)` | :arrow_down: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `90.90% <90.00%> (-0.32%)` | :arrow_down: |
   | [...core/database/cosmosdb/RxObservableImplicits.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvUnhPYnNlcnZhYmxlSW1wbGljaXRzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/database/cosmosdb/cache/CacheInvalidator.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/database/cosmosdb/cache/ChangeFeedConsumer.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NoYW5nZUZlZWRDb25zdW1lci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...core/database/cosmosdb/CosmosDBArtifactStore.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJBcnRpZmFjdFN0b3JlLnNjYWxh) | `0.00% <0.00%> (-95.85%)` | :arrow_down: |
   | [.../openwhisk/core/scheduler/FPCSchedulerServer.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9zY2hlZHVsZXIvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9vcGVud2hpc2svY29yZS9zY2hlZHVsZXIvRlBDU2NoZWR1bGVyU2VydmVyLnNjYWxh) | `0.00% <0.00%> (-95.84%)` | :arrow_down: |
   | ... and [23 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [4a13303...c90f829](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] ningyougang commented on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-802577794


   Already rebased the latest code, and ready to review again.





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570713341



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       This is used for special actions, e.g. ones that need a lot of memory or a powerful GPU.
   It lets such an action's invocations run on the corresponding invokers (BTW, this requirement comes from users in our company).
   
   * First, add an annotation to the action, e.g.
   
   ```
   wsk action create hello-gpu ~/hello-gpu.js --annotation invoker-resources '["gpu"]'
   ```
   * Second, when deploying the invokers, add the corresponding tag to the invoker as well:
   ```
   whisk/invokers/0/0
   {"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":["gpu"]}
   whisk/invokers/1/1
   {"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":[]}
   ```
   * Finally, when the action is run, its activations will run on invoker0 above, which has the `gpu` tag.
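A rough sketch of the matching described above, in Scala. A simplified, hypothetical `InvokerRecord` stands in for the data published under `whisk/invokers/...`, and an action is only offered to invokers that are `up`, have enough free memory and carry every tag it asks for. This illustrates the idea only; it is not the scheduler's actual placement code, and letting an action without tags run on any invoker is an assumption.

```scala
// Hypothetical, simplified view of one invoker's published resource record.
final case class InvokerRecord(id: Int, status: String, freeMemoryMB: Long, tags: Set[String])

object TagRouting {
  /** Invokers that are up, have enough free memory and carry every tag the action requires.
    * Assumption: an empty `requiredTags` set matches every invoker.
    */
  def candidates(invokers: Seq[InvokerRecord], requiredTags: Set[String], memoryMB: Long): Seq[InvokerRecord] =
    invokers.filter { inv =>
      inv.status == "up" && inv.freeMemoryMB >= memoryMB && requiredTags.subsetOf(inv.tags)
    }
}
```

With the two invokers from the etcd example, an action annotated with `gpu` would only be offered to invoker 0.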







[GitHub] [openwhisk] ningyougang edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-803774283


   Any comments? Once this PR is merged, I can submit the subsequent features.





[GitHub] [openwhisk] ningyougang commented on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-789351618


   @bdoyle0182 
   
   > I think this generally looks good to me. But I think I need to see the upstream DataManagementService this is forwarding message to and possibly the new downstream ContainerProxy that I believe is requesting activations by sending a message to this fsm?
   
   Sorry, I don't quite understand what you mean; let me try to explain below.
   This is the DataManagementService PR: https://github.com/apache/openwhisk/pull/5063. It stores data in etcd with a lease and deletes data from etcd. InvokerHealthyManager uses DataManagementService to store its health data in etcd, e.g. `dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)`.
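Going by its name, `UpdateDataOnChange` should only write to etcd when the value for a key differs from the one last written; that reading is an assumption here, and PR #5063 is the authority on the real behaviour. A minimal plain-Scala sketch of that idea, with a hypothetical `OnChangePublisher` class and a `write` callback standing in for the etcd client:

```scala
import scala.collection.mutable

/** Hypothetical stand-in for the "write only on change" behaviour; `write` replaces the real etcd call. */
final class OnChangePublisher(write: (String, String) => Unit) {
  // Last value written per key, kept locally so unchanged values are never re-sent.
  private val lastPublished = mutable.Map.empty[String, String]

  /** Writes `value` under `key` only if it differs from the last value written; returns true if it wrote. */
  def updateOnChange(key: String, value: String): Boolean =
    if (lastPublished.get(key).contains(value)) false
    else {
      lastPublished.update(key, value)
      write(key, value)
      true
    }
}
```

Since the health manager republishes its InvokerResourceMessage on every MemoryInfo update, skipping unchanged values like this would keep etcd write traffic proportional to actual changes.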
   
   
   > So is my understanding correct that every invoker will have a healthcheck container running at all times? What is the gap between requesting healthcheck activations from this fsm from the container or is it just always trying to run the healthcheck code with no gaps?
   
   InvokerHealthyManager doesn't keep a health-check container running at all times. It creates one at startup, or when the health status changes from Healthy to UnHealthy, and stops it again once the invoker becomes Healthy. The health-check container fetches its activations locally, e.g.
   ![image](https://user-images.githubusercontent.com/11749867/109737890-d1b45500-7c01-11eb-8ba7-0100db62a0bd.png)
   By contrast, the other action containers fetch activations from the scheduler component's MemoryQueue over gRPC.
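The decision that moves the manager between Healthy and UnHealthy (`handleHealthMessage` in the diff) looks at the last `bufferSize = 10` health-action results and turns unhealthy once at least `bufferErrorTolerance = 3` of them failed. A plain-Scala sketch of that rule; the `HealthWindow` class is hypothetical, and a bounded queue stands in for the PR's `RingBuffer`:

```scala
import scala.collection.mutable

/** Keeps only the most recent `size` health-action results, evicting the oldest first. */
final class HealthWindow(size: Int, errorTolerance: Int) {
  private val results = mutable.Queue.empty[Boolean]

  /** Records one result and returns true if the invoker should be considered healthy. */
  def record(success: Boolean): Boolean = {
    results.enqueue(success)
    if (results.size > size) results.dequeue()
    // Healthy while the number of failures in the window stays below the tolerance.
    results.count(!_) < errorTolerance
  }
}
```

Because old results fall out of the window, an invoker that failed three checks becomes healthy again once enough successful checks have pushed those failures out.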





[GitHub] [openwhisk] codecov-io edited a comment on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773863520


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=h1) Report
   > Merging [#5061](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=desc) (c90f829) into [master](https://codecov.io/gh/apache/openwhisk/commit/4a13303fae4d9750da6662bb39b3ec92d6ccf56c?el=desc) (4a13303) will **decrease** coverage by `6.77%`.
   > The diff coverage is `73.45%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5061/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #5061      +/-   ##
   ==========================================
   - Coverage   81.95%   75.17%   -6.78%     
   ==========================================
     Files         210      213       +3     
     Lines       10167    10376     +209     
     Branches      440      433       -7     
   ==========================================
   - Hits         8332     7800     -532     
   - Misses       1835     2576     +741     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ntainerpool/v2/FunctionPullingContainerProxy.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9GdW5jdGlvblB1bGxpbmdDb250YWluZXJQcm94eS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [...la/org/apache/openwhisk/common/TransactionId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvbW1vbi9UcmFuc2FjdGlvbklkLnNjYWxh) | `94.11% <50.00%> (-1.07%)` | :arrow_down: |
   | [.../core/containerpool/v2/InvokerHealthyManager.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9pbnZva2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29udGFpbmVycG9vbC92Mi9JbnZva2VySGVhbHRoeU1hbmFnZXIuc2NhbGE=) | `75.37% <75.37%> (ø)` | |
   | [.../org/apache/openwhisk/core/connector/Message.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvY29ubmVjdG9yL01lc3NhZ2Uuc2NhbGE=) | `79.71% <80.00%> (+0.02%)` | :arrow_up: |
   | [.../org/apache/openwhisk/core/entity/InstanceId.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZW50aXR5L0luc3RhbmNlSWQuc2NhbGE=) | `90.90% <90.00%> (-0.32%)` | :arrow_down: |
   | [...core/database/cosmosdb/RxObservableImplicits.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvUnhPYnNlcnZhYmxlSW1wbGljaXRzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/database/cosmosdb/cache/CacheInvalidator.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/database/cosmosdb/cache/ChangeFeedConsumer.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NoYW5nZUZlZWRDb25zdW1lci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...core/database/cosmosdb/CosmosDBArtifactStore.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJBcnRpZmFjdFN0b3JlLnNjYWxh) | `0.00% <0.00%> (-95.85%)` | :arrow_down: |
   | [...sk/core/database/cosmosdb/CosmosDBViewMapper.scala](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJWaWV3TWFwcGVyLnNjYWxh) | `0.00% <0.00%> (-93.90%)` | :arrow_down: |
   | ... and [19 more](https://codecov.io/gh/apache/openwhisk/pull/5061/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=footer). Last update [4a13303...c90f829](https://codecov.io/gh/apache/openwhisk/pull/5061?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570693954



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
##########
@@ -29,11 +29,18 @@ import scala.util.Try
  * @param instance a numeric value used for the load balancing and Kafka topic creation
  * @param uniqueName an identifier required for dynamic instance assignment by Zookeeper
  * @param displayedName an identifier that is required for the health protocol to correlate Kafka topics with invoker container names
+ * @param userMemory invoker user memory
+ * @param busyMemory invoker busy memory
+ * @param tags actions whose annotations include the specified tags can run on this invoker
+ * @param dedicatedNamespaces only actions in these namespaces can run on this invoker
  */
 case class InvokerInstanceId(val instance: Int,
                              uniqueName: Option[String] = None,
                              displayedName: Option[String] = None,
-                             val userMemory: ByteSize)
+                             val userMemory: ByteSize,

Review comment:
       Good question.
   
   I tested it locally and it doesn't affect the message bus: an invoker running this PR's code is reported as `up` by a controller running upstream master.
   However, I have hit the problem you describe before. At that time the PingMessage format seemed to have changed, so a controller running the old code reported invokers running the new code as `unhealthy`. When that happens, we work around it with a staged deployment:
   * Remove half of the controllers from nginx.
   * Disable half of the invokers.
   * Deploy the new code to those controllers and invokers.
   * Add those controllers back to nginx.
   * Repeat the steps above for the remaining half.







[GitHub] [openwhisk] bdoyle0182 commented on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773500124


   Now that we are beginning to touch the existing components, we should send a message to the mailing list saying that new code, not yet in use, is entering the existing components while the scheduler is being rolled out piece by piece.





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570714082



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       BTW, `dedicatedNamespaces` means that all actions in those namespaces run on the corresponding invoker.
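One reading of the dedicated-namespace rule (only a dedicated namespace's actions run on its invokers, and that namespace's actions go there) as a Scala sketch. The `NsInvoker` type and `eligible` function are hypothetical, and falling back to shared invokers when a namespace has no dedicated invoker is an assumption, not something stated in this PR:

```scala
// Hypothetical invoker view: an empty set means the invoker is shared by all namespaces.
final case class NsInvoker(id: Int, dedicatedNamespaces: Set[String])

object DedicatedRouting {
  /** Invokers allowed to run actions from `namespace`. */
  def eligible(invokers: Seq[NsInvoker], namespace: String): Seq[NsInvoker] = {
    val dedicated = invokers.filter(_.dedicatedNamespaces.contains(namespace))
    // Assumption: without a dedicated invoker, the namespace uses the shared (non-dedicated) ones.
    if (dedicated.nonEmpty) dedicated
    else invokers.filter(_.dedicatedNamespaces.isEmpty)
  }
}
```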







[GitHub] [openwhisk] ningyougang commented on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-803774283


   Any comments? Once this PR is merged, I can submit the subsequent features.





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570695768



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       OK, I will check.







[GitHub] [openwhisk] bdoyle0182 commented on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-790079446


   LGTM, but this should wait to be merged until it is rebased onto DataManagementService.





[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r571218281



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState

Review comment:
       ```suggestion
   case object CreatingContainer extends ProxyState
   ```

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(UnHealthy)
+  }
+
+  when(UnHealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      goto(UnHealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManager
+      stay()
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManager
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
+
+      goto(Offline)
+
+  }
+
+  // Note that stateName and stateData in the onTransition callback refer to the previous state.
+  // Use nextStateData to access the data of the next state.
+  onTransition {
+    case Offline -> UnHealthy =>
+      publishHealthStatusAndStay(UnHealthy, nextStateData)
+
+    case Healthy -> UnHealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(UnHealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(UnHealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>
+    // this is an initial transition due to startWith, do nothing
+
+    case _ -> newState =>
+      publishHealthStatusAndStay(newState, nextStateData)
+
+      unstashAll()
+
+  }
+
+  private def publishHealthStatusAndStay(state: InvokerState, stateData: InvokerHealthData) = {
+    stateData match {
+      case data: InvokerInfo =>
+        val invokerResourceMessage = InvokerResourceMessage(
+          state.asString,
+          data.memory.freeMemory,
+          data.memory.busyMemory,
+          data.memory.inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces)
+        InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory
+        dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
+
+        stay using data.copy(currentInvokerResource = Some(invokerResourceMessage))
+
+      case data =>
+        logging.error(this, s"unexpected data is found: $data")
+
+        stay
+    }
+  }
+
+  initialize()
+
+  private def startTestAction(manager: ActorRef): Unit = {
+    val namespace = InvokerHealthManager.healthActionIdentity.namespace.name.asString
+    val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+    WhiskAction.get(entityStore, docId).onComplete {
+      case Success(action) =>
+        val initialize = Initialize(namespace, action.toExecutableWhiskAction.get, "", 0, transid)
+        startHealthAction(initialize, manager)
+      case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
+    }
+  }
+
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): Unit = {
+    healthActionProxy match {
+      case Some(proxy) =>
+        // make healthContainerProxy's status is Running, then healthContainerProxy can fetch the activation using ActivationServiceClient
+        proxy ! initialize
+      case None =>
+        val proxy = healthContainerProxyFactory(context, manager)
+        proxy ! initialize
+        healthActionProxy = Some(proxy)
+    }
+  }
+
+  def stopTestAction(): Unit = {
+    healthActionProxy.foreach {
+      healthActionProxy = None
+      _ ! GracefulShutdown
+    }
+  }
+
+  /**
+   * This method is to handle health message from ContainerProxy.pub
+   * It can induce status change.
+   *
+   * @param state  activation result state
+   * @param buffer RingBuffer to track status
+   * @return
+   */
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State = {
+    buffer.add(state)
+    val falseStateCount = buffer.toList.count(_ == false)
+    if (falseStateCount < InvokerHealthManager.bufferErrorTolerance) {
+      gotoIfNotThere(Healthy)
+    } else {
+      logging.warn(
+        this,
+        s"become unhealthy because system error exceeded the error tolerance, falseStateCount $falseStateCount, errorTolerance ${InvokerHealthManager.bufferErrorTolerance}")
+      gotoIfNotThere(UnHealthy)
+    }
+  }
+
+  /**
+   * Decides whether to change to newState or not.
+   * If the current state is already newState, it stays; otherwise it changes its state.
+   *
+   * @param newState the desired state to change.
+   * @return
+   */
+  private def gotoIfNotThere(newState: InvokerState) = {
+    if (stateName == newState) {
+      stay()
+    } else {
+      goto(newState)
+    }
+  }
+
+  /** Delays all incoming messages until unstashAll() is called */
+  def delay = {
+    stash()
+    stay
+  }
+
+}
+
+case class HealthActivationServiceClient() extends Actor {
+
+  private var closed: Boolean = false
+
+  override def receive: Receive = {
+    case StartClient => sender() ! ClientCreationCompleted()
+    case _: RequestActivation =>
+      InvokerHealthManager.healthActivation match {
+        case Some(activation) if !closed =>
+          sender() ! activation.copy(
+            transid = TransactionId.invokerHealthActivation,
+            activationId = ActivationId.generate())
+
+        case _ if closed =>
+          context.parent ! ClientClosed
+          context.stop(self)
+
+        case _ => // do nothing
+      }
+
+    case CloseClientProxy =>
+      closed = true
+
+  }
+}
+
+object InvokerHealthManager {
+  val healthActionNamePrefix = "invokerHealthTestAction"
+  val bufferSize = 10
+  val bufferErrorTolerance = 3
+
+  var useMemory = 0l

Review comment:
       We shouldn't use vars in a singleton object if possible.
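A minimal sketch of one alternative, assuming the value only needs to be read by other parts of the invoker: keep it in an instance that is created once and injected where needed, instead of in `var useMemory` on the `InvokerHealthManager` companion object. `UsedMemoryTracker` is a hypothetical name, not code from this PR.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical replacement for `InvokerHealthManager.useMemory` (a var on a singleton object).
// One instance is created at invoker start-up and passed to whoever needs it, so ownership
// of the state is explicit and tests can use a fresh instance.
final class UsedMemoryTracker {
  // AtomicLong makes writes from the health manager visible to readers on other threads.
  private val usedMemory = new AtomicLong(0L)

  // Called where the PR currently assigns `useMemory = busyMemory + inProgressMemory`.
  def update(busyMemory: Long, inProgressMemory: Long): Unit =
    usedMemory.set(busyMemory + inProgressMemory)

  def current: Long = usedMemory.get()
}
```

Inside the FSM itself the same number is already derivable from the current `InvokerInfo` data (`busyMemory + inProgressMemory`), so a shared holder like this is only needed if other components must read it.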

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       This is a VERY powerful feature to group invokers depending on the operator's needs without creating multiple clusters. I'm glad it's being introduced with the new scheduler.
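To illustrate the kind of grouping that tags enable, here is a hypothetical selection step a scheduler could apply to published resource messages: keep only invokers that carry every required tag and have enough free memory. `InvokerView` and `selectInvokers` are illustrative names, not part of this PR, and the real placement logic may differ.

```scala
// Hypothetical, simplified view of what an invoker publishes (cf. InvokerResourceMessage).
final case class InvokerView(id: Int, freeMemory: Long, tags: Seq[String])

object InvokerSelection {
  // Keep invokers that carry every required tag and have enough free memory.
  // With no required tags, any invoker with enough memory qualifies.
  def selectInvokers(invokers: Seq[InvokerView], requiredTags: Set[String], memory: Long): Seq[InvokerView] =
    invokers.filter(i => requiredTags.subsetOf(i.tags.toSet) && i.freeMemory >= memory)
}
```

For example, an operator could tag a subset of invokers with a hypothetical `gpu` tag and have only those considered for matching requests, without running a separate cluster.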

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {

Review comment:
       nit: There are a lot of unnecessary blank lines from here on down that we can clean up.

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState

Review comment:
       ```suggestion
   case object CreatingClient extends ProxyState
   ```

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(Unhealthy)
+  }
+
+  when(Unhealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
+      goto(Unhealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManager
+      stay()
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManager
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
+
+      goto(Offline)
+
+  }
+
+  // Note that stateName and stateData in the onTransition callback still refer to the previous state.
+  // Use nextStateData to access the data of the next state.
+  onTransition {
+    case Offline -> Unhealthy =>
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case Healthy -> Unhealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>
+    // this is an initial transition due to startWith, do nothing
+
+    case _ -> newState =>
+      publishHealthStatusAndStay(newState, nextStateData)
+
+      unstashAll()
+
+  }
+
+  private def publishHealthStatusAndStay(state: InvokerState, stateData: InvokerHealthData) = {
+    stateData match {
+      case data: InvokerInfo =>
+        val invokerResourceMessage = InvokerResourceMessage(
+          state.asString,
+          data.memory.freeMemory,
+          data.memory.busyMemory,
+          data.memory.inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces)
+        InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory
+        dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
+
+        stay using data.copy(currentInvokerResource = Some(invokerResourceMessage))
+
+      case data =>
+        logging.error(this, s"unexpected data is found: $data")
+
+        stay
+    }
+  }
+
+  initialize()
+
+  private def startTestAction(manager: ActorRef): Unit = {
+    val namespace = InvokerHealthManager.healthActionIdentity.namespace.name.asString
+    val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+    WhiskAction.get(entityStore, docId).onComplete {
+      case Success(action) =>
+        val initialize = Initialize(namespace, action.toExecutableWhiskAction.get, "", 0, transid)
+        startHealthAction(initialize, manager)
+      case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
+    }
+  }
+
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): Unit = {
+    healthActionProxy match {
+      case Some(proxy) =>
+        // make healthContainerProxy's status Running so that it can fetch the activation using ActivationServiceClient
+        proxy ! initialize
+      case None =>
+        val proxy = healthContainerProxyFactory(context, manager)
+        proxy ! initialize
+        healthActionProxy = Some(proxy)
+    }
+  }
+
+  def stopTestAction(): Unit = {
+    healthActionProxy.foreach {
+      healthActionProxy = None
+      _ ! GracefulShutdown
+    }
+  }
+
+  /**
+   * This method is to handle health message from ContainerProxy.pub
+   * It can induce status change.
+   *
+   * @param state  activation result state
+   * @param buffer RingBuffer to track status
+   * @return
+   */
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State = {

Review comment:
       In the existing invoker supervision, the ring buffer also includes user action results so that it can check for system errors. I can't tell without seeing more of the new scheduler code, but we'll still have those checks elsewhere, right? Also, since this ring buffer seems to include only health-check activations, can it be more aggressive? I would think you could just go unhealthy if a single one fails, rather than three of the last ten.
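A minimal sketch of the trade-off being discussed, assuming a fixed window of the most recent health-check results. `HealthWindow` is a hypothetical stand-in for the PR's `RingBuffer[Boolean]` plus `handleHealthMessage` logic, not OpenWhisk's class. With the PR's values (`bufferSize = 10`, `bufferErrorTolerance = 3`) the third failure in the window flips the state; with a tolerance of 1 the first failure does.

```scala
import scala.collection.mutable

// Keeps the last `size` health-check results and reports healthy
// while the number of failures in the window is below `errorTolerance`.
final class HealthWindow(size: Int, errorTolerance: Int) {
  require(size > 0 && errorTolerance > 0, "size and errorTolerance must be positive")

  private val results = mutable.Queue.empty[Boolean]

  // Records one result (true = success) and returns whether the invoker counts as healthy.
  def record(success: Boolean): Boolean = {
    results.enqueue(success)
    if (results.size > size) results.dequeue() // drop the oldest result
    results.count(r => !r) < errorTolerance
  }
}
```

Either way, recovery happens only once enough failures have aged out of the window, so a stricter tolerance also means a longer run of successes is needed after a single transient failure.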




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] bdoyle0182 commented on pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#issuecomment-773500124


   Now that we are beginning to touch existing components, we should send a message to the mailing list explaining that new code, not yet in use, is entering the existing codebase while the scheduler is gradually rolled out.





[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570691563



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =

Review comment:
       Ah, yes. I tested locally after removing the `override def equals` and the `override def hashCode`, and both cases still work. For two `InvokerResourceMessage` objects:
   * if all field values are the same, `object1.equals(object2)` is `true`;
   * if any one field value is different, `object1.equals(object2)` is `false`.
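For reference, a Scala case class already derives structural `equals` and `hashCode` from its constructor fields, which is why removing the hand-written overrides behaves the same. A minimal sketch with a simplified, hypothetical copy of the message's fields (`ResourceSnapshot` and `EqualityDemo` are illustrative names):

```scala
// Simplified copy of InvokerResourceMessage's fields with no hand-written
// equals/hashCode: the compiler derives both from the constructor parameters.
final case class ResourceSnapshot(status: String,
                                  freeMemory: Long,
                                  busyMemory: Long,
                                  inProgressMemory: Long,
                                  tags: Seq[String],
                                  dedicatedNamespaces: Seq[String])

object EqualityDemo {
  val a = ResourceSnapshot("healthy", 1024L, 256L, 0L, Seq("gpu"), Seq.empty)
  val sameFields = a.copy()                       // equal to `a`, same hashCode
  val oneFieldDiffers = a.copy(busyMemory = 512L) // not equal to `a`
}
```

One caveat: this holds because the fields are `String`, `Long`, and `Seq`; an `Array` field would be compared by reference.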







[GitHub] [openwhisk] ningyougang commented on a change in pull request #5061: [New Scheduler] Implement InvokerHealthyManager

Posted by GitBox <gi...@apache.org>.
ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r572519550



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState

Review comment:
       Updated accordingly.

##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState

Review comment:
       Updated accordingly



