You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/11 15:02:21 UTC

[GitHub] tysonnorris closed pull request #3707: enable concurrent runs on ActionContainer test utility

tysonnorris closed pull request #3707: enable concurrent runs on ActionContainer test utility 
URL: https://github.com/apache/incubator-openwhisk/pull/3707
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub no longer displays
its diff once it has been merged, so the diff is reproduced below:

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index d39118201d..d9bdbdc3ca 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -20,12 +20,16 @@ package whisk.core.containerpool
 import java.net.NoRouteToHostException
 import java.nio.charset.StandardCharsets
 
-import scala.annotation.tailrec
 import scala.concurrent.duration._
+import scala.annotation.tailrec
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
 import scala.util.control.NoStackTrace
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
+
 import org.apache.commons.io.IOUtils
 import org.apache.http.HttpHeaders
 import org.apache.http.client.config.RequestConfig
@@ -35,6 +39,7 @@ import org.apache.http.client.utils.URIBuilder
 import org.apache.http.conn.HttpHostConnectException
 import org.apache.http.entity.StringEntity
 import org.apache.http.impl.client.HttpClientBuilder
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
 import spray.json._
 import whisk.common.Logging
 import whisk.common.TransactionId
@@ -53,8 +58,9 @@ import whisk.core.entity.size.SizeLong
  * @param hostname the host name
  * @param timeout the timeout in msecs to wait for a response
  * @param maxResponse the maximum size in bytes the connection will accept
+ * @param maxConcurrent the maximum number of concurrent requests allowed (Default is 1)
  */
-protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize)(
+protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize, maxConcurrent: Int = 1)(
   implicit logging: Logging) {
 
   def close() = Try(connection.close())
@@ -80,14 +86,14 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
     request.addHeader(HttpHeaders.ACCEPT, "application/json")
     request.setEntity(entity)
 
-    execute(request, timeout, retry)
+    execute(request, timeout, maxConcurrent, retry)
   }
 
   // Used internally to wrap all exceptions for which the request can be retried
   private case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace
 
   // Annotation will make the compiler complain if no tail recursion is possible
-  @tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, retry: Boolean)(
+  @tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, maxConcurrent: Int, retry: Boolean)(
     implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
     Try(connection.execute(request)).map { response =>
       val containerResponse = Option(response.getEntity)
@@ -130,9 +136,9 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
         if (timeout > Duration.Zero) {
           Thread.sleep(sleepTime.toMillis)
           val newTimeout = timeout - sleepTime
-          execute(request, newTimeout, retry = true)
+          execute(request, newTimeout, maxConcurrent, retry = true)
         } else {
-          logging.warn(this, s"POST failed with ${t} - no retry because timeout exceeded.")
+          logging.warn(this, s"POST failed with $t - no retry because timeout exceeded.")
           Left(Timeout(t))
         }
       case Failure(t: Throwable) => Left(ConnectionError(t))
@@ -153,6 +159,15 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
 
   private val connection = HttpClientBuilder.create
     .setDefaultRequestConfig(httpconfig)
+    .setConnectionManager(if (maxConcurrent > 1) {
+      // Use PoolingHttpClientConnectionManager so that concurrent activation processing (if enabled) will reuse connections
+      val cm = new PoolingHttpClientConnectionManager
+      // Increase default max connections per route (default is 2)
+      cm.setDefaultMaxPerRoute(maxConcurrent)
+      // Increase max total connections (default is 20)
+      cm.setMaxTotal(maxConcurrent)
+      cm
+    } else null) //set the Pooling connection manager IFF maxConcurrent > 1
     .useSystemProperties()
     .disableAutomaticRetries()
     .build
@@ -164,9 +179,27 @@ object HttpUtils {
   def post(host: String, port: Int, endPoint: String, content: JsValue)(implicit logging: Logging,
                                                                         tid: TransactionId): (Int, Option[JsObject]) = {
     val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
-    val response = connection.post(endPoint, content, retry = true)
+    val response = executeRequest(connection, endPoint, content)
+    connection.close()
+    response
+  }
+
+  /** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */
+  def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: Duration)(
+    implicit logging: Logging,
+    tid: TransactionId,
+    ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
+    val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB, contents.size)
+    val futureResults = contents.map(content => Future { executeRequest(connection, endPoint, content) })
+    val results = Await.result(Future.sequence(futureResults), timeout)
     connection.close()
-    response match {
+    results
+  }
+
+  private def executeRequest(connection: HttpUtils, endpoint: String, content: JsValue)(
+    implicit logging: Logging,
+    tid: TransactionId): (Int, Option[JsObject]) = {
+    connection.post(endpoint, content, retry = true) match {
       case Right(r)                   => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
       case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
       case Left(Timeout(_))           => throw new java.util.concurrent.TimeoutException()
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index b02d3acff1..e0a2cafada 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -34,6 +34,7 @@ import scala.util.{Failure, Success}
 import org.apache.commons.lang3.StringUtils
 import org.scalatest.{FlatSpec, Matchers}
 import akka.actor.ActorSystem
+import scala.concurrent.ExecutionContext
 import spray.json._
 import common.StreamLogging
 import whisk.common.Logging
@@ -48,6 +49,7 @@ import common.WhiskProperties
 trait ActionContainer {
   def init(value: JsValue): (Int, Option[JsObject])
   def run(value: JsValue): (Int, Option[JsObject])
+  def runMultiple(values: Seq[JsValue])(implicit ec: ExecutionContext): Seq[(Int, Option[JsObject])]
 }
 
 trait ActionProxyContainerTestUtils extends FlatSpec with Matchers with StreamLogging {
@@ -204,6 +206,8 @@ object ActionContainer {
     val mock = new ActionContainer {
       def init(value: JsValue): (Int, Option[JsObject]) = syncPost(ip, port, "/init", value)
       def run(value: JsValue): (Int, Option[JsObject]) = syncPost(ip, port, "/run", value)
+      def runMultiple(values: Seq[JsValue])(implicit ec: ExecutionContext): Seq[(Int, Option[JsObject])] =
+        concurrentSyncPost(ip, port, "/run", values)
     }
 
     try {
@@ -226,4 +230,12 @@ object ActionContainer {
 
     whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content)
   }
+  private def concurrentSyncPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue])(
+    implicit logging: Logging,
+    ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
+
+    implicit val transid = TransactionId.testing
+
+    whisk.core.containerpool.HttpUtils.concurrentPost(host, port, endPoint, contents, 30.seconds)
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services