You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2018/08/16 18:27:15 UTC

[incubator-openwhisk] branch master updated: use KeepAlive(false) (SocketOption) instead of Connection:close (HTTP header) to dissuade connection reuse (#3969)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ded492d  use KeepAlive(false) (SocketOption) instead of Connection:close (HTTP header) to dissuade connection reuse (#3969)
ded492d is described below

commit ded492dbb301e82bd67c9f41e64e25821eec4528
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Thu Aug 16 11:27:10 2018 -0700

    use KeepAlive(false) (SocketOption) instead of Connection:close (HTTP header) to dissuade connection reuse (#3969)
---
 .../scala/whisk/core/containerpool/AkkaContainerClient.scala | 10 +++-------
 .../scala/src/main/scala/whisk/http/PoolingRestClient.scala  | 12 ++++++++----
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
index a133989..1b53931 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
@@ -27,7 +27,6 @@ import akka.http.scaladsl.model.MediaTypes
 import akka.http.scaladsl.model.MessageEntity
 import akka.http.scaladsl.model.StatusCodes
 import akka.http.scaladsl.model.headers.Accept
-import akka.http.scaladsl.model.headers.Connection
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.StreamTcpException
 import akka.stream.scaladsl.Sink
@@ -56,6 +55,7 @@ import whisk.http.PoolingRestClient
  * It allows to POST a JSON object and receive JSON object back; that is the
  * content type and the accept headers are both 'application/json.
  * This implementation uses the akka http host-level client API.
+ * NOTE: Keepalive is disabled to prevent issues with paused containers
  *
  * @param hostname the host name
  * @param port the port
@@ -71,7 +71,7 @@ protected class AkkaContainerClient(
   maxResponse: ByteSize,
   queueSize: Int,
   retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
-    extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
+    extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout), keepAlive = Some(false))
     with ContainerClient {
 
   def close() = Await.result(shutdown(), 30.seconds)
@@ -93,12 +93,8 @@ protected class AkkaContainerClient(
 
     //create the request
     val req = Marshal(body).to[MessageEntity].map { b =>
-      //DO NOT reuse the connection
-      //For details on Connection: Close handling, see:
-      // - https://doc.akka.io/docs/akka-http/current/common/http-model.html#http-headers
-      // - http://github.com/akka/akka-http/tree/v10.1.3/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala#L470-L571
       HttpRequest(HttpMethods.POST, endpoint, entity = b)
-        .withHeaders(Connection("close"), Accept(MediaTypes.`application/json`))
+        .withHeaders(Accept(MediaTypes.`application/json`))
     }
 
     retryingRequest(req, timeout, retry)
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index 909fffb..1edade1 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -24,6 +24,7 @@ import akka.http.scaladsl.marshalling._
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.settings.ConnectionPoolSettings
 import akka.http.scaladsl.unmarshalling._
+import akka.io.Tcp.SO
 import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
 import akka.stream.scaladsl.{Flow, _}
 import spray.json._
@@ -45,16 +46,19 @@ class PoolingRestClient(
   port: Int,
   queueSize: Int,
   httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
-  timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
+  timeout: Option[FiniteDuration] = None,
+  keepAlive: Option[Boolean] = None)(implicit system: ActorSystem) {
   require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
 
   protected implicit val context: ExecutionContext = system.dispatcher
   protected implicit val materializer: ActorMaterializer = ActorMaterializer()
 
-  //if specified, override the ClientConnection idle-timeout value
+  //if specified, override the ClientConnection idle-timeout and keepalive socket option value
   private val timeoutSettings = {
-    val ps = ConnectionPoolSettings(system.settings.config)
-    timeout.map(t => ps.withUpdatedConnectionSettings(_.withIdleTimeout(t))).getOrElse(ps)
+    ConnectionPoolSettings(system.settings.config).withUpdatedConnectionSettings { s =>
+      val t = timeout.map(t => s.withIdleTimeout(t)).getOrElse(s)
+      keepAlive.map(k => t.withSocketOptions(SO.KeepAlive(k) :: Nil)).getOrElse(t)
+    }
   }
 
   // Creates or retrieves a connection pool for the host.