Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/12/11 12:38:42 UTC

[GitHub] cbickel closed pull request #4137: Remove busy loop in case the invoker is overloaded.

URL: https://github.com/apache/incubator-openwhisk/pull/4137

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 81c542714f..4edbae59d9 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -18,7 +18,7 @@
 package org.apache.openwhisk.core.containerpool
 
 import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
-import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.common.{AkkaLogging, Logging, LoggingMarkers, TransactionId}
 import org.apache.openwhisk.core.connector.MessageFeed
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
@@ -60,7 +60,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     extends Actor {
   import ContainerPool.memoryConsumptionOf
 
-  implicit val logging = new AkkaLogging(context.system.log)
+  implicit val logging: Logging = new AkkaLogging(context.system.log)
 
   var freePool = immutable.Map.empty[ActorRef, ContainerData]
   var busyPool = immutable.Map.empty[ActorRef, ContainerData]
@@ -68,8 +68,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   // If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
   // buffered here to keep order of computation.
   // Otherwise actions with small memory-limits could block actions with large memory limits.
-  var runBuffer = immutable.Queue.empty[Run]
-  val logMessageInterval = 10.seconds
+  var runBuffer: immutable.Queue[Run] = immutable.Queue.empty[Run]
+  val logMessageInterval: FiniteDuration = 10.seconds
 
   prewarmConfig.foreach { config =>
     logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
@@ -99,13 +99,23 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
     // fail for example, or a container has aged and was destroying itself when a new request was assigned)
     case r: Run =>
-      // Check if the message is resent from the buffer. Only the first message on the buffer can be resent.
-      val isResentFromBuffer = runBuffer.nonEmpty && runBuffer.dequeueOption.exists(_._1.msg == r.msg)
+      // Checks if the current message is the first in the queue
+      val isFirstInQueue = runBuffer.dequeueOption.exists(_._1.msg == r.msg)
 
-      // Only process request, if there are no other requests waiting for free slots, or if the current request is the
-      // next request to process
-      // It is guaranteed, that only the first message on the buffer is resent.
-      if (runBuffer.isEmpty || isResentFromBuffer) {
+      if (!r.isFromQueue) {
+        // If the request is not from the queue, it is always added to the queue first.
+        runBuffer = runBuffer.enqueue(r)
+        runFirstMessageOfQueue()
+      } else if (r.isFromQueue && !isFirstInQueue) {
+        // The current message was already in the buffer and is not a resend from a container, but it is not the
+        // topmost one in the buffer. This can happen because of a race condition between messages arriving at the
+        // actor: if two messages that trigger the first message of the buffer arrive at the same time, the same
+        // Run-message will be sent twice. The first arriving Run-message is handled correctly and removed from
+        // the buffer. The second one ends up here, as it is no longer the first one in the buffer. Instead we try
+        // to trigger the next Run-message.
+        runFirstMessageOfQueue()
+      } else {
+        // The current message is from the buffer and is the topmost one, so we process it.
         val createdContainer =
           // Is there enough space on the invoker for this action to be executed.
           if (hasPoolSpaceFor(busyPool, r.action.limits.memory.megabytes.MB)) {
@@ -151,45 +161,28 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
               freePool = freePool - actor
             }
             // Remove the action that get's executed now from the buffer and execute the next one afterwards.
-            if (isResentFromBuffer) {
-              // It is guaranteed that the currently executed messages is the head of the queue, if the message comes
-              // from the buffer
-              val (_, newBuffer) = runBuffer.dequeue
-              runBuffer = newBuffer
-              runBuffer.dequeueOption.foreach { case (run, _) => self ! run }
-            }
+            // It is guaranteed that the currently executed message is the head of the queue if the message comes
+            // from the buffer
+            val (_, newBuffer) = runBuffer.dequeue
+            runBuffer = newBuffer
+            // Check if there is also enough space for the next action in the buffer.
+            runFirstMessageOfQueue()
             actor ! r // forwards the run request to the container
             logContainerStart(r, containerState, data.activeActivationCount)
           case None =>
             // this can also happen if createContainer fails to start a new container, or
             // if a job is rescheduled but the container it was allocated to has not yet destroyed itself
             // (and a new container would over commit the pool)
-            val isErrorLogged = r.retryLogDeadline.map(_.isOverdue).getOrElse(true)
-            val retryLogDeadline = if (isErrorLogged) {
-              logging.error(
-                this,
-                s"Rescheduling Run message, too many message in the pool, " +
-                  s"freePoolSize: ${freePool.size} containers and ${memoryConsumptionOf(freePool)} MB, " +
-                  s"busyPoolSize: ${busyPool.size} containers and ${memoryConsumptionOf(busyPool)} MB, " +
-                  s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " +
-                  s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
-                  s"needed memory: ${r.action.limits.memory.megabytes} MB, " +
-                  s"waiting messages: ${runBuffer.size}")(r.msg.transid)
-              Some(logMessageInterval.fromNow)
-            } else {
-              r.retryLogDeadline
-            }
-            if (!isResentFromBuffer) {
-              // Add this request to the buffer, as it is not there yet.
-              runBuffer = runBuffer.enqueue(r)
-            }
-            // As this request is the first one in the buffer, try again to execute it.
-            self ! Run(r.action, r.msg, retryLogDeadline)
+            logging.error(
+              this,
+              s"Rescheduling Run message, too many messages in the pool, " +
+                s"freePoolSize: ${freePool.size} containers and ${memoryConsumptionOf(freePool)} MB, " +
+                s"busyPoolSize: ${busyPool.size} containers and ${memoryConsumptionOf(busyPool)} MB, " +
+                s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " +
+                s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
+                s"needed memory: ${r.action.limits.memory.megabytes} MB, " +
+                s"waiting messages: ${runBuffer.size}")(r.msg.transid)
         }
-      } else {
-        // There are currently actions waiting to be executed before this action gets executed.
-        // These waiting actions were not able to free up enough memory.
-        runBuffer = runBuffer.enqueue(r)
       }
 
     // Container is free to take more work
@@ -200,6 +193,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
         freePool = freePool + (sender() -> data)
         if (busyPool.contains(sender())) {
           busyPool = busyPool - sender()
+          runFirstMessageOfQueue()
           if (data.action.limits.concurrency.maxConcurrent > 1) {
             logging.info(
               this,
@@ -242,6 +236,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
         busyPool = busyPool - sender()
         feed ! MessageFeed.Processed
       }
+      runFirstMessageOfQueue()
 
     // This message is received for one of these reasons:
     // 1. Container errored while resuming a warm container, could not process the job, and sent the job back
@@ -253,6 +248,16 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       busyPool = busyPool - sender()
   }
 
+  /** Resend the first Run message of the queue if there is now enough space in the busy pool. */
+  def runFirstMessageOfQueue(): Unit = {
+    runBuffer.dequeueOption.foreach {
+      case (runMessage, _) =>
+        if (hasPoolSpaceFor(busyPool, runMessage.action.limits.memory.megabytes.MB)) {
+          self ! runMessage.copy(isFromQueue = true)
+        }
+    }
+  }
+
   /** Creates a new container and updates state accordingly. */
   def createContainer(memoryLimit: ByteSize): (ActorRef, ContainerData) = {
     val ref = childFactory(context)
@@ -294,7 +299,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   }
 
   /** Removes a container and updates state accordingly. */
-  def removeContainer(toDelete: ActorRef) = {
+  def removeContainer(toDelete: ActorRef): Unit = {
     toDelete ! Remove
     freePool = freePool - toDelete
     busyPool = busyPool - toDelete
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 0fdea5eb6e..23ba1ed212 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -23,12 +23,7 @@ import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{FSM, Props, Stash}
 import akka.event.Logging.InfoLevel
 import akka.pattern.pipe
-import pureconfig.loadConfigOrThrow
-
-import scala.collection.immutable
-import spray.json.DefaultJsonProtocol._
-import spray.json._
-import org.apache.openwhisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.connector.ActivationMessage
 import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
@@ -38,9 +33,13 @@ import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
 import org.apache.openwhisk.http.Messages
+import pureconfig.loadConfigOrThrow
+import spray.json.DefaultJsonProtocol._
+import spray.json._
 
-import scala.concurrent.Future
+import scala.collection.immutable
 import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
 // States
@@ -79,7 +78,9 @@ case class WarmedData(container: Container,
 
 // Events received by the actor
 case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
-case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, retryLogDeadline: Option[Deadline] = None)
+case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, isFromQueue: Boolean = false) {
+  def fromQueue = copy(isFromQueue = true)
+}
 case object Remove
 
 // Events sent by the actor
@@ -138,10 +139,12 @@ class ContainerProxy(
   pauseGrace: FiniteDuration)
     extends FSM[ContainerState, ContainerData]
     with Stash {
-  implicit val ec = context.system.dispatcher
-  implicit val logging = new AkkaLogging(context.system.log)
-  var rescheduleJob = false // true iff actor receives a job but cannot process it because actor will destroy itself
-  var runBuffer = immutable.Queue.empty[Run] //does not retain order, but does manage jobs that would have pushed past action concurrency limit
+  implicit val ec: ExecutionContext = context.system.dispatcher
+  implicit val logging: Logging = new AkkaLogging(context.system.log)
+  var rescheduleJob
+    : Boolean = false // true iff actor receives a job but cannot process it because actor will destroy itself
+  var runBuffer
+    : immutable.Queue[Run] = immutable.Queue.empty[Run] //does not retain order, but does manage jobs that would have pushed past action concurrency limit
   startWith(Uninitialized, NoData())
 
   when(Uninitialized) {
@@ -161,7 +164,7 @@ class ContainerProxy(
 
     // cold start (no container to reuse or available stem cell container)
     case Event(job: Run, _) =>
-      implicit val transid = job.msg.transid
+      implicit val transid: TransactionId = job.msg.transid
 
       // create a new container
       val container = factory(
@@ -269,7 +272,7 @@ class ContainerProxy(
     case Event(job: Run, data: WarmedData)
         if stateData.activeActivationCount < data.action.limits.concurrency.maxConcurrent && !rescheduleJob => //if there was a delay, and not a failure on resume, skip the run
 
-      implicit val transid = job.msg.transid
+      implicit val transid: TransactionId = job.msg.transid
       val newData = data.incrementActive
 
       initializeAndRun(data.container, job)
@@ -294,7 +297,7 @@ class ContainerProxy(
 
   when(Ready, stateTimeout = pauseGrace) {
     case Event(job: Run, data: WarmedData) =>
-      implicit val transid = job.msg.transid
+      implicit val transid: TransactionId = job.msg.transid
       val newData = data.incrementActive
 
       initializeAndRun(data.container, job)
@@ -346,8 +349,10 @@ class ContainerProxy(
 
   when(Removing) {
     case Event(job: Run, _) =>
-      // Send the job back to the pool to be rescheduled
-      context.parent ! job
+      // Send the job back to the pool to be rescheduled. As it was already removed from the queue, it needs to be
+      // added back in order to be processed. Otherwise it is not guaranteed that the currently processed message is
+      // the first one in the queue. Note that this may change the order of execution.
+      context.parent ! job.copy(isFromQueue = false)
       stay
     case Event(ContainerRemoved, _)  => stop()
     case Event(_: FailureMessage, _) => stop()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index 5e65e4c08e..318c265138 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -134,11 +134,11 @@ class ContainerPoolTests
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
 
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     containers(0).send(pool, NeedWork(warmedData()))
 
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     containers(1).expectNoMessage(100.milliseconds)
   }
 
@@ -149,11 +149,11 @@ class ContainerPoolTests
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
 
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     containers(0).send(pool, NeedWork(warmedData()))
 
     pool ! runMessageDifferentVersion
-    containers(0).expectMsg(runMessageDifferentVersion)
+    containers(0).expectMsg(runMessageDifferentVersion.fromQueue)
     containers(1).expectNoMessage(100.milliseconds)
   }
 
@@ -164,10 +164,10 @@ class ContainerPoolTests
     // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     // Note that the container doesn't respond, thus it's not free to take work
     pool ! runMessage
-    containers(1).expectMsg(runMessage)
+    containers(1).expectMsg(runMessage.fromQueue)
   }
 
   it should "remove a container to make space in the pool if it is already full and a different action arrives" in within(
@@ -178,12 +178,12 @@ class ContainerPoolTests
     // a pool with only 1 slot
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref))
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     containers(0).send(pool, NeedWork(warmedData()))
     feed.expectMsg(MessageFeed.Processed)
     pool ! runMessageDifferentEverything
     containers(0).expectMsg(Remove)
-    containers(1).expectMsg(runMessageDifferentEverything)
+    containers(1).expectMsg(runMessageDifferentEverything.fromQueue)
   }
 
   it should "remove several containers to make space in the pool if it is already full and a different large action arrives" in within(
@@ -194,9 +194,9 @@ class ContainerPoolTests
     // a pool with slots for 2 actions with default memory limit.
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(512.MB), feed.ref))
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     pool ! runMessageDifferentAction // 2 * stdMemory taken -> full
-    containers(1).expectMsg(runMessageDifferentAction)
+    containers(1).expectMsg(runMessageDifferentAction.fromQueue)
 
     containers(0).send(pool, NeedWork(warmedData())) // first action finished -> 1 * stdMemory taken
     feed.expectMsg(MessageFeed.Processed)
@@ -206,7 +206,7 @@ class ContainerPoolTests
     pool ! runMessageLarge // need to remove both action to make space for the large action (needs 2 * stdMemory)
     containers(0).expectMsg(Remove)
     containers(1).expectMsg(Remove)
-    containers(2).expectMsg(runMessageLarge)
+    containers(2).expectMsg(runMessageLarge.fromQueue)
   }
 
   it should "cache a container if there is still space in the pool" in within(timeout) {
@@ -218,17 +218,17 @@ class ContainerPoolTests
 
     // Run the first container
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     containers(0).send(pool, NeedWork(warmedData(lastUsed = Instant.EPOCH)))
     feed.expectMsg(MessageFeed.Processed)
 
     // Run the second container, don't remove the first one
     pool ! runMessageDifferentEverything
-    containers(1).expectMsg(runMessageDifferentEverything)
+    containers(1).expectMsg(runMessageDifferentEverything.fromQueue)
     containers(1).send(pool, NeedWork(warmedData(lastUsed = Instant.now)))
     feed.expectMsg(MessageFeed.Processed)
     pool ! runMessageDifferentNamespace
-    containers(2).expectMsg(runMessageDifferentNamespace)
+    containers(2).expectMsg(runMessageDifferentNamespace.fromQueue)
 
     // 2 Slots exhausted, remove the first container to make space
     containers(0).expectMsg(Remove)
@@ -242,12 +242,12 @@ class ContainerPoolTests
     // a pool with only 1 slot
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref))
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     containers(0).send(pool, NeedWork(warmedData()))
     feed.expectMsg(MessageFeed.Processed)
     pool ! runMessageDifferentNamespace
     containers(0).expectMsg(Remove)
-    containers(1).expectMsg(runMessageDifferentNamespace)
+    containers(1).expectMsg(runMessageDifferentNamespace.fromQueue)
   }
 
   it should "reschedule job when container is removed prematurely without sending message to feed" in within(timeout) {
@@ -257,11 +257,11 @@ class ContainerPoolTests
     // a pool with only 1 slot
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref))
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     containers(0).send(pool, RescheduleJob) // emulate container failure ...
     containers(0).send(pool, runMessage) // ... causing job to be rescheduled
     feed.expectNoMessage(100.millis)
-    containers(1).expectMsg(runMessage) // job resent to new actor
+    containers(1).expectMsg(runMessage.fromQueue) // job resent to new actor
   }
 
   it should "not start a new container if there is not enough space in the pool" in within(timeout) {
@@ -272,7 +272,7 @@ class ContainerPoolTests
 
     // Start first action
     pool ! runMessage // 1 * stdMemory taken
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
 
     // Send second action to the pool
     pool ! runMessageLarge // message is too large to be processed immediately.
@@ -283,10 +283,7 @@ class ContainerPoolTests
     feed.expectMsg(MessageFeed.Processed)
 
     // Second action should run now
-    containers(1).expectMsgPF() {
-      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
-      case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
-    }
+    containers(1).expectMsg(runMessageLarge.fromQueue)
 
     containers(1).send(pool, NeedWork(warmedData()))
     feed.expectMsg(MessageFeed.Processed)
@@ -336,7 +333,7 @@ class ContainerPoolTests
     containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
     containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind)))
     pool ! runMessage
-    containers(1).expectMsg(runMessage) // but container1 is used
+    containers(1).expectMsg(runMessage.fromQueue) // but container1 is used
   }
 
   it should "not use a prewarmed container if it doesn't fit memory wise" in within(timeout) {
@@ -351,7 +348,7 @@ class ContainerPoolTests
     containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
     containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit)))
     pool ! runMessage
-    containers(1).expectMsg(runMessage) // but container1 is used
+    containers(1).expectMsg(runMessage.fromQueue) // but container1 is used
   }
 
   /*
@@ -365,12 +362,12 @@ class ContainerPoolTests
 
     // container0 is created and used
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     containers(0).send(pool, NeedWork(warmedData()))
 
     // container0 is reused
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     containers(0).send(pool, NeedWork(warmedData()))
 
     // container0 is deleted
@@ -378,7 +375,7 @@ class ContainerPoolTests
 
     // container1 is created and used
     pool ! runMessage
-    containers(1).expectMsg(runMessage)
+    containers(1).expectMsg(runMessage.fromQueue)
   }
 
   /*
@@ -395,7 +392,7 @@ class ContainerPoolTests
 
     // Send action that blocks the pool
     pool ! runMessageLarge
-    containers(0).expectMsg(runMessageLarge)
+    containers(0).expectMsg(runMessageLarge.fromQueue)
 
     // Send action that should be written to the queue and retried in invoker
     pool ! runMessage
@@ -410,12 +407,10 @@ class ContainerPoolTests
     feed.expectMsg(MessageFeed.Processed)
 
     // Action 1 should start immediately
-    containers(0).expectMsgPF() {
-      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
-      case Run(runMessage.action, runMessage.msg, Some(_)) => true
-    }
+    containers(0).expectMsg(runMessage.fromQueue)
+
     // Action 2 should start immediately as well (without any retries, as there is already enough space in the pool)
-    containers(1).expectMsg(runMessageDifferentAction)
+    containers(1).expectMsg(runMessageDifferentAction.fromQueue)
   }
 
   it should "process activations in the order they are arriving" in within(timeout) {
@@ -427,7 +422,7 @@ class ContainerPoolTests
 
     // Send 4 actions to the ContainerPool (Action 0, Action 2 and Action 3 with each 265 MB and Action 1 with 512 MB)
     pool ! runMessage
-    containers(0).expectMsg(runMessage)
+    containers(0).expectMsg(runMessage.fromQueue)
     pool ! runMessageLarge
     containers(1).expectNoMessage(100.milliseconds)
     pool ! runMessageDifferentNamespace
@@ -438,10 +433,8 @@ class ContainerPoolTests
     // Action 0 ist finished -> Large action should be executed now
     containers(0).send(pool, NeedWork(warmedData()))
     feed.expectMsg(MessageFeed.Processed)
-    containers(1).expectMsgPF() {
-      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
-      case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
-    }
+    // This message should have waited in the pool
+    containers(1).expectMsg(runMessageLarge.fromQueue)
 
     // Send another action to the container pool, that would fit memory-wise
     pool ! runMessageDifferentEverything
@@ -450,20 +443,15 @@ class ContainerPoolTests
     // Action 1 is finished -> Action 2 and Action 3 should be executed now
     containers(1).send(pool, NeedWork(warmedData()))
     feed.expectMsg(MessageFeed.Processed)
-    containers(2).expectMsgPF() {
-      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
-      case Run(runMessageDifferentNamespace.action, runMessageDifferentNamespace.msg, Some(_)) => true
-    }
-    // Assert retryLogline = false to check if this request has been stored in the queue instead of retrying in the system
-    containers(3).expectMsg(runMessageDifferentAction)
+    containers(2).expectMsg(runMessageDifferentNamespace.fromQueue)
+
+    containers(3).expectMsg(runMessageDifferentAction.fromQueue)
 
     // Action 3 is finished -> Action 4 should start
     containers(3).send(pool, NeedWork(warmedData()))
     feed.expectMsg(MessageFeed.Processed)
-    containers(4).expectMsgPF() {
-      // The `Some` assures, that it has been retried while the first action was still blocking the invoker.
-      case Run(runMessageDifferentEverything.action, runMessageDifferentEverything.msg, Some(_)) => true
-    }
+
+    containers(4).expectMsg(runMessageDifferentEverything.fromQueue)
 
     // Action 2 and 4 are finished
     containers(2).send(pool, NeedWork(warmedData()))
@@ -471,6 +459,54 @@ class ContainerPoolTests
     containers(4).send(pool, NeedWork(warmedData()))
     feed.expectMsg(MessageFeed.Processed)
   }
+
+  it should "handle run messages correctly during race conditions" in within(timeout) {
+    val amountOfContainers = 5
+    val (containers, factory) = testContainers(amountOfContainers)
+    val feed = TestProbe()
+
+    // Pool with 1024 MB usermemory
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
+
+    // Fill each slot with a new activation
+    (0 until 4).map { i =>
+      val rm = createRunMessage(action, invocationNamespace)
+      pool ! rm
+      containers(i).expectMsg(rm.fromQueue)
+    }
+
+    // Add two more run messages that should be written into the buffer
+    pool ! runMessage
+    pool ! runMessageDifferentAction
+
+    // Retriggering the run message should not cause anything unexpected, because all slots are full anyway.
+    pool ! runMessage.fromQueue
+    feed.expectNoMessage(100.milliseconds)
+    (0 until amountOfContainers).par.foreach(i => containers(i).expectNoMessage(100.milliseconds))
+
+    // Free up one slot. The first run message in the queue should be retriggered.
+    containers(0).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+    containers(0).expectMsg(runMessage.fromQueue)
+
+    // Retrigger the run message again, e.g. by two NeedWork messages arriving at the same time ("at the same time"
+    // means that the second NeedWork message arrives before the Run message generated by the first NeedWork message).
+    // This should not trigger the same action again.
+    pool ! runMessage.fromQueue
+    feed.expectNoMessage(100.milliseconds)
+    (0 until amountOfContainers).par.foreach(i => containers(i).expectNoMessage(100.milliseconds))
+
+    // Free up one slot again. This should process the second message of the queue.
+    containers(1).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+    containers(1).expectMsg(Remove)
+    containers(4).expectMsg(runMessageDifferentAction.fromQueue)
+
+    // After the next container finishes, no more actions should be started, because the queue is empty.
+    containers(2).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+    (0 until amountOfContainers).par.foreach(i => containers(i).expectNoMessage(100.milliseconds))
+  }
 }
 
 /**


 

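At its core, the change replaces the removed retry loop (the old retryLogDeadline self-resend) with an explicit FIFO buffer: every new Run is enqueued into runBuffer, only the head of the buffer is re-sent, flagged with isFromQueue = true, once NeedWork frees capacity, and a retrigger that lost a race and is no longer the head simply kicks off the current head instead. The standalone Scala sketch below mirrors that decision flow outside of Akka; it is illustrative only, not the OpenWhisk code, and the simplified Run case class as well as the names PoolSketch, PoolSketchDemo, memoryMB, busyMB and free() are assumptions made for this example:

import scala.collection.immutable

// Run messages that cannot be scheduled are parked in a FIFO buffer; the head of the buffer is
// only re-sent (flagged as coming from the queue) once the pool has room again.
final case class Run(action: String, memoryMB: Int, isFromQueue: Boolean = false) {
  def fromQueue: Run = copy(isFromQueue = true)
}

final class PoolSketch(userMemoryMB: Int) {
  private var busyMB = 0
  private var runBuffer = immutable.Queue.empty[Run]

  private def hasSpaceFor(memoryMB: Int): Boolean = busyMB + memoryMB <= userMemoryMB

  /** Mirrors the pool's handling of a Run message. */
  def receive(r: Run): Unit = {
    val isFirstInQueue = runBuffer.headOption.exists(_.fromQueue == r.fromQueue)
    if (!r.isFromQueue) {
      // A fresh request is always enqueued first; then we try to start the head of the buffer.
      runBuffer = runBuffer.enqueue(r)
      runFirstMessageOfQueue()
    } else if (!isFirstInQueue) {
      // Stale retrigger that lost a race: ignore it and try the current head instead.
      runFirstMessageOfQueue()
    } else if (hasSpaceFor(r.memoryMB)) {
      // Head of the buffer and enough space: schedule it and immediately try the next one.
      runBuffer = runBuffer.dequeue._2
      busyMB += r.memoryMB
      println(s"started ${r.action} (${r.memoryMB} MB), busy: $busyMB MB")
      runFirstMessageOfQueue()
    } else {
      // Corresponds to the "too many messages in the pool" error log: the message stays buffered.
      println(s"no space for ${r.action}, ${runBuffer.size} message(s) waiting")
    }
  }

  /** Called when capacity is freed; the PR triggers this from the NeedWork handling. */
  def free(memoryMB: Int): Unit = {
    busyMB -= memoryMB
    runFirstMessageOfQueue()
  }

  /** Re-send the head of the buffer, flagged as coming from the queue, if it fits now. */
  private def runFirstMessageOfQueue(): Unit =
    runBuffer.headOption.foreach(head => if (hasSpaceFor(head.memoryMB)) receive(head.fromQueue))
}

object PoolSketchDemo extends App {
  val pool = new PoolSketch(userMemoryMB = 512)
  pool.receive(Run("a", 256))
  pool.receive(Run("b", 256)) // pool is now full
  pool.receive(Run("c", 256)) // parked in the buffer, no busy retry loop
  pool.free(256)              // frees a slot -> "c" is started from the buffer
}

Running PoolSketchDemo starts "a" and "b" immediately and starts "c" only after free(256) opens a slot, which matches what the updated ContainerPoolTests assert by expecting runMessage.fromQueue once capacity becomes available.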