You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/09/25 16:50:07 UTC

[incubator-openwhisk] branch master updated: ContainerFactory SPI (#2659)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new be43ad9  ContainerFactory SPI (#2659)
be43ad9 is described below

commit be43ad90e0dce26c7fa76573605324be22817b30
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Mon Sep 25 09:50:04 2017 -0700

    ContainerFactory SPI (#2659)
---
 common/scala/src/main/resources/reference.conf     |   1 +
 .../scala/whisk/core/containerpool/Container.scala | 199 +++++++++++++++++++++
 .../core/containerpool/ContainerFactory.scala      |  59 ++++++
 .../whisk/core/containerpool}/HttpUtils.scala      |   4 +-
 .../scala/whisk/core/containerpool/Container.scala |  83 ---------
 .../whisk/core/containerpool/ContainerProxy.scala  |   2 +-
 .../core/containerpool/docker/DockerClient.scala   |  17 +-
 .../docker/DockerClientWithFileAccess.scala        |  10 +-
 .../containerpool/docker/DockerContainer.scala     | 163 +++--------------
 .../docker/DockerContainerFactory.scala            | 101 +++++++++++
 .../core/containerpool/docker/RuncClient.scala     |   1 +
 .../scala/whisk/core/invoker/InvokerReactive.scala |  72 ++------
 .../scala/actionContainers/ActionContainer.scala   |   2 +-
 .../docker/test/ContainerConnectionTests.scala     |   4 +-
 .../docker/test/DockerClientTests.scala            |   8 +-
 .../test/DockerClientWithFileAccessTests.scala     |  14 +-
 .../docker/test/DockerContainerTests.scala         |  52 ++++--
 .../docker/test/RuncClientTests.scala              |   2 +-
 .../containerpool/test/ContainerProxyTests.scala   |  22 ++-
 19 files changed, 481 insertions(+), 335 deletions(-)

diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 52f30c3..50a36a5 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -1,4 +1,5 @@
 whisk.spi{
   ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
   MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
+  ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider
 }
\ No newline at end of file
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
new file mode 100644
index 0000000..0cecbe6
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import java.time.Instant
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration._
+import scala.util.Failure
+import scala.util.Success
+import spray.json.JsObject
+import spray.json.DefaultJsonProtocol._
+import whisk.common.Logging
+import whisk.common.LoggingMarkers
+import whisk.common.TransactionId
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.ActivationResponse.ContainerConnectionError
+import whisk.core.entity.ActivationResponse.ContainerResponse
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size._
+import whisk.http.Messages
+
+/**
+ * An OpenWhisk biased container abstraction. This is **not only** an abstraction
+ * for different container providers, but the implementation also needs to include
+ * OpenWhisk specific behavior, especially for initialize and run.
+ */
+case class ContainerId(val asString: String) {
+  require(asString.nonEmpty, "ContainerId must not be empty")
+}
+case class ContainerAddress(val host: String, val port: Int = 8080) {
+  require(host.nonEmpty, "ContainerIp must not be empty")
+}
+
+trait Container {
+
+  protected val id: ContainerId
+  protected val addr: ContainerAddress
+  protected implicit val logging: Logging
+  protected implicit val ec: ExecutionContext
+
+  /** HTTP connection to the container, will be lazily established by callContainer */
+  private var httpConnection: Option[HttpUtils] = None
+
+  /** Stops the container from consuming CPU cycles. */
+  def suspend()(implicit transid: TransactionId): Future[Unit]
+
+  /** Dual of suspend. */
+  def resume()(implicit transid: TransactionId): Future[Unit]
+
+  /** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */
+  def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]]
+
+  /** Completely destroys this instance of the container. */
+  def destroy()(implicit transid: TransactionId): Future[Unit] = {
+    Future.successful(httpConnection.foreach(_.close()))
+  }
+
+  /** Initializes code in the container. */
+  def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
+    val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending initialization to $id $addr")
+
+    val body = JsObject("value" -> initializer)
+    callContainer("/init", body, timeout, retry = true)
+      .andThen { // never fails
+        case Success(r: RunResult) =>
+          transid.finished(
+            this,
+            start.copy(start = r.interval.start),
+            s"initialization result: ${r.toBriefString}",
+            endTime = r.interval.end)
+        case Failure(t) =>
+          transid.failed(this, start, s"initializiation failed with $t")
+      }
+      .flatMap { result =>
+        if (result.ok) {
+          Future.successful(result.interval)
+        } else if (result.interval.duration >= timeout) {
+          Future.failed(
+            InitializationError(
+              result.interval,
+              ActivationResponse.applicationError(Messages.timedoutActivation(timeout, true))))
+        } else {
+          Future.failed(
+            InitializationError(
+              result.interval,
+              ActivationResponse.processInitResponseContent(result.response, logging)))
+        }
+      }
+  }
+
+  /** Runs code in the container. */
+  def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
+    implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+    val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
+    val start =
+      transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to $actionName at $id $addr")
+
+    val parameterWrapper = JsObject("value" -> parameters)
+    val body = JsObject(parameterWrapper.fields ++ environment.fields)
+    callContainer("/run", body, timeout, retry = false)
+      .andThen { // never fails
+        case Success(r: RunResult) =>
+          transid.finished(
+            this,
+            start.copy(start = r.interval.start),
+            s"running result: ${r.toBriefString}",
+            endTime = r.interval.end)
+        case Failure(t) =>
+          transid.failed(this, start, s"run failed with $t")
+      }
+      .map { result =>
+        val response = if (result.interval.duration >= timeout) {
+          ActivationResponse.applicationError(Messages.timedoutActivation(timeout, false))
+        } else {
+          ActivationResponse.processRunResponseContent(result.response, logging)
+        }
+
+        (result.interval, response)
+      }
+  }
+
+  /**
+   * Makes an HTTP request to the container.
+   *
+   * Note that `http.post` will not throw an exception, hence the generated Future cannot fail.
+   *
+   * @param path relative path to use in the http request
+   * @param body body to send
+   * @param timeout timeout of the request
+   * @param retry whether or not to retry the request
+   */
+  protected def callContainer(path: String,
+                              body: JsObject,
+                              timeout: FiniteDuration,
+                              retry: Boolean = false): Future[RunResult] = {
+    val started = Instant.now()
+    val http = httpConnection.getOrElse {
+      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
+      httpConnection = Some(conn)
+      conn
+    }
+    Future {
+      http.post(path, body, retry)
+    }.map { response =>
+      val finished = Instant.now()
+      RunResult(Interval(started, finished), response)
+    }
+  }
+}
+
+/** Indicates a general error with the container */
+sealed abstract class ContainerError(msg: String) extends Exception(msg)
+
+/** Indicates an error while starting a container */
+sealed abstract class ContainerStartupError(msg: String) extends ContainerError(msg)
+
+/** Indicates any error while starting a container either of a managed runtime or a non-application-specific blackbox container */
+case class WhiskContainerStartupError(msg: String) extends ContainerStartupError(msg)
+
+/** Indicates an application-specific error while starting a blackbox container */
+case class BlackboxStartupError(msg: String) extends ContainerStartupError(msg)
+
+/** Indicates an error while initializing a container */
+case class InitializationError(interval: Interval, response: ActivationResponse) extends Exception(response.toString)
+
+case class Interval(start: Instant, end: Instant) {
+  def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS)
+}
+
+case class RunResult(interval: Interval, response: Either[ContainerConnectionError, ContainerResponse]) {
+  def ok = response.right.exists(_.ok)
+  def toBriefString = response.fold(_.toString, _.toString)
+}
+object Interval {
+
+  /** An interval starting now with zero duration. */
+  def zero = {
+    val now = Instant.now
+    Interval(now, now)
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
new file mode 100644
index 0000000..fb04f0f
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import akka.actor.ActorSystem
+import scala.concurrent.Future
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.WhiskConfig
+import whisk.core.entity.ByteSize
+import whisk.core.entity.ExecManifest
+import whisk.core.entity.InstanceId
+import whisk.spi.Spi
+
+/**
+ * An abstraction for Container creation
+ */
+trait ContainerFactory {
+
+  /** create a new Container */
+  def createContainer(tid: TransactionId,
+                      name: String,
+                      actionImage: ExecManifest.ImageName,
+                      userProvidedImage: Boolean,
+                      memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container]
+
+  /** perform any initialization */
+  def init(): Unit
+
+  /** cleanup any remaining Containers; should block until complete; should ONLY be run at startup/shutdown */
+  def cleanup(): Unit
+}
+
+/**
+ * An SPI for ContainerFactory creation
+ * All impls should use the parameters specified as additional args to "docker run" commands
+ */
+trait ContainerFactoryProvider extends Spi {
+  def getContainerFactory(actorSystem: ActorSystem,
+                          logging: Logging,
+                          config: WhiskConfig,
+                          instance: InstanceId,
+                          parameters: Map[String, Set[String]]): ContainerFactory
+}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
similarity index 98%
rename from core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala
rename to common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index a6b53a7..2815068 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -15,12 +15,10 @@
  * limitations under the License.
  */
 
-package whisk.core.containerpool.docker
+package whisk.core.containerpool
 
 import java.nio.charset.StandardCharsets
 
-import scala.Left
-import scala.Right
 import scala.concurrent.duration.DurationInt
 import scala.concurrent.duration.FiniteDuration
 import scala.util.Failure
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala b/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala
deleted file mode 100644
index bffc75e..0000000
--- a/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.containerpool
-
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-
-import spray.json.JsObject
-import whisk.common.TransactionId
-import whisk.core.entity.ActivationResponse
-import whisk.core.entity.ByteSize
-import java.time.Instant
-import scala.concurrent.duration._
-
-/**
- * An OpenWhisk biased container abstraction. This is **not only** an abstraction
- * for different container providers, but the implementation also needs to include
- * OpenWhisk specific behavior, especially for initialize and run.
- */
-trait Container {
-
-  /** Stops the container from consuming CPU cycles. */
-  def suspend()(implicit transid: TransactionId): Future[Unit]
-
-  /** Dual of halt. */
-  def resume()(implicit transid: TransactionId): Future[Unit]
-
-  /** Completely destroys this instance of the container. */
-  def destroy()(implicit transid: TransactionId): Future[Unit]
-
-  /** Initializes code in the container. */
-  def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval]
-
-  /** Runs code in the container. */
-  def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
-    implicit transid: TransactionId): Future[(Interval, ActivationResponse)]
-
-  /** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */
-  def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]]
-}
-
-/** Indicates a general error with the container */
-sealed abstract class ContainerError(msg: String) extends Exception(msg)
-
-/** Indicates an error while starting a container */
-sealed abstract class ContainerStartupError(msg: String) extends ContainerError(msg)
-
-/** Indicates any error while starting a container either of a managed runtime or a non-application-specific blackbox container */
-case class WhiskContainerStartupError(msg: String) extends ContainerStartupError(msg)
-
-/** Indicates an application-specific error while starting a blackbox container */
-case class BlackboxStartupError(msg: String) extends ContainerStartupError(msg)
-
-/** Indicates an error while initializing a container */
-case class InitializationError(interval: Interval, response: ActivationResponse) extends Exception(response.toString)
-
-case class Interval(start: Instant, end: Instant) {
-  def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS)
-}
-
-object Interval {
-
-  /** An interval starting now with zero duration. */
-  def zero = {
-    val now = Instant.now
-    Interval(now, now)
-  }
-}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index da66e5a..a9d3f82 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -100,7 +100,7 @@ class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSi
     extends FSM[ContainerState, ContainerData]
     with Stash {
   implicit val ec = context.system.dispatcher
-  val logging = new AkkaLogging(context.system.log)
+  implicit val logging = new AkkaLogging(context.system.log)
 
   startWith(Uninitialized, NoData())
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 915e7f6..28b5b7a 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -20,18 +20,18 @@ package whisk.core.containerpool.docker
 import java.io.FileNotFoundException
 import java.nio.file.Files
 import java.nio.file.Paths
-
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
 import akka.event.Logging.ErrorLevel
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
 import scala.collection.concurrent.TrieMap
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
 
 /**
  * Serves as interface to the docker CLI tool.
@@ -64,11 +64,11 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio
   def run(image: String, args: Seq[String] = Seq.empty[String])(implicit transid: TransactionId): Future[ContainerId] =
     runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*).map(ContainerId.apply)
 
-  def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerIp] =
+  def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] =
     runCmd("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString).flatMap {
       _ match {
         case "<no value>" => Future.failed(new NoSuchElementException)
-        case stdout       => Future.successful(ContainerIp(stdout))
+        case stdout       => Future.successful(ContainerAddress(stdout))
       }
     }
 
@@ -110,13 +110,6 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio
   }
 }
 
-case class ContainerId(val asString: String) {
-  require(asString.nonEmpty, "ContainerId must not be empty")
-}
-case class ContainerIp(val asString: String) {
-  require(asString.nonEmpty, "ContainerIp must not be empty")
-}
-
 trait DockerApi {
 
   /**
@@ -139,7 +132,7 @@ trait DockerApi {
    * @param network name of the network to get the IP address from
    * @return ip of the container
    */
-  def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerIp]
+  def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress]
 
   /**
    * Pauses the container with the given id.
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
index 0407346..444c365 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
@@ -22,16 +22,16 @@ import java.io.FileInputStream
 import java.io.IOException
 import java.nio.ByteBuffer
 import java.nio.file.Paths
-
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.blocking
 import scala.io.Source
-
 import spray.json.DefaultJsonProtocol._
 import spray.json._
 import whisk.common.Logging
 import whisk.common.TransactionId
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
 
 class DockerClientWithFileAccess(
   dockerHost: Option[String] = None,
@@ -113,18 +113,18 @@ class DockerClientWithFileAccess(
    * @param network name of the network to get the IP address from
    * @return the ip address of the container
    */
