You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/07/26 21:25:47 UTC

[incubator-openwhisk] branch master updated: Introduce a ContainerClient interface and an akka based implementation. (#3812)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 15bb04a  Introduce a ContainerClient interface and an akka based implementation. (#3812)
15bb04a is described below

commit 15bb04a449f621d262c2687a7b8417241f3856b8
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Thu Jul 26 14:25:44 2018 -0700

    Introduce a ContainerClient interface and an akka based implementation. (#3812)
    
    HttpUtils (the HTTP client for invoker -> action container) uses the org.apache.http client, which is synchronous and performs poorly for concurrent requests. I ran into problems using it with concurrent activation support. Instead of trying to force that client to work, this is work towards replacing it (or re-replacing it) with an akka-http based client.
---
 .../src/main/scala/whisk/common/Logging.scala      |   3 +
 .../core/containerpool/AkkaContainerClient.scala   | 222 +++++++++++++++++++++
 ...s.scala => ApacheBlockingContainerClient.scala} |  52 +++--
 .../scala/whisk/core/containerpool/Container.scala |  34 +++-
 .../whisk/core/containerpool/ContainerClient.scala |  30 +++
 .../core/containerpool/ContainerFactory.scala      |   2 +-
 .../whisk/core/database/CouchDbRestStore.scala     |   7 +-
 .../main/scala/whisk/core/mesos/MesosTask.scala    |   3 +-
 .../main/scala/whisk/http/PoolingRestClient.scala  |  23 ++-
 core/invoker/src/main/resources/application.conf   |   1 +
 .../containerpool/docker/DockerContainer.scala     |  50 +++--
 .../kubernetes/KubernetesContainer.scala           |   4 +-
 .../scala/actionContainers/ActionContainer.scala   |  11 +-
 .../docker/test/AkkaContainerClientTests.scala     | 214 ++++++++++++++++++++
 ...la => ApacheBlockingContainerClientTests.scala} |  66 ++++--
 .../kubernetes/test/KubernetesClientTests.scala    |   8 +-
 .../test/DockerToActivationLogStoreTests.scala     |   5 +-
 .../mesos/test/MesosContainerFactoryTest.scala     |   2 +-
 .../containerpool/test/ContainerPoolTests.scala    |  28 +--
 .../containerpool/test/ContainerProxyTests.scala   |   3 +-
 20 files changed, 660 insertions(+), 108 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 56ab754..31d42a7 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -250,6 +250,7 @@ object LoggingMarkers {
   private val activation = "activation"
   private val kafka = "kafka"
   private val loadbalancer = "loadbalancer"
+  private val containerClient = "containerClient"
 
   /*
    * Controller related markers
@@ -297,6 +298,8 @@ object LoggingMarkers {
   def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, "kubectl", start, Some(cmd), Map("cmd" -> cmd))
   def INVOKER_CONTAINER_START(containerState: String) =
     LogMarkerToken(invoker, "containerStart", count, Some(containerState), Map("containerState" -> containerState))
+  val CONTAINER_CLIENT_RETRIES =
+    LogMarkerToken(containerClient, "retries", count)
 
   // Kafka related markers
   def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
new file mode 100644
index 0000000..a133989
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model.HttpMethods
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.MediaTypes
+import akka.http.scaladsl.model.MessageEntity
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.Accept
+import akka.http.scaladsl.model.headers.Connection
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.StreamTcpException
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration._
+import scala.util.Try
+import scala.util.control.NonFatal
+import spray.json._
+import whisk.common.Logging
+import whisk.common.LoggingMarkers.CONTAINER_CLIENT_RETRIES
+import whisk.common.MetricEmitter
+import whisk.common.TransactionId
+import whisk.core.entity.ActivationResponse.ContainerHttpError
+import whisk.core.entity.ActivationResponse._
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size.SizeLong
+import whisk.http.PoolingRestClient
+
+/**
+ * This HTTP client is used only in the invoker to communicate with the action container.
+ * It allows POSTing a JSON object and receiving a JSON object back; that is, the
+ * content type and the accept headers are both 'application/json'.
+ * This implementation uses the akka http host-level client API.
+ *
+ * @param hostname the host name
+ * @param port the port
+ * @param timeout the timeout (a FiniteDuration) to wait for a response
+ * @param maxResponse the maximum size in bytes the connection will accept
+ * @param queueSize once all connections are used, how big a queue to allow for additional requests
+ * @param retryInterval duration between retries for TCP connection errors
+ */
+protected class AkkaContainerClient(
+  hostname: String,
+  port: Int,
+  timeout: FiniteDuration,
+  maxResponse: ByteSize,
+  queueSize: Int,
+  retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
+    extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
+    with ContainerClient {
+
+  def close() = Await.result(shutdown(), 30.seconds)
+
+  /**
+   * Posts to hostname/endpoint the given JSON object.
+   * Waits up to timeout before aborting on a good connection.
+   * If the endpoint is not ready, retry up to timeout.
+   * Every retry reduces the available timeout so that this method should not
+   * wait longer than the total timeout (within a small slack allowance).
+   *
+   * @param endpoint the path of the API call relative to hostname
+   * @param body the JSON value to post (this is usually a JSON object)
+   * @param retry whether or not to retry on connection failure
+   * @return a Future of Left(Error Message) or Right(Status Code, Response as UTF-8 String)
+   */
+  def post(endpoint: String, body: JsValue, retry: Boolean)(
+    implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
+
+    //create the request
+    val req = Marshal(body).to[MessageEntity].map { b =>
+      //DO NOT reuse the connection
+      //For details on Connection: Close handling, see:
+      // - https://doc.akka.io/docs/akka-http/current/common/http-model.html#http-headers
+      // - http://github.com/akka/akka-http/tree/v10.1.3/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala#L470-L571
+      HttpRequest(HttpMethods.POST, endpoint, entity = b)
+        .withHeaders(Connection("close"), Accept(MediaTypes.`application/json`))
+    }
+
+    retryingRequest(req, timeout, retry)
+      .flatMap {
+        case (response, retries) => {
+          if (retries > 0) {
+            logging.debug(this, s"completed request to $endpoint after $retries retries")
+            MetricEmitter.emitHistogramMetric(CONTAINER_CLIENT_RETRIES, retries)
+          }
+
+          response.entity.contentLengthOption match {
+            case Some(contentLength) if response.status != StatusCodes.NoContent =>
+              if (contentLength <= maxResponse.toBytes) {
+                Unmarshal(response.entity.withSizeLimit(maxResponse.toBytes)).to[String].map { o =>
+                  Right(ContainerResponse(response.status.intValue, o, None))
+                }
+              } else {
+                truncated(response.entity.dataBytes).map { s =>
+                  Right(ContainerResponse(response.status.intValue, s, Some(contentLength.B, maxResponse)))
+                }
+              }
+            case _ =>
+              //handle missing Content-Length as NoResponseReceived
+              //also handle 204 as NoResponseReceived, for parity with ApacheBlockingContainerClient client
+              response.discardEntityBytes().future.map(_ => Left(NoResponseReceived()))
+          }
+        }
+      }
+      .recover {
+        case t: TimeoutException => Left(Timeout(t))
+        case NonFatal(t)         => Left(ConnectionError(t))
+      }
+  }
+  //returns a Future of (HttpResponse, Int), where the Int is the number of retries performed
+  private def retryingRequest(req: Future[HttpRequest],
+                              timeout: FiniteDuration,
+                              retry: Boolean,
+                              retryCount: Int = 0): Future[(HttpResponse, Int)] = {
+    request(req)
+      .map((_, retryCount))
+      .recoverWith {
+        case t: StreamTcpException if retry =>
+          val newTimeout = timeout - retryInterval
+          if (newTimeout > Duration.Zero) {
+            akka.pattern.after(retryInterval, as.scheduler)(retryingRequest(req, newTimeout, retry, retryCount + 1))
+          } else {
+            logging.warn(
+              this,
+              s"POST failed after $retryCount retries with $t - no more retries because timeout exceeded.")
+            Future.failed(new TimeoutException(t.getMessage))
+          }
+      }
+  }
+
+  private def truncated(responseBytes: Source[ByteString, _],
+                        previouslyCaptured: ByteString = ByteString.empty): Future[String] = {
+    responseBytes.prefixAndTail(1).runWith(Sink.head).flatMap {
+      case (Nil, tail) =>
+        //ignore the tail (MUST CONSUME ENTIRE ENTITY!)
+        tail.runWith(Sink.ignore).map(_ => previouslyCaptured.utf8String)
+      case (Seq(prefix), tail) =>
+        val truncatedResponse = previouslyCaptured ++ prefix
+        if (truncatedResponse.size < maxResponse.toBytes) {
+          truncated(tail, truncatedResponse)
+        } else {
+          //ignore the tail (MUST CONSUME ENTIRE ENTITY!)
+          //captured string MAY be larger than the max response, so take only maxResponse bytes to get the exact length
+          tail.runWith(Sink.ignore).map(_ => truncatedResponse.take(maxResponse.toBytes.toInt).utf8String)
+        }
+    }
+  }
+}
+
+object AkkaContainerClient {
+
+  /** A helper method to post one single request to a connection. Used for container tests. */
+  def post(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)(
+    implicit logging: Logging,
+    as: ActorSystem,
+    ec: ExecutionContext,
+    tid: TransactionId): (Int, Option[JsObject]) = {
+    val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
+    val response = executeRequest(connection, endPoint, content)
+    val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
+    connection.close()
+    result
+  }
+
+  /** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */
+  def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: FiniteDuration)(
+    implicit logging: Logging,
+    tid: TransactionId,
+    as: ActorSystem,
+    ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
+    val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
+    val futureResults = contents.map { executeRequest(connection, endPoint, _) }
+    val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures
+    connection.close()
+    results
+  }
+
+  private def executeRequest(connection: AkkaContainerClient, endpoint: String, content: JsValue)(
+    implicit logging: Logging,
+    as: ActorSystem,
+    ec: ExecutionContext,
+    tid: TransactionId): Future[(Int, Option[JsObject])] = {
+
+    val res = connection
+      .post(endpoint, content, true)
+      .map({
+        case Right(r)                   => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
+        case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
+        case Left(Timeout(_))           => throw new java.util.concurrent.TimeoutException()
+        case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
+          throw new java.util.concurrent.TimeoutException()
+        case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
+      })
+
+    res
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala
similarity index 85%
rename from common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
rename to common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala
index 4ee7363..a6d71c3 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala
@@ -38,11 +38,15 @@ import whisk.core.entity.ByteSize
 import whisk.core.entity.size.SizeLong
 
 import scala.annotation.tailrec
+import scala.concurrent._
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NoStackTrace
 
+// Used internally to wrap all exceptions for which the request can be retried
+protected[containerpool] case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace
+
 /**
  * This HTTP client is used only in the invoker to communicate with the action container.
  * It allows to POST a JSON object and receive JSON object back; that is the
@@ -56,13 +60,16 @@ import scala.util.control.NoStackTrace
  * @param maxResponse the maximum size in bytes the connection will accept
  * @param maxConcurrent the maximum number of concurrent requests allowed (Default is 1)
  */
-protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize, maxConcurrent: Int = 1)(
-  implicit logging: Logging) {
+protected class ApacheBlockingContainerClient(hostname: String,
+                                              timeout: FiniteDuration,
+                                              maxResponse: ByteSize,
+                                              maxConcurrent: Int = 1)(implicit logging: Logging, ec: ExecutionContext)
+    extends ContainerClient {
 
   /**
    * Closes the HttpClient and all resources allocated by it.
    *
-   * This will close the HttpClient that is generated for this instance of HttpUtils. That will also cause the
+   * This will close the HttpClient that is generated for this instance of ApacheBlockingContainerClient. That will also cause the
    * ConnectionManager to be closed alongside.
    */
   def close(): Unit = HttpClientUtils.closeQuietly(connection)
@@ -80,7 +87,7 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse
    * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
    */
   def post(endpoint: String, body: JsValue, retry: Boolean)(
-    implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
+    implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
     val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
     entity.setContentType("application/json")
 
@@ -88,12 +95,13 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse
     request.addHeader(HttpHeaders.ACCEPT, "application/json")
     request.setEntity(entity)
 
-    execute(request, timeout, maxConcurrent, retry)
+    Future {
+      blocking {
+        execute(request, timeout, maxConcurrent, retry)
+      }
+    }
   }
 
-  // Used internally to wrap all exceptions for which the request can be retried
-  private case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace
-
   // Annotation will make the compiler complain if no tail recursion is possible
   @tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, maxConcurrent: Int, retry: Boolean)(
     implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
@@ -191,15 +199,19 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse
     .build
 }
 
-object HttpUtils {
+object ApacheBlockingContainerClient {
 
   /** A helper method to post one single request to a connection. Used for container tests. */
-  def post(host: String, port: Int, endPoint: String, content: JsValue)(implicit logging: Logging,
-                                                                        tid: TransactionId): (Int, Option[JsObject]) = {
-    val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
+  def post(host: String, port: Int, endPoint: String, content: JsValue)(
+    implicit logging: Logging,
+    tid: TransactionId,
+    ec: ExecutionContext): (Int, Option[JsObject]) = {
+    val timeout = 90.seconds
+    val connection = new ApacheBlockingContainerClient(s"$host:$port", timeout, 1.MB)
     val response = executeRequest(connection, endPoint, content)
+    val result = Await.result(response, timeout)
     connection.close()
-    response
+    result
   }
 
   /** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */
@@ -207,17 +219,20 @@ object HttpUtils {
     implicit logging: Logging,
     tid: TransactionId,
     ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
-    val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB, contents.size)
-    val futureResults = contents.map(content => Future { executeRequest(connection, endPoint, content) })
+    val connection = new ApacheBlockingContainerClient(s"$host:$port", 90.seconds, 1.MB, contents.size)
+    val futureResults = contents.map { content =>
+      executeRequest(connection, endPoint, content)
+    }
     val results = Await.result(Future.sequence(futureResults), timeout)
     connection.close()
     results
   }
 
-  private def executeRequest(connection: HttpUtils, endpoint: String, content: JsValue)(
+  private def executeRequest(connection: ApacheBlockingContainerClient, endpoint: String, content: JsValue)(
     implicit logging: Logging,
-    tid: TransactionId): (Int, Option[JsObject]) = {
-    connection.post(endpoint, content, retry = true) match {
+    tid: TransactionId,
+    ec: ExecutionContext): Future[(Int, Option[JsObject])] = {
+    connection.post(endpoint, content, retry = true) map {
       case Right(r)                   => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
       case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
       case Left(Timeout(_))           => throw new java.util.concurrent.TimeoutException()
@@ -225,5 +240,6 @@ object HttpUtils {
         throw new java.util.concurrent.TimeoutException()
       case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
     }
+
   }
 }
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 49c692b..7c46615 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -17,11 +17,11 @@
 
 package whisk.core.containerpool
 
+import akka.actor.ActorSystem
 import java.time.Instant
-
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
-
+import pureconfig._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
@@ -38,9 +38,10 @@ import whisk.core.entity.ActivationResponse
 import whisk.core.entity.ActivationResponse.ContainerConnectionError
 import whisk.core.entity.ActivationResponse.ContainerResponse
 import whisk.core.entity.ByteSize
-import whisk.core.entity.size._
 import whisk.http.Messages
 import akka.event.Logging.InfoLevel
+import whisk.core.ConfigKeys
+import whisk.core.entity.ActivationEntityLimit
 
 /**
  * An OpenWhisk biased container abstraction. This is **not only** an abstraction
@@ -56,13 +57,17 @@ case class ContainerAddress(val host: String, val port: Int = 8080) {
 
 trait Container {
 
+  implicit protected val as: ActorSystem
   protected val id: ContainerId
   protected val addr: ContainerAddress
   protected implicit val logging: Logging
   protected implicit val ec: ExecutionContext
 
+  protected[containerpool] val config: ContainerPoolConfig =
+    loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
+
   /** HTTP connection to the container, will be lazily established by callContainer */
-  protected var httpConnection: Option[HttpUtils] = None
+  protected var httpConnection: Option[ContainerClient] = None
 
   /** Stops the container from consuming CPU cycles. */
   def suspend()(implicit transid: TransactionId): Future[Unit]
@@ -166,16 +171,23 @@ trait Container {
     implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
-      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
+      val conn = if (config.akkaClient) {
+        new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
+      } else {
+        new ApacheBlockingContainerClient(
+          s"${addr.host}:${addr.port}",
+          timeout,
+          ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT)
+      }
       httpConnection = Some(conn)
       conn
     }
-    Future {
-      http.post(path, body, retry)
-    }.map { response =>
-      val finished = Instant.now()
-      RunResult(Interval(started, finished), response)
-    }
+    http
+      .post(path, body, retry)
+      .map { response =>
+        val finished = Instant.now()
+        RunResult(Interval(started, finished), response)
+      }
   }
 }
 
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala
new file mode 100644
index 0000000..dfcd231
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import scala.concurrent.Future
+import spray.json._
+import whisk.common.TransactionId
+import whisk.core.entity.ActivationResponse.ContainerHttpError
+import whisk.core.entity.ActivationResponse._
+
+trait ContainerClient extends AutoCloseable {
+  def post(endpoint: String, body: JsValue, retry: Boolean)(
+    implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]]
+
+}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
index 5c972f0..3c56cf9 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -31,7 +31,7 @@ case class ContainerArgsConfig(network: String,
                                dnsServers: Seq[String] = Seq.empty,
                                extraArgs: Map[String, Set[String]] = Map.empty)
 
-case class ContainerPoolConfig(numCore: Int, coreShare: Int) {
+case class ContainerPoolConfig(numCore: Int, coreShare: Int, akkaClient: Boolean) {
 
   /**
    * The total number of containers is simply the number of cores dilated by the cpu sharing.
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 1e43eb7..aa35358 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -23,6 +23,8 @@ import akka.http.scaladsl.model._
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl._
 import akka.util.ByteString
+import scala.concurrent.Await
+import scala.concurrent.duration._
 import spray.json._
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.database.StoreUtils._
@@ -30,8 +32,7 @@ import whisk.core.entity.Attachments.Attached
 import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
 import whisk.http.Messages
 
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
 import scala.util.Try
 
 /**
@@ -513,7 +514,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
       .getOrElse(Future.successful(true)) // For CouchDB it is expected that the entire document is deleted.
 
   override def shutdown(): Unit = {
-    Await.ready(client.shutdown(), 1.minute)
+    Await.result(client.shutdown(), 30.seconds)
     attachmentStore.foreach(_.shutdown())
   }
 
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
index 0e50543..db3f87c 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
@@ -119,7 +119,7 @@ object MesosTask {
       log.info(this, s"launched task with state ${taskDetails.taskStatus.getState} at ${taskHost}:${taskPort}")
       val containerIp = new ContainerAddress(taskHost, taskPort)
       val containerId = new ContainerId(taskId);
-      new MesosTask(containerId, containerIp, ec, log, taskId, mesosClientActor, mesosConfig)
+      new MesosTask(containerId, containerIp, ec, log, as, taskId, mesosClientActor, mesosConfig)
     })
 
   }
@@ -134,6 +134,7 @@ class MesosTask(override protected val id: ContainerId,
                 override protected val addr: ContainerAddress,
                 override protected val ec: ExecutionContext,
                 override protected val logging: Logging,
+                override protected val as: ActorSystem,
                 taskId: String,
                 mesosClientActor: ActorRef,
                 mesosConfig: MesosConfig)
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index 5e24b29..909fffb 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -22,12 +22,13 @@ import akka.http.scaladsl.Http
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.marshalling._
 import akka.http.scaladsl.model._
+import akka.http.scaladsl.settings.ConnectionPoolSettings
 import akka.http.scaladsl.unmarshalling._
 import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
 import akka.stream.scaladsl.{Flow, _}
 import spray.json._
-
 import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.duration._
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -43,18 +44,24 @@ class PoolingRestClient(
   host: String,
   port: Int,
   queueSize: Int,
-  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
-  implicit system: ActorSystem) {
+  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
+  timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
   require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
 
   protected implicit val context: ExecutionContext = system.dispatcher
   protected implicit val materializer: ActorMaterializer = ActorMaterializer()
 
+  //if specified, override the ClientConnection idle-timeout value
+  private val timeoutSettings = {
+    val ps = ConnectionPoolSettings(system.settings.config)
+    timeout.map(t => ps.withUpdatedConnectionSettings(_.withIdleTimeout(t))).getOrElse(ps)
+  }
+
   // Creates or retrieves a connection pool for the host.
   private val pool = if (protocol == "http") {
-    Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port)
+    Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port, settings = timeoutSettings)
   } else {
-    Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
+    Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port, settings = timeoutSettings)
   }
 
   // Additional queue in case all connections are busy. Should hardly ever be
@@ -64,8 +71,10 @@ class PoolingRestClient(
     .queue(queueSize, OverflowStrategy.dropNew)
     .via(httpFlow.getOrElse(pool))
     .toMat(Sink.foreach({
-      case ((Success(response), p)) => p.success(response)
-      case ((Failure(error), p))    => p.failure(error)
+      case (Success(response), p) =>
+        p.success(response)
+      case (Failure(error), p) =>
+        p.failure(error)
     }))(Keep.left)
     .run
 
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 00c3339..8edb386 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -29,6 +29,7 @@ whisk {
   container-pool {
     num-core: 4      # used for computing --cpushares, and max number of containers allowed
     core-share: 2    # used for computing --cpushares, and max number of containers allowed
+    akka-client:  false # if true, use AkkaContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
   }
 
   kubernetes {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 499ae42..99ff3c3 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -20,12 +20,10 @@ package whisk.core.containerpool.docker
 import java.time.Instant
 import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicLong
-
 import akka.actor.ActorSystem
 import akka.stream._
 import akka.stream.scaladsl.Framing.FramingException
 import spray.json._
-
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import whisk.common.Logging
@@ -166,7 +164,7 @@ class DockerContainer(protected val id: ContainerId,
                       protected val addr: ContainerAddress,
                       protected val useRunc: Boolean)(implicit docker: DockerApiWithFileAccess,
                                                       runc: RuncApi,
-                                                      as: ActorSystem,
+                                                      override protected val as: ActorSystem,
                                                       protected val ec: ExecutionContext,
                                                       protected val logging: Logging)
     extends Container {
@@ -210,29 +208,37 @@ class DockerContainer(protected val id: ContainerId,
     implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
-      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT)
+      val conn = if (config.akkaClient) {
+        new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
+      } else {
+        new ApacheBlockingContainerClient(
+          s"${addr.host}:${addr.port}",
+          timeout,
+          ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT)
+      }
       httpConnection = Some(conn)
       conn
     }
-    Future {
-      http.post(path, body, retry)
-    }.flatMap { response =>
-      val finished = Instant.now()
 
-      response.left
-        .map {
-          // Only check for memory exhaustion if there was a
-          // terminal connection error.
-          case error: ConnectionError =>
-            isOomKilled().map {
-              case true  => MemoryExhausted()
-              case false => error
-            }
-          case other => Future.successful(other)
-        }
-        .fold(_.map(Left(_)), right => Future.successful(Right(right)))
-        .map(res => RunResult(Interval(started, finished), res))
-    }
+    http
+      .post(path, body, retry)
+      .flatMap { response =>
+        val finished = Instant.now()
+
+        response.left
+          .map {
+            // Only check for memory exhaustion if there was a
+            // terminal connection error.
+            case error: ConnectionError =>
+              isOomKilled().map {
+                case true  => MemoryExhausted()
+                case false => error
+              }
+            case other => Future.successful(other)
+          }
+          .fold(_.map(Left(_)), right => Future.successful(Right(right)))
+          .map(res => RunResult(Interval(started, finished), res))
+      }
   }
 
   /**
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 9f0049c..f812add 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -17,15 +17,14 @@
 
 package whisk.core.containerpool.kubernetes
 
+import akka.actor.ActorSystem
 import java.time.Instant
 import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
-
 import akka.stream.StreamLimitReachedException
 import akka.stream.scaladsl.Framing.FramingException
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
 import spray.json._
-
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -91,6 +90,7 @@ class KubernetesContainer(protected[core] val id: ContainerId,
                           protected[core] val addr: ContainerAddress,
                           protected[core] val workerIP: String,
                           protected[core] val nativeContainerId: String)(implicit kubernetes: KubernetesApi,
+                                                                         override protected val as: ActorSystem,
                                                                          protected val ec: ExecutionContext,
                                                                          protected val logging: Logging)
     extends Container {
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index a168889..69f45ec 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -229,18 +229,21 @@ object ActionContainer {
   }
 
   private def syncPost(host: String, port: Int, endPoint: String, content: JsValue)(
-    implicit logging: Logging): (Int, Option[JsObject]) = {
+    implicit logging: Logging,
+    as: ActorSystem): (Int, Option[JsObject]) = {
 
     implicit val transid = TransactionId.testing
 
-    whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content)
+    whisk.core.containerpool.AkkaContainerClient.post(host, port, endPoint, content, 30.seconds)
   }
   private def concurrentSyncPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue])(
     implicit logging: Logging,
-    ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
+    ec: ExecutionContext,
+    as: ActorSystem): Seq[(Int, Option[JsObject])] = {
 
     implicit val transid = TransactionId.testing
 
-    whisk.core.containerpool.HttpUtils.concurrentPost(host, port, endPoint, contents, 30.seconds)
+    whisk.core.containerpool.AkkaContainerClient.concurrentPost(host, port, endPoint, contents, 30.seconds)
   }
+
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/AkkaContainerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
new file mode 100644
index 0000000..1afdeb2
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.docker.test
+
+import common.StreamLogging
+import common.WskActorSystem
+import java.nio.charset.StandardCharsets
+import java.time.Instant
+import org.apache.http.HttpRequest
+import org.apache.http.HttpResponse
+import org.apache.http.entity.StringEntity
+import org.apache.http.localserver.LocalServerTestBase
+import org.apache.http.protocol.HttpContext
+import org.apache.http.protocol.HttpRequestHandler
+import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfter
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+import scala.concurrent.Await
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration._
+import spray.json.JsObject
+import whisk.common.TransactionId
+import whisk.core.containerpool.AkkaContainerClient
+import whisk.core.entity.ActivationResponse._
+import whisk.core.entity.size._
+
+/**
+ * Unit tests for AkkaContainerClient which communicate with containers.
+ */
+@RunWith(classOf[JUnitRunner])
+class AkkaContainerClientTests
+    extends FlatSpec
+    with Matchers
+    with BeforeAndAfter
+    with BeforeAndAfterAll
+    with StreamLogging
+    with WskActorSystem {
+
+  implicit val transid = TransactionId.testing
+  implicit val ec = actorSystem.dispatcher
+
+  var testHang: FiniteDuration = 0.second
+  var testStatusCode: Int = 200
+  var testResponse: String = null
+  var testConnectionFailCount: Int = 0
+
+  val mockServer = new LocalServerTestBase {
+    var failcount = 0
+    override def setUp() = {
+      super.setUp()
+      this.serverBootstrap
+        .registerHandler(
+          "/init",
+          new HttpRequestHandler() {
+            override def handle(request: HttpRequest, response: HttpResponse, context: HttpContext) = {
+              if (testHang.length > 0) {
+                Thread.sleep(testHang.toMillis)
+              }
+              if (testConnectionFailCount > 0 && failcount < testConnectionFailCount) {
+                failcount += 1
+                println("failing in test")
+                throw new RuntimeException("failing...")
+              }
+              response.setStatusCode(testStatusCode);
+              if (testResponse != null) {
+                response.setEntity(new StringEntity(testResponse, StandardCharsets.UTF_8))
+              }
+            }
+          })
+    }
+  }
+
+  mockServer.setUp()
+  val httpHost = mockServer.start()
+  val hostWithPort = s"${httpHost.getHostName}:${httpHost.getPort}"
+
+  before {
+    testHang = 0.second
+    testStatusCode = 200
+    testResponse = null
+    testConnectionFailCount = 0
+    stream.reset()
+  }
+
+  override def afterAll = {
+    mockServer.shutDown()
+  }
+
+  behavior of "AkkaContainerClient"
+
+  it should "not wait longer than set timeout" in {
+    val timeout = 5.seconds
+    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100)
+    testHang = timeout * 2
+    val start = Instant.now()
+    val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+
+    val end = Instant.now()
+    val waited = end.toEpochMilli - start.toEpochMilli
+    result shouldBe 'left
+    waited should be > timeout.toMillis
+    waited should be < (timeout * 2).toMillis
+  }
+
+  it should "handle empty entity response" in {
+    val timeout = 5.seconds
+    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100)
+    testStatusCode = 204
+    val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+    result shouldBe Left(NoResponseReceived())
+  }
+
+  it should "retry till timeout on StreamTcpException" in {
+    val timeout = 5.seconds
+    val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 100)
+    val start = Instant.now()
+    val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+    val end = Instant.now()
+    val waited = end.toEpochMilli - start.toEpochMilli
+    result match {
+      case Left(Timeout(_: TimeoutException)) => // good
+      case _                                  => fail(s"$result was not a Timeout(TimeoutException)")
+    }
+    waited should be > timeout.toMillis
+    waited should be < (timeout * 2).toMillis
+  }
+
+  it should "retry till success within timeout limit" in {
+    val timeout = 5.seconds
+    val retryInterval = 500.milliseconds
+    val connection =
+      new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100, retryInterval)
+    val start = Instant.now()
+    testConnectionFailCount = 5
+    testResponse = ""
+    val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+    val end = Instant.now()
+    val waited = end.toEpochMilli - start.toEpochMilli
+    result shouldBe Right {
+      ContainerResponse(true, "", None)
+    }
+
+    waited should be > (testConnectionFailCount * retryInterval).toMillis
+    waited should be < timeout.toMillis
+  }
+
+  it should "not truncate responses within limit" in {
+    val timeout = 1.minute.toMillis
+    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, 50.B, 100)
+    Seq(true, false).foreach { success =>
+      Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
+        testStatusCode = if (success) 200 else 500
+        testResponse = r
+        val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+        result shouldBe Right {
+          ContainerResponse(okStatus = success, if (r != null) r else "", None)
+        }
+      }
+    }
+  }
+
+  it should "truncate responses that exceed limit" in {
+    val timeout = 1.minute.toMillis
+    val limit = 1.B
+    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, 100)
+    Seq(true, false).foreach { success =>
+      Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
+        testStatusCode = if (success) 200 else 500
+        testResponse = r
+        val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+        result shouldBe Right {
+          ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), Some((r.length.B, limit)))
+        }
+      }
+    }
+  }
+
+  it should "truncate large responses that exceed limit" in {
+    val timeout = 1.minute.toMillis
+    //use a limit large enough to not fit into a single ByteString as response entity is parsed into multiple ByteStrings
+    //seems like this varies, but often is ~64k or ~128k
+    val limit = 300.KB
+    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, 100)
+    Seq(true, false).foreach { success =>
+      // Generate a response that's 1MB
+      val response = "0" * 1024 * 1024
+      testStatusCode = if (success) 200 else 500
+      testResponse = response
+      val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+      result shouldBe Right {
+        ContainerResponse(okStatus = success, response.take(limit.toBytes.toInt), Some((response.length.B, limit)))
+      }
+
+    }
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
similarity index 60%
rename from tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
rename to tests/src/test/scala/whisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
index 3816cfc..a38fa52 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
@@ -19,7 +19,6 @@ package whisk.core.containerpool.docker.test
 
 import java.nio.charset.StandardCharsets
 import java.time.Instant
