You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2019/09/09 05:32:20 UTC

[openwhisk] branch master updated: Pass an optional action instance to ContainerFactory (#4595)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d7ceb  Pass an optional action instance to ContainerFactory (#4595)
d4d7ceb is described below

commit d4d7ceb627573670f70993d4e31dcf2a6737c3f8
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Mon Sep 9 11:02:07 2019 +0530

    Pass an optional action instance to ContainerFactory (#4595)
---
 .../core/containerpool/ContainerFactory.scala      | 13 ++++++++++++-
 .../core/containerpool/ContainerProxy.scala        | 22 ++++++++++++++++++----
 tests/src/test/scala/common/LoggedFunction.scala   | 12 ++++++++++++
 .../containerpool/test/ContainerProxyTests.scala   |  4 ++--
 4 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index 921cabe..3f5f661 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -20,7 +20,7 @@ package org.apache.openwhisk.core.containerpool
 import akka.actor.ActorSystem
 import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.core.WhiskConfig
-import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId}
+import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, ExecutableWhiskAction, InvokerInstanceId}
 import org.apache.openwhisk.spi.Spi
 
 import scala.concurrent.Future
@@ -89,6 +89,17 @@ trait ContainerFactory {
    * - It is desired that the container supports and enforces the specified memory limit and CPU shares.
    *   In particular, action memory limits rely on the underlying container technology.
    */
+  def createContainer(
+    tid: TransactionId,
+    name: String,
+    actionImage: ExecManifest.ImageName,
+    userProvidedImage: Boolean,
+    memory: ByteSize,
+    cpuShares: Int,
+    action: Option[ExecutableWhiskAction])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+    createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares)
+  }
+
   def createContainer(tid: TransactionId,
                       name: String,
                       actionImage: ExecManifest.ImageName,
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 06d630a..47b2532 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -214,7 +214,13 @@ case object RunCompleted
  * @param pauseGrace time to wait for new work before pausing the container
  */
 class ContainerProxy(
-  factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
+  factory: (TransactionId,
+            String,
+            ImageName,
+            Boolean,
+            ByteSize,
+            Int,
+            Option[ExecutableWhiskAction]) => Future[Container],
   sendActiveAck: ActiveAck,
   storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
@@ -242,7 +248,8 @@ class ContainerProxy(
         job.exec.image,
         job.exec.pull,
         job.memoryLimit,
-        poolConfig.cpuShare(job.memoryLimit))
+        poolConfig.cpuShare(job.memoryLimit),
+        None)
         .map(container => PreWarmCompleted(PreWarmedData(container, job.exec.kind, job.memoryLimit)))
         .pipeTo(self)
 
@@ -259,7 +266,8 @@ class ContainerProxy(
         job.action.exec.image,
         job.action.exec.pull,
         job.action.limits.memory.megabytes.MB,
-        poolConfig.cpuShare(job.action.limits.memory.megabytes.MB))
+        poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
+        Some(job.action))
 
       // container factory will either yield a new container ready to execute the action, or
       // starting up the container failed; for the latter, it's either an internal error starting
@@ -684,7 +692,13 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
 
 object ContainerProxy {
   def props(
-    factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
+    factory: (TransactionId,
+              String,
+              ImageName,
+              Boolean,
+              ByteSize,
+              Int,
+              Option[ExecutableWhiskAction]) => Future[Container],
     ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
     store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
     collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
diff --git a/tests/src/test/scala/common/LoggedFunction.scala b/tests/src/test/scala/common/LoggedFunction.scala
index 8b115de..b5bd896 100644
--- a/tests/src/test/scala/common/LoggedFunction.scala
+++ b/tests/src/test/scala/common/LoggedFunction.scala
@@ -82,6 +82,16 @@ class LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body: (A1, A2, A3, A4, A5, A6)
   }
 }
 
+class LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5, A6, A7) => B)
+    extends Function7[A1, A2, A3, A4, A5, A6, A7, B] {
+  val calls = mutable.Buffer[(A1, A2, A3, A4, A5, A6, A7)]()
+
+  override def apply(v1: A1, v2: A2, v3: A3, v4: A4, v5: A5, v6: A6, v7: A7): B = {
+    calls += ((v1, v2, v3, v4, v5, v6, v7))
+    body(v1, v2, v3, v4, v5, v6, v7)
+  }
+}
+
 class SynchronizedLoggedFunction1[A1, B](body: A1 => B) extends Function1[A1, B] {
   val calls = mutable.Buffer[A1]()
 
@@ -145,6 +155,8 @@ object LoggedFunction {
   def apply[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) = new LoggedFunction5[A1, A2, A3, A4, A5, B](body)
   def apply[A1, A2, A3, A4, A5, A6, B](body: (A1, A2, A3, A4, A5, A6) => B) =
     new LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body)
+  def apply[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5, A6, A7) => B) =
+    new LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body)
 }
 
 object SynchronizedLoggedFunction {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 111e00c..21833cb 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -179,7 +179,7 @@ class ContainerProxyTests
 
   /** Creates an inspectable factory */
   def createFactory(response: Future[Container]) = LoggedFunction {
-    (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int) =>
+    (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int, _: Option[ExecutableWhiskAction]) =>
       response
   }
 
@@ -242,7 +242,7 @@ class ContainerProxyTests
     preWarm(machine)
 
     factory.calls should have size 1
-    val (tid, name, _, _, memory, cpuShares) = factory.calls(0)
+    val (tid, name, _, _, memory, cpuShares, _) = factory.calls(0)
     tid shouldBe TransactionId.invokerWarmup
     name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind"""
     memory shouldBe memoryLimit