You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/07/28 14:05:44 UTC

[incubator-openwhisk] branch master updated: Add Invoker instnace ID in invokerReative (#2507)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 33f3ac8  Add Invoker instnace ID in invokerReative (#2507)
33f3ac8 is described below

commit 33f3ac828a2f756b299b4a1223b87b1c00aa3b15
Author: Keunseob Kim <ke...@nate.com>
AuthorDate: Fri Jul 28 23:05:42 2017 +0900

    Add Invoker instnace ID in invokerReative (#2507)
    
    need to use invoker instance id to create/delete container
    
    Signed-off-by: keunseob.kim <ke...@samsung.com>
---
 .../whisk/core/containerpool/ContainerProxy.scala  | 12 +++++-----
 .../scala/whisk/core/invoker/InvokerReactive.scala |  4 ++--
 .../containerpool/test/ContainerProxyTests.scala   | 26 +++++++++++-----------
 3 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 652453d..7e0f698 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -92,6 +92,7 @@ class ContainerProxy(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
     sendActiveAck: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
     storeActivation: (TransactionId, WhiskActivation) => Future[Any],
+    instance: InstanceId,
     unusedTimeout: FiniteDuration,
     pauseGrace: FiniteDuration) extends FSM[ContainerState, ContainerData] with Stash {
     implicit val ec = context.system.dispatcher
@@ -104,7 +105,7 @@ class ContainerProxy(
         case Event(job: Start, _) =>
             factory(
                 TransactionId.invokerWarmup,
-                ContainerProxy.containerName("prewarm", job.exec.kind),
+                ContainerProxy.containerName(instance, "prewarm", job.exec.kind),
                 job.exec.image,
                 job.exec.pull,
                 job.memoryLimit)
@@ -120,7 +121,7 @@ class ContainerProxy(
             // create a new container
             val container = factory(
                 job.msg.transid,
-                ContainerProxy.containerName(job.msg.user.namespace.name, job.action.name.name),
+                ContainerProxy.containerName(instance, job.msg.user.namespace.name, job.action.name.name),
                 job.action.exec.image,
                 job.action.exec.pull,
                 job.action.limits.memory.megabytes.MB)
@@ -369,8 +370,9 @@ object ContainerProxy {
     def props(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
               ack: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
               store: (TransactionId, WhiskActivation) => Future[Any],
+              instance: InstanceId,
               unusedTimeout: FiniteDuration = 10.minutes,
-              pauseGrace: FiniteDuration = 50.milliseconds) = Props(new ContainerProxy(factory, ack, store, unusedTimeout, pauseGrace))
+              pauseGrace: FiniteDuration = 50.milliseconds) = Props(new ContainerProxy(factory, ack, store, instance, unusedTimeout, pauseGrace))
 
     // Needs to be thread-safe as it's used by multiple proxies concurrently.
     private val containerCount = new Counter
@@ -382,8 +384,8 @@ object ContainerProxy {
      * @param suffix the container name's suffix
      * @return a unique container name
      */
-    def containerName(prefix: String, suffix: String) =
-        s"wsk_${containerCount.next()}_${prefix}_${suffix}".replaceAll("[^a-zA-Z0-9_]", "")
+    def containerName(instance: InstanceId, prefix: String, suffix: String) =
+        s"wsk${instance.toInt}_${containerCount.next()}_${prefix}_${suffix}".replaceAll("[^a-zA-Z0-9_]", "")
 
     /**
      * Creates a WhiskActivation ready to be sent via active ack.
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index e406af9..3203594 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -72,7 +72,7 @@ class InvokerReactive(
 
     /** Cleans up all running wsk_ containers */
     def cleanup() = {
-        val cleaning = docker.ps(Seq("name" -> "wsk_"))(TransactionId.invokerNanny).flatMap { containers =>
+        val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap { containers =>
             val removals = containers.map { id =>
                 runc.resume(id)(TransactionId.invokerNanny).recoverWith {
                     // Ignore resume failures and try to remove anyway
@@ -138,7 +138,7 @@ class InvokerReactive(
     }
 
     /** Creates a ContainerProxy Actor when being called. */
-    val childFactory = (f: ActorRefFactory) => f.actorOf(ContainerProxy.props(containerFactory, ack, store))
+    val childFactory = (f: ActorRefFactory) => f.actorOf(ContainerProxy.props(containerFactory, ack, store, instance))
 
     val prewarmKind = "nodejs:6"
     val prewarmExec = ExecManifest.runtimesManifest.resolveDefaultRuntime(prewarmKind).map { manifest =>
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index ab9c69e..7191590 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -148,14 +148,14 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val container = new TestContainer
         val factory = createFactory(Future.successful(container))
 
-        val machine = childActorOf(ContainerProxy.props(factory, createAcker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, createAcker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         preWarm(machine)
 
         factory.calls should have size 1
         val (tid, name, _, _, memory) = factory.calls(0)
         tid shouldBe TransactionId.invokerWarmup
-        name should fullyMatch regex """wsk_\d+_prewarm_actionKind"""
+        name should fullyMatch regex """wsk\d+_\d+_prewarm_actionKind"""
         memory shouldBe memoryLimit
     }
 
@@ -164,7 +164,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
 
         preWarm(machine)
@@ -196,7 +196,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         preWarm(machine)
 
@@ -220,7 +220,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         preWarm(machine)
 
@@ -246,7 +246,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         run(machine, Uninitialized)
 
@@ -268,7 +268,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.failed(new Exception()))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         machine ! Run(action, message)
         expectMsg(Transition(machine, Uninitialized, Running))
@@ -296,7 +296,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         machine ! Run(action, message)
         expectMsg(Transition(machine, Uninitialized, Running))
@@ -324,7 +324,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         machine ! Run(action, message)
         expectMsg(Transition(machine, Uninitialized, Running))
@@ -352,7 +352,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         run(machine, Uninitialized) // first run an activation
         timeout(machine) // times out Ready state so container suspends
@@ -384,7 +384,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         run(machine, Uninitialized)
         timeout(machine) // times out Ready state so container suspends
@@ -415,7 +415,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
 
         // Start running the action
@@ -462,7 +462,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         val factory = createFactory(Future.successful(container))
         val acker = createAcker
 
-        val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+        val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
         registerCallback(machine)
         run(machine, Uninitialized)
         timeout(machine)

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].