You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/01/03 13:38:09 UTC

[GitHub] dubeejw closed pull request #3132: Separate container removal from job completion.

dubeejw closed pull request #3132: Separate container removal from job completion.
URL: https://github.com/apache/incubator-openwhisk/pull/3132
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 5177026fed..fb51690a32 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -81,6 +81,10 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
   def receive: Receive = {
     // A job to run on a container
+    //
+    // Run messages are received either via the feed or from child containers which cannot process
+    // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
+    // fail for example, or a container has aged and was destroying itself when a new request was assigned)
     case r: Run =>
       val container = if (busyPool.size < maxActiveContainers) {
         // Schedule a job to a warm container
@@ -110,6 +114,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
           freePool = freePool - actor
           actor ! r // forwards the run request to the container
         case None =>
+          // this can also happen if createContainer fails to start a new container, or
+          // if a job is rescheduled but the container it was allocated to has not yet destroyed itself
+          // (and a new container would over commit the pool)
           logging.error(this, "Rescheduling Run message, too many message in the pool")(r.msg.transid)
           self ! r
       }
@@ -131,8 +138,18 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       freePool = freePool - sender()
       busyPool.get(sender()).foreach { _ =>
         busyPool = busyPool - sender()
+        // container was busy, so there is capacity to accept another job request
         feed ! MessageFeed.Processed
       }
+
+    // This message is received for one of these reasons:
+    // 1. Container errored while resuming a warm container, could not process the job, and sent the job back
+    // 2. The container aged, is destroying itself, and was assigned a job which it had to send back
+    // 3. The container aged and is destroying itself
+    // Update the free/busy lists but no message is sent to the feed since there is no change in capacity yet
+    case RescheduleJob =>
+      freePool = freePool - sender()
+      busyPool = busyPool - sender()
   }
 
   /** Creates a new container and updates state accordingly. */
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index dd4336ed9f..c3f89c59e3 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -67,7 +67,8 @@ case object Remove
 // Events sent by the actor
 case class NeedWork(data: ContainerData)
 case object ContainerPaused
-case object ContainerRemoved
+case object ContainerRemoved // when container is destroyed
+case object RescheduleJob // job is sent back to parent and could not be processed because container is being destroyed
 
 /**
  * A proxy that wraps a Container. It is used to keep track of the lifecycle
@@ -101,6 +102,7 @@ class ContainerProxy(
     with Stash {
   implicit val ec = context.system.dispatcher
   implicit val logging = new AkkaLogging(context.system.log)
+  var rescheduleJob = false // true iff destroyContainer must send RescheduleJob instead of ContainerRemoved (job could not be processed, or container timed out/was reclaimed)
 
   startWith(Uninitialized, NoData())
 
@@ -248,7 +250,9 @@ class ContainerProxy(
           // Sending the message to self on a failure will cause the message
           // to ultimately be sent back to the parent (which will retry it)
           // when container removal is done.
-          case Failure(_) => self ! job
+          case Failure(_) =>
+            rescheduleJob = true
+            self ! job
         }
         .flatMap(_ => initializeAndRun(data.container, job))
         .map(_ => WarmedData(data.container, job.msg.user.namespace, job.action, Instant.now))
@@ -256,8 +260,10 @@ class ContainerProxy(
 
       goto(Running)
 
-    // timeout or removing
-    case Event(StateTimeout | Remove, data: WarmedData) => destroyContainer(data.container)
+    // container is reclaimed by the pool or it has become too old
+    case Event(StateTimeout | Remove, data: WarmedData) =>
+      rescheduleJob = true // to suppress sending ContainerRemoved to the pool and not double count
+      destroyContainer(data.container)
   }
 
   when(Removing) {
@@ -292,7 +298,11 @@ class ContainerProxy(
    * @param container the container to destroy
    */
   def destroyContainer(container: Container) = {
-    context.parent ! ContainerRemoved
+    if (!rescheduleJob) {
+      context.parent ! ContainerRemoved
+    } else {
+      context.parent ! RescheduleJob
+    }
 
     val unpause = stateName match {
       case Paused => container.resume()(TransactionId.invokerNanny)
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 25b17b63dd..159e50e950 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -202,6 +202,20 @@ class ContainerPoolTests
     containers(1).expectMsg(runMessageDifferentNamespace)
   }
 
+  it should "reschedule job when container is removed prematurely without sending message to feed" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val feed = TestProbe()
+
+    // a pool with only 1 slot
+    val pool = system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref))
+    pool ! runMessage
+    containers(0).expectMsg(runMessage)
+    containers(0).send(pool, RescheduleJob) // emulate container failure ...
+    containers(0).send(pool, runMessage) // ... causing job to be rescheduled
+    feed.expectNoMsg(100.millis)
+    containers(1).expectMsg(runMessage) // job resent to new actor
+  }
+
   /*
    * CONTAINER PREWARMING
    */
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index ac8da0f527..b278871d57 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -220,7 +220,7 @@ class ContainerProxyTests
 
     // Another pause causes the container to be removed
     timeout(machine)
-    expectMsg(ContainerRemoved)
+    expectMsg(RescheduleJob)
     expectMsg(Transition(machine, Paused, Removing))
 
     awaitAssert {
@@ -528,7 +528,7 @@ class ContainerProxyTests
     val runMessage = Run(action, message)
     machine ! runMessage
     expectMsg(Transition(machine, Paused, Running))
-    expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
+    expectMsg(RescheduleJob)
     expectMsg(Transition(machine, Running, Removing))
     expectMsg(runMessage)
 
@@ -649,7 +649,7 @@ class ContainerProxyTests
     machine ! Run(action, message)
 
     // State-machine shuts down nonetheless.
-    expectMsg(ContainerRemoved)
+    expectMsg(RescheduleJob)
     expectMsg(Transition(machine, Paused, Removing))
 
     // Pool gets the message again.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services