-
 import scala.concurrent.duration._
 import org.apache.http.HttpRequest
 import org.apache.http.HttpResponse
@@ -35,23 +34,30 @@ import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import spray.json.JsObject
 import common.StreamLogging
+import common.WskActorSystem
+import org.apache.http.conn.HttpHostConnectException
+import scala.concurrent.Await
 import whisk.common.TransactionId
-import whisk.core.containerpool.HttpUtils
+import whisk.core.containerpool.ApacheBlockingContainerClient
+import whisk.core.containerpool.RetryableConnectionError
+import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.core.entity.ActivationResponse._
 
 /**
- * Unit tests for HttpUtils which communicate with containers.
+ * Unit tests for ApacheBlockingContainerClient which communicate with containers.
  */
 @RunWith(classOf[JUnitRunner])
-class ContainerConnectionTests
+class ApacheBlockingContainerClientTests
     extends FlatSpec
     with Matchers
     with BeforeAndAfter
     with BeforeAndAfterAll
-    with StreamLogging {
+    with StreamLogging
+    with WskActorSystem {
 
   implicit val transid = TransactionId.testing
+  implicit val ec = actorSystem.dispatcher
 
   var testHang: FiniteDuration = 0.second
   var testStatusCode: Int = 200
@@ -89,14 +95,15 @@ class ContainerConnectionTests
     mockServer.shutDown()
   }
 
-  behavior of "Container HTTP Utils"
+  behavior of "ApacheBlockingContainerClient"
 
   it should "not wait longer than set timeout" in {
     val timeout = 5.seconds
-    val connection = new HttpUtils(hostWithPort, timeout, 1.B)
+    val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B)
     testHang = timeout * 2
     val start = Instant.now()
-    val result = connection.post("/init", JsObject.empty, retry = true)
+    val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+
     val end = Instant.now()
     val waited = end.toEpochMilli - start.toEpochMilli
     result shouldBe 'left
@@ -106,22 +113,41 @@ class ContainerConnectionTests
 
   it should "handle empty entity response" in {
     val timeout = 5.seconds
-    val connection = new HttpUtils(hostWithPort, timeout, 1.B)
+    val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B)
     testStatusCode = 204
-    val result = connection.post("/init", JsObject.empty, retry = true)
+    val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
     result shouldBe Left(NoResponseReceived())
   }
 
