You are viewing a plain-text version of this content; the canonical, hyperlinked version is available in the original mail archive.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/05/29 13:49:55 UTC

[GitHub] markusthoemmes closed pull request #3710: Make REST communication with action containers more robust

markusthoemmes closed pull request #3710: Make REST communication with action containers more robust
URL: https://github.com/apache/incubator-openwhisk/pull/3710
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this pull request comes from a fork, the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index e0fd37ff41..07c6fc7e5b 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -17,14 +17,15 @@
 
 package whisk.core.containerpool
 
+import java.net.NoRouteToHostException
 import java.nio.charset.StandardCharsets
 
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
 import org.apache.commons.io.IOUtils
 import org.apache.http.HttpHeaders
 import org.apache.http.client.config.RequestConfig
@@ -34,8 +35,9 @@ import org.apache.http.client.utils.URIBuilder
 import org.apache.http.conn.HttpHostConnectException
 import org.apache.http.entity.StringEntity
 import org.apache.http.impl.client.HttpClientBuilder
-
 import spray.json._
+import whisk.common.Logging
+import whisk.common.TransactionId
 import whisk.core.entity.ActivationResponse._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size.SizeLong
@@ -52,7 +54,8 @@ import whisk.core.entity.size.SizeLong
  * @param timeout the timeout in msecs to wait for a response
  * @param maxResponse the maximum size in bytes the connection will accept
  */
-protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize) {
+protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize)(
+  implicit logging: Logging) {
 
   def close() = Try(connection.close())
 
@@ -68,7 +71,8 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
    * @param retry whether or not to retry on connection failure
    * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
    */
-  def post(endpoint: String, body: JsValue, retry: Boolean): Either[ContainerHttpError, ContainerResponse] = {
+  def post(endpoint: String, body: JsValue, retry: Boolean)(
+    implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
     val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
     entity.setContentType("application/json")
 
@@ -76,12 +80,15 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
     request.addHeader(HttpHeaders.ACCEPT, "application/json")
     request.setEntity(entity)
 
-    execute(request, timeout.toMillis.toInt, retry)
+    execute(request, timeout, retry)
   }
 
-  private def execute(request: HttpRequestBase,
-                      timeoutMsec: Integer,
-                      retry: Boolean): Either[ContainerHttpError, ContainerResponse] = {
+  // Used internally to wrap all exceptions for which the request can be retried
+  private case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace
+
+  // Annotation will make the compiler complain if no tail recursion is possible
+  @tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, retry: Boolean)(
+    implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
     Try(connection.execute(request)).map { response =>
       val containerResponse = Option(response.getEntity)
         .map { entity =>
@@ -105,15 +112,29 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
 
       response.close()
       containerResponse
+    } recoverWith {
+      // The route to target socket as well as the target socket itself may need some time to be available -
+      // particularly on a loaded system.
+      // The following exceptions occur on such transient conditions. In addition, no data has been transmitted
+      // yet if these exceptions occur. For this reason, it is safe and reasonable to retry.
+      //
+      // HttpHostConnectException: no target socket is listening (yet).
+      case t: HttpHostConnectException => Failure(RetryableConnectionError(t))
+      //
+      // NoRouteToHostException: route to target host is not known (yet).
+      case t: NoRouteToHostException => Failure(RetryableConnectionError(t))
     } match {
-      case Success(r) => r
-      case Failure(t: HttpHostConnectException) if retry =>
-        if (timeoutMsec > 0) {
-          Thread sleep 100
-          val newTimeout = timeoutMsec - 100
-          execute(request, newTimeout, retry)
+      case Success(response) => response
+      case Failure(t: RetryableConnectionError) if retry =>
+        val sleepTime = 10.milliseconds
+        if (timeout > Duration.Zero) {
+          logging.info(this, s"POST failed with ${t} - retrying after sleeping ${sleepTime}.")
+          Thread.sleep(sleepTime.toMillis)
+          val newTimeout = timeout - sleepTime
+          execute(request, newTimeout, retry = true)
         } else {
-          Left(Timeout())
+          logging.warn(this, s"POST failed with ${t} - no retry because timeout exceeded.")
+          Left(Timeout(t))
         }
       case Failure(t: Throwable) => Left(ConnectionError(t))
     }
@@ -141,14 +162,15 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
 object HttpUtils {
 
   /** A helper method to post one single request to a connection. Used for container tests. */
-  def post(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = {
+  def post(host: String, port: Int, endPoint: String, content: JsValue)(implicit logging: Logging,
+                                                                        tid: TransactionId): (Int, Option[JsObject]) = {
     val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
     val response = connection.post(endPoint, content, retry = true)
     connection.close()
     response match {
       case Right(r)                   => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
       case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
-      case Left(Timeout())            => throw new java.util.concurrent.TimeoutException()
+      case Left(Timeout(_))           => throw new java.util.concurrent.TimeoutException()
       case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
         throw new java.util.concurrent.TimeoutException()
       case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 85a9f36015..5f8c815a7f 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -99,7 +99,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
   protected[core] sealed trait ContainerHttpError extends ContainerConnectionError
   protected[core] case class ConnectionError(t: Throwable) extends ContainerHttpError
   protected[core] case class NoResponseReceived() extends ContainerHttpError
-  protected[core] case class Timeout() extends ContainerHttpError
+  protected[core] case class Timeout(t: Throwable) extends ContainerHttpError
 
   protected[core] case class MemoryExhausted() extends ContainerConnectionError
 
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 3ee1f47314..97915c8aad 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -31,13 +31,13 @@ import scala.sys.process.ProcessLogger
 import scala.sys.process.stringToProcess
 import scala.util.Random
 import scala.util.{Failure, Success}
-
 import org.apache.commons.lang3.StringUtils
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-
+import org.scalatest.{FlatSpec, Matchers}
 import akka.actor.ActorSystem
 import spray.json._
+import common.StreamLogging
+import whisk.common.Logging
+import whisk.common.TransactionId
 import whisk.core.entity.Exec
 
 /**
@@ -49,7 +49,7 @@ trait ActionContainer {
   def run(value: JsValue): (Int, Option[JsObject])
 }
 
-trait ActionProxyContainerTestUtils extends FlatSpec with Matchers {
+trait ActionProxyContainerTestUtils extends FlatSpec with Matchers with StreamLogging {
   import ActionContainer.{filterSentinel, sentinel}
 
   def initPayload(code: String, main: String = "main"): JsObject =
@@ -149,8 +149,8 @@ object ActionContainer {
   val sentinel = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
   def filterSentinel(str: String): String = str.replaceAll(sentinel, "").trim
 
-  def withContainer(imageName: String, environment: Map[String, String] = Map.empty)(code: ActionContainer => Unit)(
-    implicit actorSystem: ActorSystem): (String, String) = {
+  def withContainer(imageName: String, environment: Map[String, String] = Map.empty)(
+    code: ActionContainer => Unit)(implicit actorSystem: ActorSystem, logging: Logging): (String, String) = {
     val rand = { val r = Random.nextInt; if (r < 0) -r else r }
     val name = imageName.toLowerCase.replaceAll("""[^a-z]""", "") + rand
     val envArgs = environment.toSeq
@@ -204,7 +204,11 @@ object ActionContainer {
     }
   }
 
-  private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = {
+  private def syncPost(host: String, port: Int, endPoint: String, content: JsValue)(
+    implicit logging: Logging): (Int, Option[JsObject]) = {
+
+    implicit val transid = TransactionId.testing
+
     whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content)
   }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
index 0e0867a25d..95754220b2 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
 import java.time.Instant
 
 import scala.concurrent.duration._
-
 import org.apache.http.HttpRequest
 import org.apache.http.HttpResponse
 import org.apache.http.entity.StringEntity
@@ -34,8 +33,9 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
-
 import spray.json.JsObject
+import common.StreamLogging
+import whisk.common.TransactionId
 import whisk.core.containerpool.HttpUtils
 import whisk.core.entity.size._
 import whisk.core.entity.ActivationResponse._
@@ -44,7 +44,14 @@ import whisk.core.entity.ActivationResponse._
  * Unit tests for HttpUtils which communicate with containers.
  */
 @RunWith(classOf[JUnitRunner])
-class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfter with BeforeAndAfterAll {
+class ContainerConnectionTests
+    extends FlatSpec
+    with Matchers
+    with BeforeAndAfter
+    with BeforeAndAfterAll
+    with StreamLogging {
+
+  implicit val transid = TransactionId.testing
 
   var testHang: FiniteDuration = 0.second
   var testStatusCode: Int = 200
@@ -75,6 +82,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfte
     testHang = 0.second
     testStatusCode = 200
     testResponse = null
+    stream.reset()
   }
 
   override def afterAll = {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 25bc0c980c..b3aa2b0227 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -383,7 +383,7 @@ class DockerContainerTests
     val interval = intervalOf(initTimeout + 1.nanoseconds)
 
     val container = dockerContainer() {
-      Future.successful(RunResult(interval, Left(Timeout())))
+      Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
     }
 
     val init = container.initialize(JsObject(), initTimeout)
@@ -434,7 +434,7 @@ class DockerContainerTests
     val interval = intervalOf(runTimeout + 1.nanoseconds)
 
     val container = dockerContainer() {
-      Future.successful(RunResult(interval, Left(Timeout())))
+      Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
     }
 
     val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index fc3ba6805f..a6e5d20f57 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -235,7 +235,7 @@ class KubernetesContainerTests
     val interval = intervalOf(initTimeout + 1.nanoseconds)
 
     val container = kubernetesContainer() {
-      Future.successful(RunResult(interval, Left(Timeout())))
+      Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
     }
 
     val init = container.initialize(JsObject(), initTimeout)
@@ -284,7 +284,7 @@ class KubernetesContainerTests
     val interval = intervalOf(runTimeout + 1.nanoseconds)
 
     val container = kubernetesContainer() {
-      Future.successful(RunResult(interval, Left(Timeout())))
+      Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
     }
 
     val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
index 6e388773d1..78dd29642f 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
@@ -58,7 +58,7 @@ class ActivationResponseTests extends FlatSpec with Matchers {
   }
 
   it should "interpret failed init that does not response" in {
-    Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+    Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new Throwable()))
       .map(Left(_))
       .foreach { e =>
         val ar = processInitResponseContent(e, logger)
@@ -122,7 +122,7 @@ class ActivationResponseTests extends FlatSpec with Matchers {
   }
 
   it should "interpret failed run that does not response" in {
-    Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+    Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new Throwable()))
       .map(Left(_))
       .foreach { e =>
         val ar = processRunResponseContent(e, logger)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services