You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/07/28 14:05:44 UTC
[incubator-openwhisk] branch master updated: Add Invoker instnace
ID in invokerReative (#2507)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 33f3ac8 Add Invoker instnace ID in invokerReative (#2507)
33f3ac8 is described below
commit 33f3ac828a2f756b299b4a1223b87b1c00aa3b15
Author: Keunseob Kim <ke...@nate.com>
AuthorDate: Fri Jul 28 23:05:42 2017 +0900
Add Invoker instnace ID in invokerReative (#2507)
need to use invoker instance id to create/delete container
Signed-off-by: keunseob.kim <ke...@samsung.com>
---
.../whisk/core/containerpool/ContainerProxy.scala | 12 +++++-----
.../scala/whisk/core/invoker/InvokerReactive.scala | 4 ++--
.../containerpool/test/ContainerProxyTests.scala | 26 +++++++++++-----------
3 files changed, 22 insertions(+), 20 deletions(-)
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 652453d..7e0f698 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -92,6 +92,7 @@ class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
sendActiveAck: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
storeActivation: (TransactionId, WhiskActivation) => Future[Any],
+ instance: InstanceId,
unusedTimeout: FiniteDuration,
pauseGrace: FiniteDuration) extends FSM[ContainerState, ContainerData] with Stash {
implicit val ec = context.system.dispatcher
@@ -104,7 +105,7 @@ class ContainerProxy(
case Event(job: Start, _) =>
factory(
TransactionId.invokerWarmup,
- ContainerProxy.containerName("prewarm", job.exec.kind),
+ ContainerProxy.containerName(instance, "prewarm", job.exec.kind),
job.exec.image,
job.exec.pull,
job.memoryLimit)
@@ -120,7 +121,7 @@ class ContainerProxy(
// create a new container
val container = factory(
job.msg.transid,
- ContainerProxy.containerName(job.msg.user.namespace.name, job.action.name.name),
+ ContainerProxy.containerName(instance, job.msg.user.namespace.name, job.action.name.name),
job.action.exec.image,
job.action.exec.pull,
job.action.limits.memory.megabytes.MB)
@@ -369,8 +370,9 @@ object ContainerProxy {
def props(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
ack: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
store: (TransactionId, WhiskActivation) => Future[Any],
+ instance: InstanceId,
unusedTimeout: FiniteDuration = 10.minutes,
- pauseGrace: FiniteDuration = 50.milliseconds) = Props(new ContainerProxy(factory, ack, store, unusedTimeout, pauseGrace))
+ pauseGrace: FiniteDuration = 50.milliseconds) = Props(new ContainerProxy(factory, ack, store, instance, unusedTimeout, pauseGrace))
// Needs to be thread-safe as it's used by multiple proxies concurrently.
private val containerCount = new Counter
@@ -382,8 +384,8 @@ object ContainerProxy {
* @param suffix the container name's suffix
* @return a unique container name
*/
- def containerName(prefix: String, suffix: String) =
- s"wsk_${containerCount.next()}_${prefix}_${suffix}".replaceAll("[^a-zA-Z0-9_]", "")
+ def containerName(instance: InstanceId, prefix: String, suffix: String) =
+ s"wsk${instance.toInt}_${containerCount.next()}_${prefix}_${suffix}".replaceAll("[^a-zA-Z0-9_]", "")
/**
* Creates a WhiskActivation ready to be sent via active ack.
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index e406af9..3203594 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -72,7 +72,7 @@ class InvokerReactive(
/** Cleans up all running wsk_ containers */
def cleanup() = {
- val cleaning = docker.ps(Seq("name" -> "wsk_"))(TransactionId.invokerNanny).flatMap { containers =>
+ val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap { containers =>
val removals = containers.map { id =>
runc.resume(id)(TransactionId.invokerNanny).recoverWith {
// Ignore resume failures and try to remove anyway
@@ -138,7 +138,7 @@ class InvokerReactive(
}
/** Creates a ContainerProxy Actor when being called. */
- val childFactory = (f: ActorRefFactory) => f.actorOf(ContainerProxy.props(containerFactory, ack, store))
+ val childFactory = (f: ActorRefFactory) => f.actorOf(ContainerProxy.props(containerFactory, ack, store, instance))
val prewarmKind = "nodejs:6"
val prewarmExec = ExecManifest.runtimesManifest.resolveDefaultRuntime(prewarmKind).map { manifest =>
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index ab9c69e..7191590 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -148,14 +148,14 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val container = new TestContainer
val factory = createFactory(Future.successful(container))
- val machine = childActorOf(ContainerProxy.props(factory, createAcker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, createAcker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
factory.calls should have size 1
val (tid, name, _, _, memory) = factory.calls(0)
tid shouldBe TransactionId.invokerWarmup
- name should fullyMatch regex """wsk_\d+_prewarm_actionKind"""
+ name should fullyMatch regex """wsk\d+_\d+_prewarm_actionKind"""
memory shouldBe memoryLimit
}
@@ -164,7 +164,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -196,7 +196,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -220,7 +220,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -246,7 +246,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
@@ -268,7 +268,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.failed(new Exception()))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -296,7 +296,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -324,7 +324,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -352,7 +352,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized) // first run an activation
timeout(machine) // times out Ready state so container suspends
@@ -384,7 +384,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine) // times out Ready state so container suspends
@@ -415,7 +415,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
// Start running the action
@@ -462,7 +462,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
val factory = createFactory(Future.successful(container))
val acker = createAcker
- val machine = childActorOf(ContainerProxy.props(factory, acker, store, pauseGrace = timeout))
+ val machine = childActorOf(ContainerProxy.props(factory, acker, store, InstanceId(0), pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine)
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].