-  protected def ipAddressFromFile(id: ContainerId, network: String): Future[ContainerIp] = {
+  protected def ipAddressFromFile(id: ContainerId, network: String): Future[ContainerAddress] = {
     configFileContents(containerConfigFile(id)).map { json =>
       val networks = json.fields("NetworkSettings").asJsObject.fields("Networks").asJsObject
       val specifiedNetwork = networks.fields(network).asJsObject
       val ipAddr = specifiedNetwork.fields("IPAddress")
-      ContainerIp(ipAddr.convertTo[String])
+      ContainerAddress(ipAddr.convertTo[String])
     }
   }
 
   // See extended trait for description
   override def inspectIPAddress(id: ContainerId, network: String)(
-    implicit transid: TransactionId): Future[ContainerIp] = {
+    implicit transid: TransactionId): Future[ContainerAddress] = {
     ipAddressFromFile(id, network).recoverWith {
       case _ => super.inspectIPAddress(id, network)
     }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 7e07247..a63c304 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -18,7 +18,6 @@
 package whisk.core.containerpool.docker
 
 import java.nio.charset.StandardCharsets
-import java.time.Instant
 
 import akka.actor.ActorSystem
 
@@ -26,23 +25,15 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Failure
-import scala.util.Success
-import spray.json._
-import spray.json.DefaultJsonProtocol._
 import whisk.common.Logging
-import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
-import whisk.core.containerpool.Interval
 import whisk.core.containerpool.BlackboxStartupError
 import whisk.core.containerpool.Container
-import whisk.core.containerpool.InitializationError
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
 import whisk.core.containerpool.WhiskContainerStartupError
-import whisk.core.entity.ActivationResponse
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
-import whisk.http.Messages
-import whisk.core.entity.ActivationResponse.ContainerConnectionError
-import whisk.core.entity.ActivationResponse.ContainerResponse
 
 object DockerContainer {
 
@@ -69,28 +60,23 @@ object DockerContainer {
              environment: Map[String, String] = Map(),
              network: String = "bridge",
              dnsServers: Seq[String] = Seq(),
-             name: Option[String] = None)(implicit docker: DockerApiWithFileAccess,
-                                          runc: RuncApi,
-                                          as: ActorSystem,
-                                          ec: ExecutionContext,
-                                          log: Logging): Future[DockerContainer] = {
+             name: Option[String] = None,
+             dockerRunParameters: Map[String, Set[String]])(implicit docker: DockerApiWithFileAccess,
+                                                            runc: RuncApi,
+                                                            as: ActorSystem,
+                                                            ec: ExecutionContext,
+                                                            log: Logging): Future[DockerContainer] = {
     implicit val tid = transid
 
-    val environmentArgs = environment.map {
+    val environmentArgs = environment.flatMap {
       case (key, value) => Seq("-e", s"$key=$value")
-    }.flatten
+    }
 
-    val dnsArgs = dnsServers.map(Seq("--dns", _)).flatten
+    val params = dockerRunParameters.flatMap {
+      case (key, valueList) => valueList.toList.flatMap(Seq(key, _))
+    }
 
     val args = Seq(
-      "--cap-drop",
-      "NET_RAW",
-      "--cap-drop",
-      "NET_ADMIN",
-      "--ulimit",
-      "nofile=1024:1024",
-      "--pids-limit",
-      "1024",
       "--cpu-shares",
       cpuShares.toString,
       "--memory",
@@ -99,10 +85,9 @@ object DockerContainer {
       s"${memory.toMB}m",
       "--network",
       network) ++
-      dnsArgs ++
       environmentArgs ++
-      name.map(n => Seq("--name", n)).getOrElse(Seq.empty)
-
+      name.map(n => Seq("--name", n)).getOrElse(Seq.empty) ++
+      params
     val pulled = if (userProvidedImage) {
       docker.pull(image).recoverWith {
         case _ => Future.failed(BlackboxStartupError(s"Failed to pull container image '${image}'."))
@@ -133,13 +118,14 @@ object DockerContainer {
  *
  * @constructor
  * @param id the id of the container
- * @param ip the ip of the container
+ * @param addr the address (host and port) of the container
  */
-class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerApiWithFileAccess,
-                                                        runc: RuncApi,
-                                                        as: ActorSystem,
-                                                        ec: ExecutionContext,
-                                                        logger: Logging)
+class DockerContainer(protected val id: ContainerId, protected val addr: ContainerAddress)(
+  implicit docker: DockerApiWithFileAccess,
+  runc: RuncApi,
+  as: ActorSystem,
+  protected val ec: ExecutionContext,
+  protected val logging: Logging)
     extends Container
     with DockerActionLogDriver {
 
@@ -149,78 +135,13 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerA
   protected val logsRetryCount = 15
   protected val logsRetryWait = 100.millis
 
-  /** HTTP connection to the container, will be lazily established by callContainer */
-  private var httpConnection: Option[HttpUtils] = None
-
   def suspend()(implicit transid: TransactionId): Future[Unit] = runc.pause(id)
   def resume()(implicit transid: TransactionId): Future[Unit] = runc.resume(id)
-  def destroy()(implicit transid: TransactionId): Future[Unit] = {
-    httpConnection.foreach(_.close())
+  override def destroy()(implicit transid: TransactionId): Future[Unit] = {
+    super.destroy()
     docker.rm(id)
   }
 
-  def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
-    val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending initialization to $id $ip")
-
-    val body = JsObject("value" -> initializer)
-    callContainer("/init", body, timeout, retry = true)
-      .andThen { // never fails
-        case Success(r: RunResult) =>
-          transid.finished(
-            this,
-            start.copy(start = r.interval.start),
-            s"initialization result: ${r.toBriefString}",
-            endTime = r.interval.end)
-        case Failure(t) =>
-          transid.failed(this, start, s"initializiation failed with $t")
-      }
-      .flatMap { result =>
-        if (result.ok) {
-          Future.successful(result.interval)
-        } else if (result.interval.duration >= timeout) {
-          Future.failed(
-            InitializationError(
-              result.interval,
-              ActivationResponse.applicationError(Messages.timedoutActivation(timeout, true))))
-        } else {
-          Future.failed(
-            InitializationError(
-              result.interval,
-              ActivationResponse.processInitResponseContent(result.response, logger)))
-        }
-      }
-  }
-
-  def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
-    implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
-    val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
-    val start =
-      transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to $actionName at $id $ip")
-
-    val parameterWrapper = JsObject("value" -> parameters)
-    val body = JsObject(parameterWrapper.fields ++ environment.fields)
-    callContainer("/run", body, timeout, retry = false)
-      .andThen { // never fails
-        case Success(r: RunResult) =>
-          transid.finished(
-            this,
-            start.copy(start = r.interval.start),
-            s"running result: ${r.toBriefString}",
-            endTime = r.interval.end)
-        case Failure(t) =>
-          transid.failed(this, start, s"run failed with $t")
-      }
-      .map { result =>
-        val response = if (result.interval.duration >= timeout) {
-          ActivationResponse.applicationError(Messages.timedoutActivation(timeout, false))
-        } else {
-          ActivationResponse.processRunResponseContent(result.response, logger)
-        }
-
-        (result.interval, response)
-      }
-  }
-
   /**
    * Obtains the container's stdout and stderr output and converts it to our own JSON format.
    * At the moment, this is done by reading the internal Docker log file for the container.
@@ -254,7 +175,7 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerA
           val (isComplete, isTruncated, formattedLogs) = processJsonDriverLogContents(rawLog, waitForSentinel, limit)
 
           if (retries > 0 && !isComplete && !isTruncated) {
-            logger.info(this, s"log cursor advanced but missing sentinel, trying $retries more times")
+            logging.info(this, s"log cursor advanced but missing sentinel, trying $retries more times")
             akka.pattern.after(logsRetryWait, as.scheduler)(readLogs(retries - 1))
           } else {
             logFileOffset += rawLogBytes.position - rawLogBytes.arrayOffset
@@ -263,43 +184,11 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerA
         }
         .andThen {
           case Failure(e) =>
-            logger.error(this, s"Failed to obtain logs of ${id.asString}: ${e.getClass} - ${e.getMessage}")
+            logging.error(this, s"Failed to obtain logs of ${id.asString}: ${e.getClass} - ${e.getMessage}")
         }
     }
 
     readLogs(logsRetryCount)
   }
 
-  /**
-   * Makes an HTTP request to the container.
-   *
-   * Note that `http.post` will not throw an exception, hence the generated Future cannot fail.
-   *
-   * @param path relative path to use in the http request
-   * @param body body to send
-   * @param timeout timeout of the request
-   * @param retry whether or not to retry the request
-   */
-  protected def callContainer(path: String,
-                              body: JsObject,
-                              timeout: FiniteDuration,
-                              retry: Boolean = false): Future[RunResult] = {
-    val started = Instant.now()
-    val http = httpConnection.getOrElse {
-      val conn = new HttpUtils(s"${ip.asString}:8080", timeout, 1.MB)
-      httpConnection = Some(conn)
-      conn
-    }
-    Future {
-      http.post(path, body, retry)
-    }.map { response =>
-      val finished = Instant.now()
-      RunResult(Interval(started, finished), response)
-    }
-  }
-}
-
-case class RunResult(interval: Interval, response: Either[ContainerConnectionError, ContainerResponse]) {
-  def ok = response.right.exists(_.ok)
-  def toBriefString = response.fold(_.toString, _.toString)
 }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
new file mode 100644
index 0000000..61e9b7b
--- /dev/null
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.docker
+
+import akka.actor.ActorSystem
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.WhiskConfig
+import whisk.core.containerpool.Container
+import whisk.core.containerpool.ContainerFactory
+import whisk.core.containerpool.ContainerFactoryProvider
+import whisk.core.entity.ByteSize
+import whisk.core.entity.ExecManifest
+import whisk.core.entity.InstanceId
+import scala.concurrent.duration._
+
+/**
+ * A [[ContainerFactory]] that creates containers through the docker CLI.
+ *
+ * @param config invoker configuration; `createContainer` does not read it but uses the implicit `config`
+ *               passed to that method, which shadows this constructor parameter there
+ * @param instance id of this invoker; `cleanup` uses it to build the container name filter `wsk<instance>_`
+ * @param parameters additional arguments handed unchanged to `DockerContainer.create` for every container
+ * @param actorSystem actor system kept implicitly in scope inside the factory
+ * @param ec execution context handed to the docker and runc clients and used to compose cleanup futures
+ * @param logging logger kept implicitly in scope inside the factory
+ */
+class DockerContainerFactory(config: WhiskConfig, instance: InstanceId, parameters: Map[String, Set[String]])(
+  implicit actorSystem: ActorSystem,
+  ec: ExecutionContext,
+  logging: Logging)
+    extends ContainerFactory {
+
+  /** Container clients, typed explicitly so implicit resolution does not rely on inferred types. */
+  implicit val docker: DockerClientWithFileAccess = new DockerClientWithFileAccess()(ec)
+  implicit val runc: RuncClient = new RuncClient(ec)
+
+  /**
+   * Creates a container using the docker CLI.
+   *
+   * A user-provided image is referenced by its public image name; any other image is resolved against the
+   * registry, image prefix and image tag of the implicit `config`.
+   *
+   * @param tid transaction id passed on to `DockerContainer.create`
+   * @param name name given to the container
+   * @param actionImage image to create the container from
+   * @param userProvidedImage selects the public image name instead of the locally configured one
+   * @param memory memory setting passed on to `DockerContainer.create`
+   * @param config configuration supplying registry, CPU share, API host, network and DNS settings
+   * @param logging logger kept implicitly in scope for the container creation
+   * @return the result of `DockerContainer.create`
+   */
+  override def createContainer(tid: TransactionId,
+                               name: String,
+                               actionImage: ExecManifest.ImageName,
+                               userProvidedImage: Boolean,
+                               memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+    val image = if (userProvidedImage) {
+      actionImage.publicImageName
+    } else {
+      actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
+    }
+
+    DockerContainer.create(
+      tid,
+      image = image,
+      userProvidedImage = userProvidedImage,
+      memory = memory,
+      cpuShares = config.invokerCoreShare.toInt,
+      environment = Map("__OW_API_HOST" -> config.wskApiHost),
+      network = config.invokerContainerNetwork,
+      dnsServers = config.invokerContainerDns,
+      name = Some(name),
+      dockerRunParameters = parameters)
+  }
+
+  /** Runs `cleanup` when the factory is initialized. */
+  override def init(): Unit = cleanup()
+
+  /**
+   * Removes the containers that `docker.ps` reports for the name filter `wsk<instance>_`.
+   *
+   * Each container is resumed first (a failed resume is ignored) and then removed. The call blocks for at
+   * most 30 seconds: `Await.ready` throws a `TimeoutException` if the removals are still pending by then,
+   * whereas a failed removal is not rethrown.
+   */
+  override def cleanup(): Unit = {
+    val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap {
+      containers =>
+        val removals = containers.map { id =>
+          runc
+            .resume(id)(TransactionId.invokerNanny)
+            .recoverWith {
+              // Ignore resume failures and try to remove anyway
+              case _ => Future.successful(())
+            }
+            .flatMap { _ =>
+              docker.rm(id)(TransactionId.invokerNanny)
+            }
+        }
+        Future.sequence(removals)
+    }
+    Await.ready(cleaning, 30.seconds)
+  }
+}
+
+/** [[ContainerFactoryProvider]] that supplies a [[DockerContainerFactory]]. */
+object DockerContainerFactoryProvider extends ContainerFactoryProvider {
+  /** Builds a [[DockerContainerFactory]], using the actor system's dispatcher as its execution context. */
+  override def getContainerFactory(actorSystem: ActorSystem,
+                                   logging: Logging,
+                                   config: WhiskConfig,
+                                   instanceId: InstanceId,
+                                   parameters: Map[String, Set[String]]): ContainerFactory =
+    new DockerContainerFactory(config, instanceId, parameters)(actorSystem, actorSystem.dispatcher, logging)
+}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
index 1a488b8..c398765 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
@@ -25,6 +25,7 @@ import scala.util.Success
 import whisk.common.LoggingMarkers
 import whisk.common.Logging
 import akka.event.Logging.ErrorLevel
+import whisk.core.containerpool.ContainerId
 
 /**
  * Serves as interface to the docker CLI tool.
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 8a3d750..5148824 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -19,15 +19,11 @@ package whisk.core.invoker
 
 import java.nio.charset.StandardCharsets
 import java.time.Instant
-
-import scala.concurrent.Await
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
-
 import org.apache.kafka.common.errors.RecordTooLargeException
-
 import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
 import akka.actor.Props
@@ -42,16 +38,13 @@ import whisk.core.connector.CompletionMessage
 import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessageProducer
 import whisk.core.connector.MessagingProvider
+import whisk.core.containerpool.ContainerFactoryProvider
 import whisk.core.containerpool.ContainerPool
 import whisk.core.containerpool.ContainerProxy
 import whisk.core.containerpool.PrewarmingConfig
 import whisk.core.containerpool.Run
-import whisk.core.containerpool.docker.DockerClientWithFileAccess
-import whisk.core.containerpool.docker.DockerContainer
-import whisk.core.containerpool.docker.RuncClient
 import whisk.core.database.NoDocumentException
 import whisk.core.entity._
-import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.spi.SpiLoader
@@ -61,6 +54,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
   logging: Logging) {
 
   implicit val ec = actorSystem.dispatcher
+  implicit val cfg = config
 
   /** Initialize needed databases */
   private val entityStore = WhiskEntityStore.datastore(config)
@@ -81,53 +75,22 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
     new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage)
   })
 
-  /** Initialize container clients */
-  implicit val docker = new DockerClientWithFileAccess()(ec)
-  implicit val runc = new RuncClient(ec)
-
-  /** Cleans up all running wsk_ containers */
-  def cleanup() = {
-    val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap {
-      containers =>
-        val removals = containers.map { id =>
-          runc
-            .resume(id)(TransactionId.invokerNanny)
-            .recoverWith {
-              // Ignore resume failures and try to remove anyway
-              case _ => Future.successful(())
-            }
-            .flatMap { _ =>
-              docker.rm(id)(TransactionId.invokerNanny)
-            }
-        }
-        Future.sequence(removals)
-    }
-
-    Await.ready(cleaning, 30.seconds)
-  }
-  cleanup()
-  sys.addShutdownHook(cleanup())
-
   /** Factory used by the ContainerProxy to physically create a new container. */
   val containerFactory =
-    (tid: TransactionId, name: String, actionImage: ImageName, userProvidedImage: Boolean, memory: ByteSize) => {
-      val image = if (userProvidedImage) {
-        actionImage.publicImageName
-      } else {
-        actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
-      }
-
-      DockerContainer.create(
-        tid,
-        image = image,
-        userProvidedImage = userProvidedImage,
-        memory = memory,
-        cpuShares = config.invokerCoreShare.toInt,
-        environment = Map("__OW_API_HOST" -> config.wskApiHost),
-        network = config.invokerContainerNetwork,
-        dnsServers = config.invokerContainerDns,
-        name = Some(name))
-    }
+    SpiLoader
+      .get[ContainerFactoryProvider]
+      .getContainerFactory(
+        actorSystem,
+        logging,
+        config,
+        instance,
+        Map(
+          "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+          "--ulimit" -> Set("nofile=1024:1024"),
+          "--pids-limit" -> Set("1024"),
+          "--dns" -> config.invokerContainerDns.toSet))
+  containerFactory.init()
+  sys.addShutdownHook(containerFactory.cleanup())
 
   /** Sends an active-ack. */
   val ack = (tid: TransactionId,
@@ -164,7 +127,8 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
   }
 
   /** Creates a ContainerProxy Actor when being called. */
-  val childFactory = (f: ActorRefFactory) => f.actorOf(ContainerProxy.props(containerFactory, ack, store, instance))
+  val childFactory = (f: ActorRefFactory) =>
+    f.actorOf(ContainerProxy.props(containerFactory.createContainer _, ack, store, instance))
 
   val prewarmKind = "nodejs:6"
   val prewarmExec = ExecManifest.runtimesManifest
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index f60cbfe..94f6763 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -164,7 +164,7 @@ object ActionContainer {
   }
 
   private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = {
-    whisk.core.containerpool.docker.HttpUtils.post(host, port, endPoint, content)
+    whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content)
   }
 
   private class ActionContainerImpl() extends ActionContainer {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
index 809165a..0e0867a 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
@@ -36,9 +36,9 @@ import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 
 import spray.json.JsObject
-import whisk.core.containerpool.docker.HttpUtils
-import whisk.core.entity.ActivationResponse._
+import whisk.core.containerpool.HttpUtils
 import whisk.core.entity.size._
+import whisk.core.entity.ActivationResponse._
 
 /**
  * Unit tests for HttpUtils which communicate with containers.
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 9146034..1235b92 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -23,21 +23,19 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.concurrent.duration.DurationInt
 import scala.concurrent.duration.FiniteDuration
-
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
-
 import common.StreamLogging
 import whisk.common.LogMarker
 import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD
 import whisk.common.TransactionId
-import whisk.core.containerpool.docker.ContainerId
-import whisk.core.containerpool.docker.ContainerIp
 import whisk.core.containerpool.docker.DockerClient
 import scala.concurrent.Promise
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
 import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
@@ -171,7 +169,7 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B
 
     val network = "userland"
     val inspectArgs = Seq("--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString)
-    runAndVerify(dc.inspectIPAddress(id, network), "inspect", inspectArgs) shouldBe ContainerIp(stdout)
+    runAndVerify(dc.inspectIPAddress(id, network), "inspect", inspectArgs) shouldBe ContainerAddress(stdout)
 
     val image = "image"
     val runArgs = Seq("--memory", "256m", "--cpushares", "1024")
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
index 8bf736d..7c6acbf 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
@@ -40,8 +40,8 @@ import org.scalatest.fixture.{FlatSpec => FixtureFlatSpec}
 import common.StreamLogging
 import spray.json._
 import whisk.common.TransactionId
-import whisk.core.containerpool.docker.ContainerId
-import whisk.core.containerpool.docker.ContainerIp
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
 import whisk.core.containerpool.docker.DockerClientWithFileAccess
 
 @RunWith(classOf[JUnitRunner])
@@ -57,25 +57,25 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers with Stre
   val dockerCommand = "docker"
   val networkInConfigFile = "networkConfig"
   val networkInDockerInspect = "networkInspect"
-  val ipInConfigFile = ContainerIp("10.0.0.1")
-  val ipInDockerInspect = ContainerIp("10.0.0.2")
+  val ipInConfigFile = ContainerAddress("10.0.0.1")
+  val ipInDockerInspect = ContainerAddress("10.0.0.2")
   val dockerConfig =
     JsObject(
       "NetworkSettings" ->
         JsObject(
           "Networks" ->
             JsObject(networkInConfigFile ->
-              JsObject("IPAddress" -> JsString(ipInConfigFile.asString)))))
+              JsObject("IPAddress" -> JsString(ipInConfigFile.host)))))
 
   /** Returns a DockerClient with mocked results */
-  def dockerClient(execResult: Future[String] = Future.successful(ipInDockerInspect.asString),
+  def dockerClient(execResult: Future[String] = Future.successful(ipInDockerInspect.host),
                    readResult: Future[JsObject] = Future.successful(dockerConfig)) =
     new DockerClientWithFileAccess()(global) {
       override val dockerCmd = Seq(dockerCommand)
       override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult
       override def configFileContents(configFile: File) = readResult
       // Make protected ipAddressFromFile available for testing - requires reflectiveCalls
-      def publicIpAddressFromFile(id: ContainerId, network: String): Future[ContainerIp] =
+      def publicIpAddressFromFile(id: ContainerId, network: String): Future[ContainerAddress] =
         ipAddressFromFile(id, network)
     }
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 1451c5d..47e67c2 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -21,7 +21,6 @@ import java.io.IOException
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 import java.time.Instant
-
 import scala.collection.mutable
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -71,12 +70,12 @@ class DockerContainerTests
    * Constructs a testcontainer with overridden IO methods. Results of the override can be provided
    * as parameters.
    */
-  def dockerContainer(id: ContainerId = containerId, ip: ContainerIp = ContainerIp("ip"))(
+  def dockerContainer(id: ContainerId = containerId, addr: ContainerAddress = ContainerAddress("ip"))(
     ccRes: Future[RunResult] =
       Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))),
     retryCount: Int = 0)(implicit docker: DockerApiWithFileAccess, runc: RuncApi): DockerContainer = {
 
-    new DockerContainer(id, ip) {
+    new DockerContainer(id, addr) {
       override protected def callContainer(path: String,
                                            body: JsObject,
                                            timeout: FiniteDuration,
@@ -94,6 +93,10 @@ class DockerContainerTests
   behavior of "DockerContainer"
 
   implicit val transid = TransactionId.testing
+  val parameters = Map(
+    "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+    "--ulimit" -> Set("nofile=1024:1024"),
+    "--pids-limit" -> Set("1024"))
 
   /*
    * CONTAINER CREATION
@@ -108,7 +111,6 @@ class DockerContainerTests
     val environment = Map("test" -> "hi")
     val network = "testwork"
     val name = "myContainer"
-
     val container = DockerContainer.create(
       transid = transid,
       image = image,
@@ -116,7 +118,8 @@ class DockerContainerTests
       cpuShares = cpuShares,
       environment = environment,
       network = network,
-      name = Some(name))
+      name = Some(name),
+      dockerRunParameters = parameters)
 
     await(container)
 
@@ -148,7 +151,11 @@ class DockerContainerTests
     implicit val docker = new TestDockerClient
     implicit val runc = stub[RuncApi]
 
-    val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true)
+    val container = DockerContainer.create(
+      transid = transid,
+      image = "image",
+      userProvidedImage = true,
+      dockerRunParameters = parameters)
     await(container)
 
     docker.pulls should have size 1
@@ -160,14 +167,14 @@ class DockerContainerTests
   it should "remove the container if inspect fails" in {
     implicit val docker = new TestDockerClient {
       override def inspectIPAddress(id: ContainerId,
-                                    network: String)(implicit transid: TransactionId): Future[ContainerIp] = {
+                                    network: String)(implicit transid: TransactionId): Future[ContainerAddress] = {
         inspects += ((id, network))
         Future.failed(new RuntimeException())
       }
     }
     implicit val runc = stub[RuncApi]
 
-    val container = DockerContainer.create(transid = transid, image = "image")
+    val container = DockerContainer.create(transid = transid, image = "image", dockerRunParameters = parameters)
     a[WhiskContainerStartupError] should be thrownBy await(container)
 
     docker.pulls should have size 0
@@ -186,7 +193,11 @@ class DockerContainerTests
     }
     implicit val runc = stub[RuncApi]
 
-    val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true)
+    val container = DockerContainer.create(
+      transid = transid,
+      image = "image",
+      userProvidedImage = true,
+      dockerRunParameters = parameters)
     a[WhiskContainerStartupError] should be thrownBy await(container)
 
     docker.pulls should have size 1
@@ -198,14 +209,18 @@ class DockerContainerTests
   it should "provide a proper error if inspect fails for blackbox containers" in {
     implicit val docker = new TestDockerClient {
       override def inspectIPAddress(id: ContainerId,
-                                    network: String)(implicit transid: TransactionId): Future[ContainerIp] = {
+                                    network: String)(implicit transid: TransactionId): Future[ContainerAddress] = {
         inspects += ((id, network))
         Future.failed(new RuntimeException())
       }
     }
     implicit val runc = stub[RuncApi]
 
-    val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true)
+    val container = DockerContainer.create(
+      transid = transid,
+      image = "image",
+      userProvidedImage = true,
+      dockerRunParameters = parameters)
     a[WhiskContainerStartupError] should be thrownBy await(container)
 
     docker.pulls should have size 1
@@ -223,7 +238,11 @@ class DockerContainerTests
     }
     implicit val runc = stub[RuncApi]
 
-    val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true)
+    val container = DockerContainer.create(
+      transid = transid,
+      image = "image",
+      userProvidedImage = true,
+      dockerRunParameters = parameters)
     a[BlackboxStartupError] should be thrownBy await(container)
 
     docker.pulls should have size 1
@@ -240,7 +259,7 @@ class DockerContainerTests
     implicit val runc = stub[RuncApi]
 
     val id = ContainerId("id")
-    val container = new DockerContainer(id, ContainerIp("ip"))
+    val container = new DockerContainer(id, ContainerAddress("ip"))
 
     container.suspend()
     container.resume()
@@ -254,7 +273,7 @@ class DockerContainerTests
     implicit val runc = stub[RuncApi]
 
     val id = ContainerId("id")
-    val container = new DockerContainer(id, ContainerIp("ip"))
+    val container = new DockerContainer(id, ContainerAddress("ip"))
 
     container.destroy()
 
@@ -632,9 +651,10 @@ class DockerContainerTests
       Future.successful(ContainerId("testId"))
     }
 
-    def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerIp] = {
+    def inspectIPAddress(id: ContainerId, network: String)(
+      implicit transid: TransactionId): Future[ContainerAddress] = {
       inspects += ((id, network))
-      Future.successful(ContainerIp("testIp"))
+      Future.successful(ContainerAddress("testIp"))
     }
 
     def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
index 4d2db5e..b4e2e47 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
@@ -30,7 +30,7 @@ import scala.concurrent.Await
 import org.scalatest.Matchers
 import whisk.core.containerpool.docker.RuncClient
 import common.StreamLogging
-import whisk.core.containerpool.docker.ContainerId
+import whisk.core.containerpool.ContainerId
 import whisk.common.TransactionId
 import org.scalatest.BeforeAndAfterEach
 import whisk.common.LogMarker
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 0f1e1b6..ca890a3 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -18,18 +18,15 @@
 package whisk.core.containerpool.test
 
 import java.time.Instant
-
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration._
-
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpecLike
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
-
 import akka.actor.ActorRef
 import akka.actor.ActorSystem
 import akka.actor.FSM
@@ -39,8 +36,11 @@ import akka.actor.FSM.Transition
 import akka.testkit.ImplicitSender
 import akka.testkit.TestKit
 import common.LoggedFunction
+import common.StreamLogging
+import scala.concurrent.ExecutionContext
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.connector.ActivationMessage
 import whisk.core.containerpool._
@@ -56,11 +56,13 @@ class ContainerProxyTests
     with FlatSpecLike
     with Matchers
     with BeforeAndAfterAll
-    with MockFactory {
+    with MockFactory
+    with StreamLogging {
 
   override def afterAll = TestKit.shutdownActorSystem(system)
 
   val timeout = 5.seconds
+  val log = logging
 
   // Common entities to pass to the tests. We don't really care what's inside
   // those for the behavior testing here, as none of the contents will really
@@ -504,6 +506,10 @@ class ContainerProxyTests
    * Implements all the good cases of a perfect run to facilitate error case overriding.
    */
   class TestContainer extends Container {
+    protected val id = ContainerId("testcontainer")
+    protected val addr = ContainerAddress("0.0.0.0")
+    protected implicit val logging: Logging = log
+    protected implicit val ec: ExecutionContext = system.dispatcher
     var suspendCount = 0
     var resumeCount = 0
     var destroyCount = 0
@@ -519,18 +525,18 @@ class ContainerProxyTests
       resumeCount += 1
       Future.successful(())
     }
-    def destroy()(implicit transid: TransactionId): Future[Unit] = {
+    override def destroy()(implicit transid: TransactionId): Future[Unit] = {
       destroyCount += 1
-      Future.successful(())
+      super.destroy()
     }
-    def initialize(initializer: JsObject, timeout: FiniteDuration)(
+    override def initialize(initializer: JsObject, timeout: FiniteDuration)(
       implicit transid: TransactionId): Future[Interval] = {
       initializeCount += 1
       initializer shouldBe action.containerInitializer
       timeout shouldBe action.limits.timeout.duration
       Future.successful(Interval.zero)
     }
-    def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
+    override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
       implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
       runCount += 1
       environment.fields("api_key") shouldBe message.user.authkey.toJson

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].