+  it should "retry till timeout on HttpHostConnectException" in {
+    val timeout = 5.seconds
+    val badHostAndPort = "0.0.0.0:12345"
+    val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B)
+    testStatusCode = 204
+    val start = Instant.now()
+    val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+    val end = Instant.now()
+    val waited = end.toEpochMilli - start.toEpochMilli
+    result match {
+      case Left(Timeout(RetryableConnectionError(_: HttpHostConnectException))) => // all good
+      case _ =>
+        fail(s"$result was not a Timeout(RetryableConnectionError(HttpHostConnectException)))")
+    }
+
+    waited should be > timeout.toMillis
+    waited should be < (timeout * 2).toMillis
+  }
+
   it should "not truncate responses within limit" in {
     val timeout = 1.minute.toMillis
-    val connection = new HttpUtils(hostWithPort, timeout.millis, 50.B)
-    Seq(true, false).foreach { code =>
+    val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, 50.B)
+    Seq(true, false).foreach { success =>
       Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
-        testStatusCode = if (code) 200 else 500
+        testStatusCode = if (success) 200 else 500
         testResponse = r
-        val result = connection.post("/init", JsObject.empty, retry = true)
+        val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
         result shouldBe Right {
-          ContainerResponse(okStatus = code, if (r != null) r else "", None)
+          ContainerResponse(okStatus = success, if (r != null) r else "", None)
         }
       }
     }
