You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/09/25 16:50:07 UTC
[incubator-openwhisk] branch master updated: ContainerFactory SPI
(#2659)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new be43ad9 ContainerFactory SPI (#2659)
be43ad9 is described below
commit be43ad90e0dce26c7fa76573605324be22817b30
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Mon Sep 25 09:50:04 2017 -0700
ContainerFactory SPI (#2659)
---
common/scala/src/main/resources/reference.conf | 1 +
.../scala/whisk/core/containerpool/Container.scala | 199 +++++++++++++++++++++
.../core/containerpool/ContainerFactory.scala | 59 ++++++
.../whisk/core/containerpool}/HttpUtils.scala | 4 +-
.../scala/whisk/core/containerpool/Container.scala | 83 ---------
.../whisk/core/containerpool/ContainerProxy.scala | 2 +-
.../core/containerpool/docker/DockerClient.scala | 17 +-
.../docker/DockerClientWithFileAccess.scala | 10 +-
.../containerpool/docker/DockerContainer.scala | 163 +++--------------
.../docker/DockerContainerFactory.scala | 101 +++++++++++
.../core/containerpool/docker/RuncClient.scala | 1 +
.../scala/whisk/core/invoker/InvokerReactive.scala | 72 ++------
.../scala/actionContainers/ActionContainer.scala | 2 +-
.../docker/test/ContainerConnectionTests.scala | 4 +-
.../docker/test/DockerClientTests.scala | 8 +-
.../test/DockerClientWithFileAccessTests.scala | 14 +-
.../docker/test/DockerContainerTests.scala | 52 ++++--
.../docker/test/RuncClientTests.scala | 2 +-
.../containerpool/test/ContainerProxyTests.scala | 22 ++-
19 files changed, 481 insertions(+), 335 deletions(-)
diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 52f30c3..50a36a5 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -1,4 +1,5 @@
whisk.spi{
ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
+ ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider
}
\ No newline at end of file
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
new file mode 100644
index 0000000..0cecbe6
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import java.time.Instant
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration._
+import scala.util.Failure
+import scala.util.Success
+import spray.json.JsObject
+import spray.json.DefaultJsonProtocol._
+import whisk.common.Logging
+import whisk.common.LoggingMarkers
+import whisk.common.TransactionId
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.ActivationResponse.ContainerConnectionError
+import whisk.core.entity.ActivationResponse.ContainerResponse
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size._
+import whisk.http.Messages
+
+/**
+ * An OpenWhisk biased container abstraction. This is **not only** an abstraction
+ * for different container providers, but the implementation also needs to include
+ * OpenWhisk specific behavior, especially for initialize and run.
+ */
+case class ContainerId(val asString: String) {
+ require(asString.nonEmpty, "ContainerId must not be empty")
+}
+case class ContainerAddress(val host: String, val port: Int = 8080) {
+ require(host.nonEmpty, "ContainerIp must not be empty")
+}
+
+trait Container {
+
+ protected val id: ContainerId
+ protected val addr: ContainerAddress
+ protected implicit val logging: Logging
+ protected implicit val ec: ExecutionContext
+
+ /** HTTP connection to the container, will be lazily established by callContainer */
+ private var httpConnection: Option[HttpUtils] = None
+
+ /** Stops the container from consuming CPU cycles. */
+ def suspend()(implicit transid: TransactionId): Future[Unit]
+
+ /** Dual of halt. */
+ def resume()(implicit transid: TransactionId): Future[Unit]
+
+ /** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */
+ def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]]
+
+ /** Completely destroys this instance of the container. */
+ def destroy()(implicit transid: TransactionId): Future[Unit] = {
+ Future.successful(httpConnection.foreach(_.close()))
+ }
+
+ /** Initializes code in the container. */
+ def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
+ val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending initialization to $id $addr")
+
+ val body = JsObject("value" -> initializer)
+ callContainer("/init", body, timeout, retry = true)
+ .andThen { // never fails
+ case Success(r: RunResult) =>
+ transid.finished(
+ this,
+ start.copy(start = r.interval.start),
+ s"initialization result: ${r.toBriefString}",
+ endTime = r.interval.end)
+ case Failure(t) =>
+ transid.failed(this, start, s"initializiation failed with $t")
+ }
+ .flatMap { result =>
+ if (result.ok) {
+ Future.successful(result.interval)
+ } else if (result.interval.duration >= timeout) {
+ Future.failed(
+ InitializationError(
+ result.interval,
+ ActivationResponse.applicationError(Messages.timedoutActivation(timeout, true))))
+ } else {
+ Future.failed(
+ InitializationError(
+ result.interval,
+ ActivationResponse.processInitResponseContent(result.response, logging)))
+ }
+ }
+ }
+
+ /** Runs code in the container. */
+ def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
+ implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+ val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
+ val start =
+ transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to $actionName at $id $addr")
+
+ val parameterWrapper = JsObject("value" -> parameters)
+ val body = JsObject(parameterWrapper.fields ++ environment.fields)
+ callContainer("/run", body, timeout, retry = false)
+ .andThen { // never fails
+ case Success(r: RunResult) =>
+ transid.finished(
+ this,
+ start.copy(start = r.interval.start),
+ s"running result: ${r.toBriefString}",
+ endTime = r.interval.end)
+ case Failure(t) =>
+ transid.failed(this, start, s"run failed with $t")
+ }
+ .map { result =>
+ val response = if (result.interval.duration >= timeout) {
+ ActivationResponse.applicationError(Messages.timedoutActivation(timeout, false))
+ } else {
+ ActivationResponse.processRunResponseContent(result.response, logging)
+ }
+
+ (result.interval, response)
+ }
+ }
+
+ /**
+ * Makes an HTTP request to the container.
+ *
+ * Note that `http.post` will not throw an exception, hence the generated Future cannot fail.
+ *
+ * @param path relative path to use in the http request
+ * @param body body to send
+ * @param timeout timeout of the request
+ * @param retry whether or not to retry the request
+ */
+ protected def callContainer(path: String,
+ body: JsObject,
+ timeout: FiniteDuration,
+ retry: Boolean = false): Future[RunResult] = {
+ val started = Instant.now()
+ val http = httpConnection.getOrElse {
+ val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
+ httpConnection = Some(conn)
+ conn
+ }
+ Future {
+ http.post(path, body, retry)
+ }.map { response =>
+ val finished = Instant.now()
+ RunResult(Interval(started, finished), response)
+ }
+ }
+}
+
+/** Indicates a general error with the container */
+sealed abstract class ContainerError(msg: String) extends Exception(msg)
+
+/** Indicates an error while starting a container */
+sealed abstract class ContainerStartupError(msg: String) extends ContainerError(msg)
+
+/** Indicates any error while starting a container either of a managed runtime or a non-application-specific blackbox container */
+case class WhiskContainerStartupError(msg: String) extends ContainerStartupError(msg)
+
+/** Indicates an application-specific error while starting a blackbox container */
+case class BlackboxStartupError(msg: String) extends ContainerStartupError(msg)
+
+/** Indicates an error while initializing a container */
+case class InitializationError(interval: Interval, response: ActivationResponse) extends Exception(response.toString)
+
+case class Interval(start: Instant, end: Instant) {
+ def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS)
+}
+
+case class RunResult(interval: Interval, response: Either[ContainerConnectionError, ContainerResponse]) {
+ def ok = response.right.exists(_.ok)
+ def toBriefString = response.fold(_.toString, _.toString)
+}
+object Interval {
+
+ /** An interval starting now with zero duration. */
+ def zero = {
+ val now = Instant.now
+ Interval(now, now)
+ }
+}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
new file mode 100644
index 0000000..fb04f0f
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import akka.actor.ActorSystem
+import scala.concurrent.Future
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.WhiskConfig
+import whisk.core.entity.ByteSize
+import whisk.core.entity.ExecManifest
+import whisk.core.entity.InstanceId
+import whisk.spi.Spi
+
+/**
+ * An abstraction for Container creation
+ */
+trait ContainerFactory {
+
+ /** create a new Container */
+ def createContainer(tid: TransactionId,
+ name: String,
+ actionImage: ExecManifest.ImageName,
+ userProvidedImage: Boolean,
+ memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container]
+
+ /** perform any initialization */
+ def init(): Unit
+
+ /** cleanup any remaining Containers; should block until complete; should ONLY be run at startup/shutdown */
+ def cleanup(): Unit
+}
+
+/**
+ * An SPI for ContainerFactory creation
+ * All impls should use the parameters specified as additional args to "docker run" commands
+ */
+trait ContainerFactoryProvider extends Spi {
+ def getContainerFactory(actorSystem: ActorSystem,
+ logging: Logging,
+ config: WhiskConfig,
+ instance: InstanceId,
+ parameters: Map[String, Set[String]]): ContainerFactory
+}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
similarity index 98%
rename from core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala
rename to common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index a6b53a7..2815068 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -15,12 +15,10 @@
* limitations under the License.
*/
-package whisk.core.containerpool.docker
+package whisk.core.containerpool
import java.nio.charset.StandardCharsets
-import scala.Left
-import scala.Right
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
import scala.util.Failure
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala b/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala
deleted file mode 100644
index bffc75e..0000000
--- a/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.containerpool
-
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-
-import spray.json.JsObject
-import whisk.common.TransactionId
-import whisk.core.entity.ActivationResponse
-import whisk.core.entity.ByteSize
-import java.time.Instant
-import scala.concurrent.duration._
-
-/**
- * An OpenWhisk biased container abstraction. This is **not only** an abstraction
- * for different container providers, but the implementation also needs to include
- * OpenWhisk specific behavior, especially for initialize and run.
- */
-trait Container {
-
- /** Stops the container from consuming CPU cycles. */
- def suspend()(implicit transid: TransactionId): Future[Unit]
-
- /** Dual of halt. */
- def resume()(implicit transid: TransactionId): Future[Unit]
-
- /** Completely destroys this instance of the container. */
- def destroy()(implicit transid: TransactionId): Future[Unit]
-
- /** Initializes code in the container. */
- def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval]
-
- /** Runs code in the container. */
- def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
- implicit transid: TransactionId): Future[(Interval, ActivationResponse)]
-
- /** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */
- def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]]
-}
-
-/** Indicates a general error with the container */
-sealed abstract class ContainerError(msg: String) extends Exception(msg)
-
-/** Indicates an error while starting a container */
-sealed abstract class ContainerStartupError(msg: String) extends ContainerError(msg)
-
-/** Indicates any error while starting a container either of a managed runtime or a non-application-specific blackbox container */
-case class WhiskContainerStartupError(msg: String) extends ContainerStartupError(msg)
-
-/** Indicates an application-specific error while starting a blackbox container */
-case class BlackboxStartupError(msg: String) extends ContainerStartupError(msg)
-
-/** Indicates an error while initializing a container */
-case class InitializationError(interval: Interval, response: ActivationResponse) extends Exception(response.toString)
-
-case class Interval(start: Instant, end: Instant) {
- def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS)
-}
-
-object Interval {
-
- /** An interval starting now with zero duration. */
- def zero = {
- val now = Instant.now
- Interval(now, now)
- }
-}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index da66e5a..a9d3f82 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -100,7 +100,7 @@ class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSi
extends FSM[ContainerState, ContainerData]
with Stash {
implicit val ec = context.system.dispatcher
- val logging = new AkkaLogging(context.system.log)
+ implicit val logging = new AkkaLogging(context.system.log)
startWith(Uninitialized, NoData())
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 915e7f6..28b5b7a 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -20,18 +20,18 @@ package whisk.core.containerpool.docker
import java.io.FileNotFoundException
import java.nio.file.Files
import java.nio.file.Paths
-
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
import akka.event.Logging.ErrorLevel
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import scala.collection.concurrent.TrieMap
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
/**
* Serves as interface to the docker CLI tool.
@@ -64,11 +64,11 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio
def run(image: String, args: Seq[String] = Seq.empty[String])(implicit transid: TransactionId): Future[ContainerId] =
runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*).map(ContainerId.apply)
- def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerIp] =
+ def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] =
runCmd("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString).flatMap {
_ match {
case "<no value>" => Future.failed(new NoSuchElementException)
- case stdout => Future.successful(ContainerIp(stdout))
+ case stdout => Future.successful(ContainerAddress(stdout))
}
}
@@ -110,13 +110,6 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio
}
}
-case class ContainerId(val asString: String) {
- require(asString.nonEmpty, "ContainerId must not be empty")
-}
-case class ContainerIp(val asString: String) {
- require(asString.nonEmpty, "ContainerIp must not be empty")
-}
-
trait DockerApi {
/**
@@ -139,7 +132,7 @@ trait DockerApi {
* @param network name of the network to get the IP address from
* @return ip of the container
*/
- def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerIp]
+ def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress]
/**
* Pauses the container with the given id.
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
index 0407346..444c365 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
@@ -22,16 +22,16 @@ import java.io.FileInputStream
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.file.Paths
-
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.blocking
import scala.io.Source
-
import spray.json.DefaultJsonProtocol._
import spray.json._
import whisk.common.Logging
import whisk.common.TransactionId
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
class DockerClientWithFileAccess(
dockerHost: Option[String] = None,
@@ -113,18 +113,18 @@ class DockerClientWithFileAccess(
* @param network name of the network to get the IP address from
* @return the ip address of the container
*/
- protected def ipAddressFromFile(id: ContainerId, network: String): Future[ContainerIp] = {
+ protected def ipAddressFromFile(id: ContainerId, network: String): Future[ContainerAddress] = {
configFileContents(containerConfigFile(id)).map { json =>
val networks = json.fields("NetworkSettings").asJsObject.fields("Networks").asJsObject
val specifiedNetwork = networks.fields(network).asJsObject
val ipAddr = specifiedNetwork.fields("IPAddress")
- ContainerIp(ipAddr.convertTo[String])
+ ContainerAddress(ipAddr.convertTo[String])
}
}
// See extended trait for description
override def inspectIPAddress(id: ContainerId, network: String)(
- implicit transid: TransactionId): Future[ContainerIp] = {
+ implicit transid: TransactionId): Future[ContainerAddress] = {
ipAddressFromFile(id, network).recoverWith {
case _ => super.inspectIPAddress(id, network)
}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 7e07247..a63c304 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -18,7 +18,6 @@
package whisk.core.containerpool.docker
import java.nio.charset.StandardCharsets
-import java.time.Instant
import akka.actor.ActorSystem
@@ -26,23 +25,15 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
-import scala.util.Success
-import spray.json._
-import spray.json.DefaultJsonProtocol._
import whisk.common.Logging
-import whisk.common.LoggingMarkers
import whisk.common.TransactionId
-import whisk.core.containerpool.Interval
import whisk.core.containerpool.BlackboxStartupError
import whisk.core.containerpool.Container
-import whisk.core.containerpool.InitializationError
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
import whisk.core.containerpool.WhiskContainerStartupError
-import whisk.core.entity.ActivationResponse
import whisk.core.entity.ByteSize
import whisk.core.entity.size._
-import whisk.http.Messages
-import whisk.core.entity.ActivationResponse.ContainerConnectionError
-import whisk.core.entity.ActivationResponse.ContainerResponse
object DockerContainer {
@@ -69,28 +60,23 @@ object DockerContainer {
environment: Map[String, String] = Map(),
network: String = "bridge",
dnsServers: Seq[String] = Seq(),
- name: Option[String] = None)(implicit docker: DockerApiWithFileAccess,
- runc: RuncApi,
- as: ActorSystem,
- ec: ExecutionContext,
- log: Logging): Future[DockerContainer] = {
+ name: Option[String] = None,
+ dockerRunParameters: Map[String, Set[String]])(implicit docker: DockerApiWithFileAccess,
+ runc: RuncApi,
+ as: ActorSystem,
+ ec: ExecutionContext,
+ log: Logging): Future[DockerContainer] = {
implicit val tid = transid
- val environmentArgs = environment.map {
+ val environmentArgs = environment.flatMap {
case (key, value) => Seq("-e", s"$key=$value")
- }.flatten
+ }
- val dnsArgs = dnsServers.map(Seq("--dns", _)).flatten
+ val params = dockerRunParameters.flatMap {
+ case (key, valueList) => valueList.toList.flatMap(Seq(key, _))
+ }
val args = Seq(
- "--cap-drop",
- "NET_RAW",
- "--cap-drop",
- "NET_ADMIN",
- "--ulimit",
- "nofile=1024:1024",
- "--pids-limit",
- "1024",
"--cpu-shares",
cpuShares.toString,
"--memory",
@@ -99,10 +85,9 @@ object DockerContainer {
s"${memory.toMB}m",
"--network",
network) ++
- dnsArgs ++
environmentArgs ++
- name.map(n => Seq("--name", n)).getOrElse(Seq.empty)
-
+ name.map(n => Seq("--name", n)).getOrElse(Seq.empty) ++
+ params
val pulled = if (userProvidedImage) {
docker.pull(image).recoverWith {
case _ => Future.failed(BlackboxStartupError(s"Failed to pull container image '${image}'."))
@@ -133,13 +118,14 @@ object DockerContainer {
*
* @constructor
* @param id the id of the container
- * @param ip the ip of the container
+ * @param addr the ip of the container
*/
-class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerApiWithFileAccess,
- runc: RuncApi,
- as: ActorSystem,
- ec: ExecutionContext,
- logger: Logging)
+class DockerContainer(protected val id: ContainerId, protected val addr: ContainerAddress)(
+ implicit docker: DockerApiWithFileAccess,
+ runc: RuncApi,
+ as: ActorSystem,
+ protected val ec: ExecutionContext,
+ protected val logging: Logging)
extends Container
with DockerActionLogDriver {
@@ -149,78 +135,13 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerA
protected val logsRetryCount = 15
protected val logsRetryWait = 100.millis
- /** HTTP connection to the container, will be lazily established by callContainer */
- private var httpConnection: Option[HttpUtils] = None
-
def suspend()(implicit transid: TransactionId): Future[Unit] = runc.pause(id)
def resume()(implicit transid: TransactionId): Future[Unit] = runc.resume(id)
- def destroy()(implicit transid: TransactionId): Future[Unit] = {
- httpConnection.foreach(_.close())
+ override def destroy()(implicit transid: TransactionId): Future[Unit] = {
+ super.destroy()
docker.rm(id)
}
- def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
- val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending initialization to $id $ip")
-
- val body = JsObject("value" -> initializer)
- callContainer("/init", body, timeout, retry = true)
- .andThen { // never fails
- case Success(r: RunResult) =>
- transid.finished(
- this,
- start.copy(start = r.interval.start),
- s"initialization result: ${r.toBriefString}",
- endTime = r.interval.end)
- case Failure(t) =>
- transid.failed(this, start, s"initializiation failed with $t")
- }
- .flatMap { result =>
- if (result.ok) {
- Future.successful(result.interval)
- } else if (result.interval.duration >= timeout) {
- Future.failed(
- InitializationError(
- result.interval,
- ActivationResponse.applicationError(Messages.timedoutActivation(timeout, true))))
- } else {
- Future.failed(
- InitializationError(
- result.interval,
- ActivationResponse.processInitResponseContent(result.response, logger)))
- }
- }
- }
-
- def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
- implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
- val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
- val start =
- transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to $actionName at $id $ip")
-
- val parameterWrapper = JsObject("value" -> parameters)
- val body = JsObject(parameterWrapper.fields ++ environment.fields)
- callContainer("/run", body, timeout, retry = false)
- .andThen { // never fails
- case Success(r: RunResult) =>
- transid.finished(
- this,
- start.copy(start = r.interval.start),
- s"running result: ${r.toBriefString}",
- endTime = r.interval.end)
- case Failure(t) =>
- transid.failed(this, start, s"run failed with $t")
- }
- .map { result =>
- val response = if (result.interval.duration >= timeout) {
- ActivationResponse.applicationError(Messages.timedoutActivation(timeout, false))
- } else {
- ActivationResponse.processRunResponseContent(result.response, logger)
- }
-
- (result.interval, response)
- }
- }
-
/**
* Obtains the container's stdout and stderr output and converts it to our own JSON format.
* At the moment, this is done by reading the internal Docker log file for the container.
@@ -254,7 +175,7 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerA
val (isComplete, isTruncated, formattedLogs) = processJsonDriverLogContents(rawLog, waitForSentinel, limit)
if (retries > 0 && !isComplete && !isTruncated) {
- logger.info(this, s"log cursor advanced but missing sentinel, trying $retries more times")
+ logging.info(this, s"log cursor advanced but missing sentinel, trying $retries more times")
akka.pattern.after(logsRetryWait, as.scheduler)(readLogs(retries - 1))
} else {
logFileOffset += rawLogBytes.position - rawLogBytes.arrayOffset
@@ -263,43 +184,11 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerA
}
.andThen {
case Failure(e) =>
- logger.error(this, s"Failed to obtain logs of ${id.asString}: ${e.getClass} - ${e.getMessage}")
+ logging.error(this, s"Failed to obtain logs of ${id.asString}: ${e.getClass} - ${e.getMessage}")
}
}
readLogs(logsRetryCount)
}
- /**
- * Makes an HTTP request to the container.
- *
- * Note that `http.post` will not throw an exception, hence the generated Future cannot fail.
- *
- * @param path relative path to use in the http request
- * @param body body to send
- * @param timeout timeout of the request
- * @param retry whether or not to retry the request
- */
- protected def callContainer(path: String,
- body: JsObject,
- timeout: FiniteDuration,
- retry: Boolean = false): Future[RunResult] = {
- val started = Instant.now()
- val http = httpConnection.getOrElse {
- val conn = new HttpUtils(s"${ip.asString}:8080", timeout, 1.MB)
- httpConnection = Some(conn)
- conn
- }
- Future {
- http.post(path, body, retry)
- }.map { response =>
- val finished = Instant.now()
- RunResult(Interval(started, finished), response)
- }
- }
-}
-
-case class RunResult(interval: Interval, response: Either[ContainerConnectionError, ContainerResponse]) {
- def ok = response.right.exists(_.ok)
- def toBriefString = response.fold(_.toString, _.toString)
}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
new file mode 100644
index 0000000..61e9b7b
--- /dev/null
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.docker
+
+import akka.actor.ActorSystem
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.WhiskConfig
+import whisk.core.containerpool.Container
+import whisk.core.containerpool.ContainerFactory
+import whisk.core.containerpool.ContainerFactoryProvider
+import whisk.core.entity.ByteSize
+import whisk.core.entity.ExecManifest
+import whisk.core.entity.InstanceId
+import scala.concurrent.duration._
+
+class DockerContainerFactory(config: WhiskConfig, instance: InstanceId, parameters: Map[String, Set[String]])(
+ implicit actorSystem: ActorSystem,
+ ec: ExecutionContext,
+ logging: Logging)
+ extends ContainerFactory {
+
+ /** Initialize container clients */
+ implicit val docker = new DockerClientWithFileAccess()(ec)
+ implicit val runc = new RuncClient(ec)
+
+ /** Create a container using docker cli */
+ override def createContainer(tid: TransactionId,
+ name: String,
+ actionImage: ExecManifest.ImageName,
+ userProvidedImage: Boolean,
+ memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+ val image = if (userProvidedImage) {
+ actionImage.publicImageName
+ } else {
+ actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
+ }
+
+ DockerContainer.create(
+ tid,
+ image = image,
+ userProvidedImage = userProvidedImage,
+ memory = memory,
+ cpuShares = config.invokerCoreShare.toInt,
+ environment = Map("__OW_API_HOST" -> config.wskApiHost),
+ network = config.invokerContainerNetwork,
+ dnsServers = config.invokerContainerDns,
+ name = Some(name),
+ parameters)
+ }
+
+ /** Perform cleanup on init */
+ override def init(): Unit = cleanup()
+
+ /** Cleans up all running wsk_ containers */
+ override def cleanup(): Unit = {
+ val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap {
+ containers =>
+ val removals = containers.map { id =>
+ runc
+ .resume(id)(TransactionId.invokerNanny)
+ .recoverWith {
+ // Ignore resume failures and try to remove anyway
+ case _ => Future.successful(())
+ }
+ .flatMap { _ =>
+ docker.rm(id)(TransactionId.invokerNanny)
+ }
+ }
+ Future.sequence(removals)
+ }
+ Await.ready(cleaning, 30.seconds)
+ }
+}
+
+object DockerContainerFactoryProvider extends ContainerFactoryProvider {
+ override def getContainerFactory(actorSystem: ActorSystem,
+ logging: Logging,
+ config: WhiskConfig,
+ instanceId: InstanceId,
+ parameters: Map[String, Set[String]]): ContainerFactory =
+ new DockerContainerFactory(config, instanceId, parameters)(actorSystem, actorSystem.dispatcher, logging)
+}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
index 1a488b8..c398765 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
@@ -25,6 +25,7 @@ import scala.util.Success
import whisk.common.LoggingMarkers
import whisk.common.Logging
import akka.event.Logging.ErrorLevel
+import whisk.core.containerpool.ContainerId
/**
* Serves as interface to the docker CLI tool.
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 8a3d750..5148824 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -19,15 +19,11 @@ package whisk.core.invoker
import java.nio.charset.StandardCharsets
import java.time.Instant
-
-import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
-
import org.apache.kafka.common.errors.RecordTooLargeException
-
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.actor.Props
@@ -42,16 +38,13 @@ import whisk.core.connector.CompletionMessage
import whisk.core.connector.MessageFeed
import whisk.core.connector.MessageProducer
import whisk.core.connector.MessagingProvider
+import whisk.core.containerpool.ContainerFactoryProvider
import whisk.core.containerpool.ContainerPool
import whisk.core.containerpool.ContainerProxy
import whisk.core.containerpool.PrewarmingConfig
import whisk.core.containerpool.Run
-import whisk.core.containerpool.docker.DockerClientWithFileAccess
-import whisk.core.containerpool.docker.DockerContainer
-import whisk.core.containerpool.docker.RuncClient
import whisk.core.database.NoDocumentException
import whisk.core.entity._
-import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity.size._
import whisk.http.Messages
import whisk.spi.SpiLoader
@@ -61,6 +54,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
logging: Logging) {
implicit val ec = actorSystem.dispatcher
+ implicit val cfg = config
/** Initialize needed databases */
private val entityStore = WhiskEntityStore.datastore(config)
@@ -81,53 +75,22 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage)
})
- /** Initialize container clients */
- implicit val docker = new DockerClientWithFileAccess()(ec)
- implicit val runc = new RuncClient(ec)
-
- /** Cleans up all running wsk_ containers */
- def cleanup() = {
- val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap {
- containers =>
- val removals = containers.map { id =>
- runc
- .resume(id)(TransactionId.invokerNanny)
- .recoverWith {
- // Ignore resume failures and try to remove anyway
- case _ => Future.successful(())
- }
- .flatMap { _ =>
- docker.rm(id)(TransactionId.invokerNanny)
- }
- }
- Future.sequence(removals)
- }
-
- Await.ready(cleaning, 30.seconds)
- }
- cleanup()
- sys.addShutdownHook(cleanup())
-
/** Factory used by the ContainerProxy to physically create a new container. */
val containerFactory =
- (tid: TransactionId, name: String, actionImage: ImageName, userProvidedImage: Boolean, memory: ByteSize) => {
- val image = if (userProvidedImage) {
- actionImage.publicImageName
- } else {
- actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
- }
-
- DockerContainer.create(
- tid,
- image = image,
- userProvidedImage = userProvidedImage,
- memory = memory,
- cpuShares = config.invokerCoreShare.toInt,
- environment = Map("__OW_API_HOST" -> config.wskApiHost),
- network = config.invokerContainerNetwork,
- dnsServers = config.invokerContainerDns,
- name = Some(name))
- }
+ SpiLoader
+ .get[ContainerFactoryProvider]
+ .getContainerFactory(
+ actorSystem,
+ logging,
+ config,
+ instance,
+ Map(
+ "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+ "--ulimit" -> Set("nofile=1024:1024"),
+ "--pids-limit" -> Set("1024"),
+ "--dns" -> config.invokerContainerDns.toSet))
+ containerFactory.init()
+ sys.addShutdownHook(containerFactory.cleanup())
/** Sends an active-ack. */
val ack = (tid: TransactionId,
@@ -164,7 +127,8 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
}
/** Creates a ContainerProxy Actor when being called. */
- val childFactory = (f: ActorRefFactory) => f.actorOf(ContainerProxy.props(containerFactory, ack, store, instance))
+ val childFactory = (f: ActorRefFactory) =>
+ f.actorOf(ContainerProxy.props(containerFactory.createContainer _, ack, store, instance))
val prewarmKind = "nodejs:6"
val prewarmExec = ExecManifest.runtimesManifest
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index f60cbfe..94f6763 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -164,7 +164,7 @@ object ActionContainer {
}
private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = {
- whisk.core.containerpool.docker.HttpUtils.post(host, port, endPoint, content)
+ whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content)
}
private class ActionContainerImpl() extends ActionContainer {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
index 809165a..0e0867a 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
@@ -36,9 +36,9 @@ import org.scalatest.FlatSpec
import org.scalatest.Matchers
import spray.json.JsObject
-import whisk.core.containerpool.docker.HttpUtils
-import whisk.core.entity.ActivationResponse._
+import whisk.core.containerpool.HttpUtils
import whisk.core.entity.size._
+import whisk.core.entity.ActivationResponse._
/**
* Unit tests for HttpUtils which communicate with containers.
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 9146034..1235b92 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -23,21 +23,19 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
-
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterEach
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import org.scalatest.Matchers
-
import common.StreamLogging
import whisk.common.LogMarker
import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD
import whisk.common.TransactionId
-import whisk.core.containerpool.docker.ContainerId
-import whisk.core.containerpool.docker.ContainerIp
import whisk.core.containerpool.docker.DockerClient
import scala.concurrent.Promise
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
@@ -171,7 +169,7 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B
val network = "userland"
val inspectArgs = Seq("--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString)
- runAndVerify(dc.inspectIPAddress(id, network), "inspect", inspectArgs) shouldBe ContainerIp(stdout)
+ runAndVerify(dc.inspectIPAddress(id, network), "inspect", inspectArgs) shouldBe ContainerAddress(stdout)
val image = "image"
val runArgs = Seq("--memory", "256m", "--cpushares", "1024")
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
index 8bf736d..7c6acbf 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
@@ -40,8 +40,8 @@ import org.scalatest.fixture.{FlatSpec => FixtureFlatSpec}
import common.StreamLogging
import spray.json._
import whisk.common.TransactionId
-import whisk.core.containerpool.docker.ContainerId
-import whisk.core.containerpool.docker.ContainerIp
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
import whisk.core.containerpool.docker.DockerClientWithFileAccess
@RunWith(classOf[JUnitRunner])
@@ -57,25 +57,25 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers with Stre
val dockerCommand = "docker"
val networkInConfigFile = "networkConfig"
val networkInDockerInspect = "networkInspect"
- val ipInConfigFile = ContainerIp("10.0.0.1")
- val ipInDockerInspect = ContainerIp("10.0.0.2")
+ val ipInConfigFile = ContainerAddress("10.0.0.1")
+ val ipInDockerInspect = ContainerAddress("10.0.0.2")
val dockerConfig =
JsObject(
"NetworkSettings" ->
JsObject(
"Networks" ->
JsObject(networkInConfigFile ->
- JsObject("IPAddress" -> JsString(ipInConfigFile.asString)))))
+ JsObject("IPAddress" -> JsString(ipInConfigFile.host)))))
/** Returns a DockerClient with mocked results */
- def dockerClient(execResult: Future[String] = Future.successful(ipInDockerInspect.asString),
+ def dockerClient(execResult: Future[String] = Future.successful(ipInDockerInspect.host),
readResult: Future[JsObject] = Future.successful(dockerConfig)) =
new DockerClientWithFileAccess()(global) {
override val dockerCmd = Seq(dockerCommand)
override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult
override def configFileContents(configFile: File) = readResult
// Make protected ipAddressFromFile available for testing - requires reflectiveCalls
- def publicIpAddressFromFile(id: ContainerId, network: String): Future[ContainerIp] =
+ def publicIpAddressFromFile(id: ContainerId, network: String): Future[ContainerAddress] =
ipAddressFromFile(id, network)
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 1451c5d..47e67c2 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -21,7 +21,6 @@ import java.io.IOException
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.time.Instant
-
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -71,12 +70,12 @@ class DockerContainerTests
* Constructs a testcontainer with overridden IO methods. Results of the override can be provided
* as parameters.
*/
- def dockerContainer(id: ContainerId = containerId, ip: ContainerIp = ContainerIp("ip"))(
+ def dockerContainer(id: ContainerId = containerId, addr: ContainerAddress = ContainerAddress("ip"))(
ccRes: Future[RunResult] =
Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))),
retryCount: Int = 0)(implicit docker: DockerApiWithFileAccess, runc: RuncApi): DockerContainer = {
- new DockerContainer(id, ip) {
+ new DockerContainer(id, addr) {
override protected def callContainer(path: String,
body: JsObject,
timeout: FiniteDuration,
@@ -94,6 +93,10 @@ class DockerContainerTests
behavior of "DockerContainer"
implicit val transid = TransactionId.testing
+ val parameters = Map(
+ "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+ "--ulimit" -> Set("nofile=1024:1024"),
+ "--pids-limit" -> Set("1024"))
/*
* CONTAINER CREATION
@@ -108,7 +111,6 @@ class DockerContainerTests
val environment = Map("test" -> "hi")
val network = "testwork"
val name = "myContainer"
-
val container = DockerContainer.create(
transid = transid,
image = image,
@@ -116,7 +118,8 @@ class DockerContainerTests
cpuShares = cpuShares,
environment = environment,
network = network,
- name = Some(name))
+ name = Some(name),
+ dockerRunParameters = parameters)
await(container)
@@ -148,7 +151,11 @@ class DockerContainerTests
implicit val docker = new TestDockerClient
implicit val runc = stub[RuncApi]
- val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true)
+ val container = DockerContainer.create(
+ transid = transid,
+ image = "image",
+ userProvidedImage = true,
+ dockerRunParameters = parameters)
await(container)
docker.pulls should have size 1
@@ -160,14 +167,14 @@ class DockerContainerTests
it should "remove the container if inspect fails" in {
implicit val docker = new TestDockerClient {
override def inspectIPAddress(id: ContainerId,
- network: String)(implicit transid: TransactionId): Future[ContainerIp] = {
+ network: String)(implicit transid: TransactionId): Future[ContainerAddress] = {
inspects += ((id, network))
Future.failed(new RuntimeException())
}
}
implicit val runc = stub[RuncApi]
- val container = DockerContainer.create(transid = transid, image = "image")
+ val container = DockerContainer.create(transid = transid, image = "image", dockerRunParameters = parameters)
a[WhiskContainerStartupError] should be thrownBy await(container)
docker.pulls should have size 0
@@ -186,7 +193,11 @@ class DockerContainerTests
}
implicit val runc = stub[RuncApi]
- val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true)
+ val container = DockerContainer.create(
+ transid = transid,
+ image = "image",
+ userProvidedImage = true,
+ dockerRunParameters = parameters)
a[WhiskContainerStartupError] should be thrownBy await(container)
docker.pulls should have size 1
@@ -198,14 +209,18 @@ class DockerContainerTests
it should "provide a proper error if inspect fails for blackbox containers" in {
implicit val docker = new TestDockerClient {
override def inspectIPAddress(id: ContainerId,
- network: String)(implicit transid: TransactionId): Future[ContainerIp] = {
+ network: String)(implicit transid: TransactionId): Future[ContainerAddress] = {
inspects += ((id, network))
Future.failed(new RuntimeException())
}
}
implicit val runc = stub[RuncApi]
- val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true)
+ val container = DockerContainer.create(
+ transid = transid,
+ image = "image",
+ userProvidedImage = true,
+ dockerRunParameters = parameters)
a[WhiskContainerStartupError] should be thrownBy await(container)
docker.pulls should have size 1
@@ -223,7 +238,11 @@ class DockerContainerTests
}
implicit val runc = stub[RuncApi]
- val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true)
+ val container = DockerContainer.create(
+ transid = transid,
+ image = "image",
+ userProvidedImage = true,
+ dockerRunParameters = parameters)
a[BlackboxStartupError] should be thrownBy await(container)
docker.pulls should have size 1
@@ -240,7 +259,7 @@ class DockerContainerTests
implicit val runc = stub[RuncApi]
val id = ContainerId("id")
- val container = new DockerContainer(id, ContainerIp("ip"))
+ val container = new DockerContainer(id, ContainerAddress("ip"))
container.suspend()
container.resume()
@@ -254,7 +273,7 @@ class DockerContainerTests
implicit val runc = stub[RuncApi]
val id = ContainerId("id")
- val container = new DockerContainer(id, ContainerIp("ip"))
+ val container = new DockerContainer(id, ContainerAddress("ip"))
container.destroy()
@@ -632,9 +651,10 @@ class DockerContainerTests
Future.successful(ContainerId("testId"))
}
- def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerIp] = {
+ def inspectIPAddress(id: ContainerId, network: String)(
+ implicit transid: TransactionId): Future[ContainerAddress] = {
inspects += ((id, network))
- Future.successful(ContainerIp("testIp"))
+ Future.successful(ContainerAddress("testIp"))
}
def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
index 4d2db5e..b4e2e47 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
@@ -30,7 +30,7 @@ import scala.concurrent.Await
import org.scalatest.Matchers
import whisk.core.containerpool.docker.RuncClient
import common.StreamLogging
-import whisk.core.containerpool.docker.ContainerId
+import whisk.core.containerpool.ContainerId
import whisk.common.TransactionId
import org.scalatest.BeforeAndAfterEach
import whisk.common.LogMarker
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 0f1e1b6..ca890a3 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -18,18 +18,15 @@
package whisk.core.containerpool.test
import java.time.Instant
-
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
-
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpecLike
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
-
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.FSM
@@ -39,8 +36,11 @@ import akka.actor.FSM.Transition
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import common.LoggedFunction
+import common.StreamLogging
+import scala.concurrent.ExecutionContext
import spray.json._
import spray.json.DefaultJsonProtocol._
+import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.connector.ActivationMessage
import whisk.core.containerpool._
@@ -56,11 +56,13 @@ class ContainerProxyTests
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
- with MockFactory {
+ with MockFactory
+ with StreamLogging {
override def afterAll = TestKit.shutdownActorSystem(system)
val timeout = 5.seconds
+ val log = logging
// Common entities to pass to the tests. We don't really care what's inside
// those for the behavior testing here, as none of the contents will really
@@ -504,6 +506,10 @@ class ContainerProxyTests
* Implements all the good cases of a perfect run to facilitate error case overriding.
*/
class TestContainer extends Container {
+ protected val id = ContainerId("testcontainer")
+ protected val addr = ContainerAddress("0.0.0.0")
+ protected implicit val logging: Logging = log
+ protected implicit val ec: ExecutionContext = system.dispatcher
var suspendCount = 0
var resumeCount = 0
var destroyCount = 0
@@ -519,18 +525,18 @@ class ContainerProxyTests
resumeCount += 1
Future.successful(())
}
- def destroy()(implicit transid: TransactionId): Future[Unit] = {
+ override def destroy()(implicit transid: TransactionId): Future[Unit] = {
destroyCount += 1
- Future.successful(())
+ super.destroy()
}
- def initialize(initializer: JsObject, timeout: FiniteDuration)(
+ override def initialize(initializer: JsObject, timeout: FiniteDuration)(
implicit transid: TransactionId): Future[Interval] = {
initializeCount += 1
initializer shouldBe action.containerInitializer
timeout shouldBe action.limits.timeout.duration
Future.successful(Interval.zero)
}
- def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
+ override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
runCount += 1
environment.fields("api_key") shouldBe message.user.authkey.toJson
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].