You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/07/06 12:07:22 UTC

[incubator-openwhisk] branch master updated: Properly consume and optimize reading of http entities, drop connection reuse. (#3843)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e8414  Properly consume and optimize reading of http entities, drop connection reuse. (#3843)
e7e8414 is described below

commit e7e841476dd30905dc8b4997f3af1ff0adba73d0
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Fri Jul 6 13:07:18 2018 +0100

    Properly consume and optimize reading of http entities, drop connection reuse. (#3843)
    
    - Explicitly consume and close the response's entity in any case (even if ignored) to make sure the connections get released properly and are not leaked.
    - Use optimized path for consuming the entire entity into a string if its length is within bounds.
    - Don't reuse connections.
    
    To the last point: Reusing connections when a runtime doesn't support it adds significant latency overhead when closing the response's entity. That's likely due to some sort of mismatched behavior (the client wants to keep the connection open, while the server doesn't even know the concept). The latency overhead in those cases (10-20ms added in latency tests) seems far higher than the overhead of establishing a new connection (not measurable in latency tests).
    Dropping connection reuse also solves any issues that might be encountered due to the container's pause/resume cycles, during which sockets don't react to any events at all.
---
 common/scala/build.gradle                          |  2 +-
 .../scala/whisk/core/containerpool/HttpUtils.scala | 53 ++++++++++++----------
 2 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 02e9453..0983fd8 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -51,7 +51,7 @@ dependencies {
     compile 'commons-io:commons-io:2.6'
     compile 'commons-collections:commons-collections:3.2.2'
     compile 'org.apache.kafka:kafka-clients:0.11.0.1'
-    compile ('org.apache.httpcomponents:httpclient:4.4.1') {
+    compile ('org.apache.httpcomponents:httpclient:4.5.5') {
         exclude group: 'commons-logging'
     }
     compile ('com.fasterxml.uuid:java-uuid-generator:3.1.3') {
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index bf95441..4ee7363 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -20,32 +20,29 @@ package whisk.core.containerpool
 import java.net.NoRouteToHostException
 import java.nio.charset.StandardCharsets
 
-import scala.concurrent.duration._
-import scala.annotation.tailrec
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.util.control.NoStackTrace
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
 import org.apache.commons.io.IOUtils
 import org.apache.http.HttpHeaders
 import org.apache.http.client.config.RequestConfig
-import org.apache.http.client.methods.HttpPost
-import org.apache.http.client.methods.HttpRequestBase
-import org.apache.http.client.utils.URIBuilder
+import org.apache.http.client.methods.{HttpPost, HttpRequestBase}
+import org.apache.http.client.utils.{HttpClientUtils, URIBuilder}
 import org.apache.http.conn.HttpHostConnectException
 import org.apache.http.entity.StringEntity
+import org.apache.http.impl.NoConnectionReuseStrategy
 import org.apache.http.impl.client.HttpClientBuilder
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
+import org.apache.http.util.EntityUtils
 import spray.json._
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId}
 import whisk.core.entity.ActivationResponse._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size.SizeLong
 
+import scala.annotation.tailrec
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NoStackTrace
+
 /**
  * This HTTP client is used only in the invoker to communicate with the action container.
  * It allows to POST a JSON object and receive JSON object back; that is the
@@ -68,7 +65,7 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse
    * This will close the HttpClient that is generated for this instance of HttpUtils. That will also cause the
    * ConnectionManager to be closed alongside.
    */
-  def close() = Try(connection.close())
+  def close(): Unit = HttpClientUtils.closeQuietly(connection)
 
   /**
    * Posts to hostname/endpoint the given JSON object.
@@ -106,13 +103,20 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse
           val statusCode = response.getStatusLine.getStatusCode
           val contentLength = entity.getContentLength
 
+          // Negative contentLength means unknown or overflow. We don't want to consume in either case.
           if (contentLength >= 0) {
-            val bytesToRead = Math.min(contentLength, maxResponseBytes)
-            val bytes = IOUtils.toByteArray(entity.getContent, bytesToRead)
-            val str = new String(bytes, StandardCharsets.UTF_8)
-            val truncated = if (contentLength <= maxResponseBytes) None else Some(contentLength.B, maxResponse)
-            Right(ContainerResponse(statusCode, str, truncated))
+            if (contentLength <= maxResponseBytes) {
+              // optimized route to consume the entire stream into a string
+              val str = EntityUtils.toString(entity, StandardCharsets.UTF_8) // consumes and closes the whole stream
+              Right(ContainerResponse(statusCode, str, None))
+            } else {
+              // only consume a bounded number of bytes according to the system limits
+              val str = new String(IOUtils.toByteArray(entity.getContent, maxResponseBytes), StandardCharsets.UTF_8)
+              EntityUtils.consumeQuietly(entity) // consume the rest of the stream to free the connection
+              Right(ContainerResponse(statusCode, str, Some(contentLength.B, maxResponse)))
+            }
           } else {
+            EntityUtils.consumeQuietly(entity) // silently consume the whole stream to free the connection
             Left(NoResponseReceived())
           }
         }
@@ -164,7 +168,10 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse
 
   private val connection = HttpClientBuilder.create
     .setDefaultRequestConfig(httpconfig)
-    .setConnectionManager(if (maxConcurrent > 1) {
+    // Connections are not reused by most of the available runtimes. To circumvent any issues we might have regarding
+    // connections randomly breaking due to our pause/resume cycle, we don't reuse connections at all.
+    .setConnectionReuseStrategy(new NoConnectionReuseStrategy)
+    .setConnectionManager {
       // A PoolingHttpClientConnectionManager is the default when not specifying any ConnectionManager.
       // The PoolingHttpClientConnectionManager has the benefit of actively checking if a connection has become stale,
       // which is very important because pausing/resuming containers can cause a connection to become silently broken.
@@ -178,7 +185,7 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse
       cm.setDefaultMaxPerRoute(maxConcurrent)
       cm.setMaxTotal(maxConcurrent)
       cm
-    } else null)
+    }
     .useSystemProperties()
     .disableAutomaticRetries()
     .build
@@ -214,7 +221,7 @@ object HttpUtils {
       case Right(r)                   => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
       case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
       case Left(Timeout(_))           => throw new java.util.concurrent.TimeoutException()
-      case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
+      case Left(ConnectionError(_: java.net.SocketTimeoutException)) =>
         throw new java.util.concurrent.TimeoutException()
       case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
     }