@@ -131,14 +157,14 @@ class ContainerConnectionTests
     val timeout = 1.minute.toMillis
     val limit = 1.B
     val excess = limit + 1.B
-    val connection = new HttpUtils(hostWithPort, timeout.millis, limit)
-    Seq(true, false).foreach { code =>
+    val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, limit)
+    Seq(true, false).foreach { success =>
       Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
-        testStatusCode = if (code) 200 else 500
+        testStatusCode = if (success) 200 else 500
         testResponse = r
-        val result = connection.post("/init", JsObject.empty, retry = true)
+        val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
         result shouldBe Right {
-          ContainerResponse(okStatus = code, r.take(limit.toBytes.toInt), Some((r.length.B, limit)))
+          ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), Some((r.length.B, limit)))
         }
       }
     }
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 5190a2f..1f584d7 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -92,7 +92,7 @@ class KubernetesClientTests
   def kubernetesContainer(id: ContainerId) =
     new KubernetesContainer(id, ContainerAddress("ip"), "ip", "docker://" + id.asString)(kubernetesClient {
       Future.successful("")
-    }, global, logging)
+    }, actorSystem, global, logging)
 
   behavior of "KubernetesClient"
 
@@ -188,7 +188,7 @@ object KubernetesClientTests {
   implicit def strToInstant(str: String): Instant =
     strToDate(str).get
 
-  class TestKubernetesClient extends KubernetesApi with StreamLogging {
+  class TestKubernetesClient(implicit as: ActorSystem) extends KubernetesApi with StreamLogging {
     var runs = mutable.Buffer.empty[(String, String, Map[String, String], Map[String, String])]
     var rms = mutable.Buffer.empty[ContainerId]
     var rmByLabels = mutable.Buffer.empty[(String, String)]
@@ -238,7 +238,9 @@ object KubernetesClientTests {
     }
   }
 
-  class TestKubernetesClientWithInvokerAgent extends TestKubernetesClient with KubernetesApiWithInvokerAgent {
+  class TestKubernetesClientWithInvokerAgent(implicit as: ActorSystem)
+      extends TestKubernetesClient
+      with KubernetesApiWithInvokerAgent {
     var agentCommands = mutable.Buffer.empty[(ContainerId, String, Option[Map[String, JsValue]])]
     var forwardLogs = mutable.Buffer.empty[(ContainerId, Long)]
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index c63cad5..2de3543 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -17,6 +17,7 @@
 
 package whisk.core.containerpool.logging.test
 
+import akka.actor.ActorSystem
 import common.{StreamLogging, WskActorSystem}
 import org.junit.runner.RunWith
 import org.scalatest.{FlatSpec, Matchers}
@@ -25,14 +26,12 @@ import whisk.core.containerpool.logging.{DockerToActivationLogStoreProvider, Log
 import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
 import whisk.core.entity._
 import java.time.Instant
-
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
 import spray.json._
 import whisk.common.{Logging, TransactionId}
 import whisk.core.containerpool.{Container, ContainerAddress, ContainerId}
 import whisk.http.Messages
-
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 
@@ -107,5 +106,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct
     def resume()(implicit transid: TransactionId): Future[Unit] = ???
 
     def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId) = lines
+
+    override implicit protected val as: ActorSystem = actorSystem
   }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 51d899c..b686d56 100644
--- a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -78,7 +78,7 @@ class MesosContainerFactoryTest
     lastTaskId
   }
 
-  val poolConfig = ContainerPoolConfig(8, 10)
+  val poolConfig = ContainerPoolConfig(8, 10, false)
   val dockerCpuShares = poolConfig.cpuShare
   val mesosCpus = poolConfig.cpuShare / 1024.0
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 66630b2..3de575d 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -113,6 +113,8 @@ class ContainerPoolTests
     (containers, factory)
   }
 
+  def poolConfig(numCore: Int, coreShare: Int) = ContainerPoolConfig(numCore, coreShare, false)
+
   behavior of "ContainerPool"
 
   /*
@@ -124,7 +126,7 @@ class ContainerPoolTests
   it should "reuse a warm container" in within(timeout) {
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref))
 
     pool ! runMessage
     containers(0).expectMsg(runMessage)
@@ -138,7 +140,7 @@ class ContainerPoolTests
   it should "reuse a warm container when action is the same even if revision changes" in within(timeout) {
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref))
 
     pool ! runMessage
     containers(0).expectMsg(runMessage)
@@ -153,7 +155,7 @@ class ContainerPoolTests
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
 
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
     // Note that the container doesn't respond, thus it's not free to take work
@@ -167,7 +169,7 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     // a pool with only 1 slot
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
     containers(0).send(pool, NeedWork(warmedData()))
@@ -182,7 +184,7 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     // a pool with only 1 active slot but 2 slots in total
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 2), feed.ref))
 
     // Run the first container
     pool ! runMessage
@@ -208,7 +210,7 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     // a pool with only 1 slot
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
     containers(0).send(pool, NeedWork(warmedData()))
@@ -223,7 +225,7 @@ class ContainerPoolTests
     val feed = TestProbe()
 
     // a pool with only 1 slot
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
     containers(0).send(pool, RescheduleJob) // emulate container failure ...
@@ -241,7 +243,8 @@ class ContainerPoolTests
 
     val pool =
       system.actorOf(
-        ContainerPool.props(factory, ContainerPoolConfig(0, 0), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
+        ContainerPool
+          .props(factory, poolConfig(0, 0), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
     containers(0).expectMsg(Start(exec, memoryLimit))
   }
 
@@ -251,7 +254,8 @@ class ContainerPoolTests
 
     val pool =
       system.actorOf(
-        ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
+        ContainerPool
+          .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
     containers(0).expectMsg(Start(exec, memoryLimit))
     containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
     pool ! runMessage
@@ -266,7 +270,7 @@ class ContainerPoolTests
 
     val pool = system.actorOf(
       ContainerPool
-        .props(factory, ContainerPoolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, alternativeExec, memoryLimit))))
+        .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, alternativeExec, memoryLimit))))
     containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
     containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind)))
     pool ! runMessage
@@ -282,7 +286,7 @@ class ContainerPoolTests
     val pool =
       system.actorOf(
         ContainerPool
-          .props(factory, ContainerPoolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, alternativeLimit))))
+          .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, alternativeLimit))))
     containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
     containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit)))
     pool ! runMessage
@@ -296,7 +300,7 @@ class ContainerPoolTests
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
 
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref))
 
     // container0 is created and used
     pool ! runMessage
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 9c5ef66..25b0303 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -167,7 +167,7 @@ class ContainerProxyTests
     Future.successful(())
   }
 
-  val poolConfig = ContainerPoolConfig(1, 2)
+  val poolConfig = ContainerPoolConfig(1, 2, false)
 
   behavior of "ContainerProxy"
 
@@ -732,6 +732,7 @@ class ContainerProxyTests
     protected val addr = ContainerAddress("0.0.0.0")
     protected implicit val logging: Logging = log
     protected implicit val ec: ExecutionContext = system.dispatcher
+    override implicit protected val as: ActorSystem = system
     var suspendCount = 0
     var resumeCount = 0
     var destroyCount = 0