You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2019/09/09 05:32:20 UTC
[openwhisk] branch master updated: Pass an optional action instance
to ContainerFactory (#4595)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new d4d7ceb Pass an optional action instance to ContainerFactory (#4595)
d4d7ceb is described below
commit d4d7ceb627573670f70993d4e31dcf2a6737c3f8
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Mon Sep 9 11:02:07 2019 +0530
Pass an optional action instance to ContainerFactory (#4595)
---
.../core/containerpool/ContainerFactory.scala | 13 ++++++++++++-
.../core/containerpool/ContainerProxy.scala | 22 ++++++++++++++++++----
tests/src/test/scala/common/LoggedFunction.scala | 12 ++++++++++++
.../containerpool/test/ContainerProxyTests.scala | 4 ++--
4 files changed, 44 insertions(+), 7 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index 921cabe..3f5f661 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -20,7 +20,7 @@ package org.apache.openwhisk.core.containerpool
import akka.actor.ActorSystem
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
-import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId}
+import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, ExecutableWhiskAction, InvokerInstanceId}
import org.apache.openwhisk.spi.Spi
import scala.concurrent.Future
@@ -89,6 +89,17 @@ trait ContainerFactory {
* - It is desired that the container supports and enforces the specified memory limit and CPU shares.
* In particular, action memory limits rely on the underlying container technology.
*/
+ def createContainer(
+ tid: TransactionId,
+ name: String,
+ actionImage: ExecManifest.ImageName,
+ userProvidedImage: Boolean,
+ memory: ByteSize,
+ cpuShares: Int,
+ action: Option[ExecutableWhiskAction])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+ createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares)
+ }
+
def createContainer(tid: TransactionId,
name: String,
actionImage: ExecManifest.ImageName,
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 06d630a..47b2532 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -214,7 +214,13 @@ case object RunCompleted
* @param pauseGrace time to wait for new work before pausing the container
*/
class ContainerProxy(
- factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
+ factory: (TransactionId,
+ String,
+ ImageName,
+ Boolean,
+ ByteSize,
+ Int,
+ Option[ExecutableWhiskAction]) => Future[Container],
sendActiveAck: ActiveAck,
storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
@@ -242,7 +248,8 @@ class ContainerProxy(
job.exec.image,
job.exec.pull,
job.memoryLimit,
- poolConfig.cpuShare(job.memoryLimit))
+ poolConfig.cpuShare(job.memoryLimit),
+ None)
.map(container => PreWarmCompleted(PreWarmedData(container, job.exec.kind, job.memoryLimit)))
.pipeTo(self)
@@ -259,7 +266,8 @@ class ContainerProxy(
job.action.exec.image,
job.action.exec.pull,
job.action.limits.memory.megabytes.MB,
- poolConfig.cpuShare(job.action.limits.memory.megabytes.MB))
+ poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
+ Some(job.action))
// container factory will either yield a new container ready to execute the action, or
// starting up the container failed; for the latter, it's either an internal error starting
@@ -684,7 +692,13 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
object ContainerProxy {
def props(
- factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
+ factory: (TransactionId,
+ String,
+ ImageName,
+ Boolean,
+ ByteSize,
+ Int,
+ Option[ExecutableWhiskAction]) => Future[Container],
ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
diff --git a/tests/src/test/scala/common/LoggedFunction.scala b/tests/src/test/scala/common/LoggedFunction.scala
index 8b115de..b5bd896 100644
--- a/tests/src/test/scala/common/LoggedFunction.scala
+++ b/tests/src/test/scala/common/LoggedFunction.scala
@@ -82,6 +82,16 @@ class LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body: (A1, A2, A3, A4, A5, A6)
}
}
+class LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5, A6, A7) => B)
+ extends Function7[A1, A2, A3, A4, A5, A6, A7, B] {
+ val calls = mutable.Buffer[(A1, A2, A3, A4, A5, A6, A7)]()
+
+ override def apply(v1: A1, v2: A2, v3: A3, v4: A4, v5: A5, v6: A6, v7: A7): B = {
+ calls += ((v1, v2, v3, v4, v5, v6, v7))
+ body(v1, v2, v3, v4, v5, v6, v7)
+ }
+}
+
class SynchronizedLoggedFunction1[A1, B](body: A1 => B) extends Function1[A1, B] {
val calls = mutable.Buffer[A1]()
@@ -145,6 +155,8 @@ object LoggedFunction {
def apply[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) = new LoggedFunction5[A1, A2, A3, A4, A5, B](body)
def apply[A1, A2, A3, A4, A5, A6, B](body: (A1, A2, A3, A4, A5, A6) => B) =
new LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body)
+ def apply[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5, A6, A7) => B) =
+ new LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body)
}
object SynchronizedLoggedFunction {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 111e00c..21833cb 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -179,7 +179,7 @@ class ContainerProxyTests
/** Creates an inspectable factory */
def createFactory(response: Future[Container]) = LoggedFunction {
- (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int) =>
+ (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int, _: Option[ExecutableWhiskAction]) =>
response
}
@@ -242,7 +242,7 @@ class ContainerProxyTests
preWarm(machine)
factory.calls should have size 1
- val (tid, name, _, _, memory, cpuShares) = factory.calls(0)
+ val (tid, name, _, _, memory, cpuShares, _) = factory.calls(0)
tid shouldBe TransactionId.invokerWarmup
name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind"""
memory shouldBe memoryLimit