You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text conversion.)
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/05/29 13:49:55 UTC
[GitHub] markusthoemmes closed pull request #3710: Make REST communication with action containers more robust
markusthoemmes closed pull request #3710: Make REST communication with action containers more robust
URL: https://github.com/apache/incubator-openwhisk/pull/3710
This is a PR merged from a forked (foreign) repository.
Because GitHub does not display the original diff of a merged pull request from a fork,
the diff is reproduced below for the sake of provenance:
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index e0fd37ff41..07c6fc7e5b 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -17,14 +17,15 @@
package whisk.core.containerpool
+import java.net.NoRouteToHostException
import java.nio.charset.StandardCharsets
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
import org.apache.commons.io.IOUtils
import org.apache.http.HttpHeaders
import org.apache.http.client.config.RequestConfig
@@ -34,8 +35,9 @@ import org.apache.http.client.utils.URIBuilder
import org.apache.http.conn.HttpHostConnectException
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClientBuilder
-
import spray.json._
+import whisk.common.Logging
+import whisk.common.TransactionId
import whisk.core.entity.ActivationResponse._
import whisk.core.entity.ByteSize
import whisk.core.entity.size.SizeLong
@@ -52,7 +54,8 @@ import whisk.core.entity.size.SizeLong
* @param timeout the timeout in msecs to wait for a response
* @param maxResponse the maximum size in bytes the connection will accept
*/
-protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize) {
+protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize)(
+ implicit logging: Logging) {
def close() = Try(connection.close())
@@ -68,7 +71,8 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
* @param retry whether or not to retry on connection failure
* @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
*/
- def post(endpoint: String, body: JsValue, retry: Boolean): Either[ContainerHttpError, ContainerResponse] = {
+ def post(endpoint: String, body: JsValue, retry: Boolean)(
+ implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
entity.setContentType("application/json")
@@ -76,12 +80,15 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
request.addHeader(HttpHeaders.ACCEPT, "application/json")
request.setEntity(entity)
- execute(request, timeout.toMillis.toInt, retry)
+ execute(request, timeout, retry)
}
- private def execute(request: HttpRequestBase,
- timeoutMsec: Integer,
- retry: Boolean): Either[ContainerHttpError, ContainerResponse] = {
+ // Used internally to wrap all exceptions for which the request can be retried
+ private case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace
+
+ // Annotation will make the compiler complain if no tail recursion is possible
+ @tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, retry: Boolean)(
+ implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
Try(connection.execute(request)).map { response =>
val containerResponse = Option(response.getEntity)
.map { entity =>
@@ -105,15 +112,29 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
response.close()
containerResponse
+ } recoverWith {
+ // The route to target socket as well as the target socket itself may need some time to be available -
+ // particularly on a loaded system.
+ // The following exceptions occur on such transient conditions. In addition, no data has been transmitted
+ // yet if these exceptions occur. For this reason, it is safe and reasonable to retry.
+ //
+ // HttpHostConnectException: no target socket is listening (yet).
+ case t: HttpHostConnectException => Failure(RetryableConnectionError(t))
+ //
+ // NoRouteToHostException: route to target host is not known (yet).
+ case t: NoRouteToHostException => Failure(RetryableConnectionError(t))
} match {
- case Success(r) => r
- case Failure(t: HttpHostConnectException) if retry =>
- if (timeoutMsec > 0) {
- Thread sleep 100
- val newTimeout = timeoutMsec - 100
- execute(request, newTimeout, retry)
+ case Success(response) => response
+ case Failure(t: RetryableConnectionError) if retry =>
+ val sleepTime = 10.milliseconds
+ if (timeout > Duration.Zero) {
+ logging.info(this, s"POST failed with ${t} - retrying after sleeping ${sleepTime}.")
+ Thread.sleep(sleepTime.toMillis)
+ val newTimeout = timeout - sleepTime
+ execute(request, newTimeout, retry = true)
} else {
- Left(Timeout())
+ logging.warn(this, s"POST failed with ${t} - no retry because timeout exceeded.")
+ Left(Timeout(t))
}
case Failure(t: Throwable) => Left(ConnectionError(t))
}
@@ -141,14 +162,15 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
object HttpUtils {
/** A helper method to post one single request to a connection. Used for container tests. */
- def post(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = {
+ def post(host: String, port: Int, endPoint: String, content: JsValue)(implicit logging: Logging,
+ tid: TransactionId): (Int, Option[JsObject]) = {
val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
val response = connection.post(endPoint, content, retry = true)
connection.close()
response match {
case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
- case Left(Timeout()) => throw new java.util.concurrent.TimeoutException()
+ case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException()
case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
throw new java.util.concurrent.TimeoutException()
case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 85a9f36015..5f8c815a7f 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -99,7 +99,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
protected[core] sealed trait ContainerHttpError extends ContainerConnectionError
protected[core] case class ConnectionError(t: Throwable) extends ContainerHttpError
protected[core] case class NoResponseReceived() extends ContainerHttpError
- protected[core] case class Timeout() extends ContainerHttpError
+ protected[core] case class Timeout(t: Throwable) extends ContainerHttpError
protected[core] case class MemoryExhausted() extends ContainerConnectionError
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 3ee1f47314..97915c8aad 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -31,13 +31,13 @@ import scala.sys.process.ProcessLogger
import scala.sys.process.stringToProcess
import scala.util.Random
import scala.util.{Failure, Success}
-
import org.apache.commons.lang3.StringUtils
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-
+import org.scalatest.{FlatSpec, Matchers}
import akka.actor.ActorSystem
import spray.json._
+import common.StreamLogging
+import whisk.common.Logging
+import whisk.common.TransactionId
import whisk.core.entity.Exec
/**
@@ -49,7 +49,7 @@ trait ActionContainer {
def run(value: JsValue): (Int, Option[JsObject])
}
-trait ActionProxyContainerTestUtils extends FlatSpec with Matchers {
+trait ActionProxyContainerTestUtils extends FlatSpec with Matchers with StreamLogging {
import ActionContainer.{filterSentinel, sentinel}
def initPayload(code: String, main: String = "main"): JsObject =
@@ -149,8 +149,8 @@ object ActionContainer {
val sentinel = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
def filterSentinel(str: String): String = str.replaceAll(sentinel, "").trim
- def withContainer(imageName: String, environment: Map[String, String] = Map.empty)(code: ActionContainer => Unit)(
- implicit actorSystem: ActorSystem): (String, String) = {
+ def withContainer(imageName: String, environment: Map[String, String] = Map.empty)(
+ code: ActionContainer => Unit)(implicit actorSystem: ActorSystem, logging: Logging): (String, String) = {
val rand = { val r = Random.nextInt; if (r < 0) -r else r }
val name = imageName.toLowerCase.replaceAll("""[^a-z]""", "") + rand
val envArgs = environment.toSeq
@@ -204,7 +204,11 @@ object ActionContainer {
}
}
- private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = {
+ private def syncPost(host: String, port: Int, endPoint: String, content: JsValue)(
+ implicit logging: Logging): (Int, Option[JsObject]) = {
+
+ implicit val transid = TransactionId.testing
+
whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content)
}
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
index 0e0867a25d..95754220b2 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
import java.time.Instant
import scala.concurrent.duration._
-
import org.apache.http.HttpRequest
import org.apache.http.HttpResponse
import org.apache.http.entity.StringEntity
@@ -34,8 +33,9 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
-
import spray.json.JsObject
+import common.StreamLogging
+import whisk.common.TransactionId
import whisk.core.containerpool.HttpUtils
import whisk.core.entity.size._
import whisk.core.entity.ActivationResponse._
@@ -44,7 +44,14 @@ import whisk.core.entity.ActivationResponse._
* Unit tests for HttpUtils which communicate with containers.
*/
@RunWith(classOf[JUnitRunner])
-class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfter with BeforeAndAfterAll {
+class ContainerConnectionTests
+ extends FlatSpec
+ with Matchers
+ with BeforeAndAfter
+ with BeforeAndAfterAll
+ with StreamLogging {
+
+ implicit val transid = TransactionId.testing
var testHang: FiniteDuration = 0.second
var testStatusCode: Int = 200
@@ -75,6 +82,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfte
testHang = 0.second
testStatusCode = 200
testResponse = null
+ stream.reset()
}
override def afterAll = {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 25bc0c980c..b3aa2b0227 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -383,7 +383,7 @@ class DockerContainerTests
val interval = intervalOf(initTimeout + 1.nanoseconds)
val container = dockerContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val init = container.initialize(JsObject(), initTimeout)
@@ -434,7 +434,7 @@ class DockerContainerTests
val interval = intervalOf(runTimeout + 1.nanoseconds)
val container = dockerContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index fc3ba6805f..a6e5d20f57 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -235,7 +235,7 @@ class KubernetesContainerTests
val interval = intervalOf(initTimeout + 1.nanoseconds)
val container = kubernetesContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val init = container.initialize(JsObject(), initTimeout)
@@ -284,7 +284,7 @@ class KubernetesContainerTests
val interval = intervalOf(runTimeout + 1.nanoseconds)
val container = kubernetesContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
index 6e388773d1..78dd29642f 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
@@ -58,7 +58,7 @@ class ActivationResponseTests extends FlatSpec with Matchers {
}
it should "interpret failed init that does not response" in {
- Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+ Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new Throwable()))
.map(Left(_))
.foreach { e =>
val ar = processInitResponseContent(e, logger)
@@ -122,7 +122,7 @@ class ActivationResponseTests extends FlatSpec with Matchers {
}
it should "interpret failed run that does not response" in {
- Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+ Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new Throwable()))
.map(Left(_))
.foreach { e =>
val ar = processRunResponseContent(e, logger)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services