You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2018/09/06 15:39:23 UTC
[incubator-openwhisk] branch master updated: close connections and reset pool on Container.pause (#3976)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 569b51b close connections and reset pool on Container.pause (#3976)
569b51b is described below
commit 569b51b819ebc513d35ff3c139f4c47764d765bf
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Thu Sep 6 08:39:14 2018 -0700
close connections and reset pool on Container.pause (#3976)
* close connections and reset pool on Container.pause
---
.../whisk/core/containerpool/AkkaContainerClient.scala | 4 ++--
.../core/containerpool/ApacheBlockingContainerClient.scala | 2 +-
.../src/main/scala/whisk/core/containerpool/Container.scala | 13 +++++++++++--
.../scala/whisk/core/containerpool/ContainerClient.scala | 4 ++--
.../scala/src/main/scala/whisk/core/mesos/MesosTask.scala | 4 ++--
.../scala/src/main/scala/whisk/http/PoolingRestClient.scala | 7 ++-----
.../whisk/core/containerpool/docker/DockerContainer.scala | 5 +++--
.../core/containerpool/kubernetes/KubernetesContainer.scala | 4 +++-
.../logging/test/DockerToActivationLogStoreTests.scala | 2 +-
.../whisk/core/containerpool/test/ContainerProxyTests.scala | 4 ++--
10 files changed, 29 insertions(+), 20 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
index 1b53931..4462712 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
@@ -71,10 +71,10 @@ protected class AkkaContainerClient(
maxResponse: ByteSize,
queueSize: Int,
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
- extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout), keepAlive = Some(false))
+ extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
with ContainerClient {
- def close() = Await.result(shutdown(), 30.seconds)
+ def close() = shutdown()
/**
* Posts to hostname/endpoint the given JSON object.
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala
index a6d71c3..9db36a3 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala
@@ -72,7 +72,7 @@ protected class ApacheBlockingContainerClient(hostname: String,
* This will close the HttpClient that is generated for this instance of ApacheBlockingContainerClient. That will also cause the
* ConnectionManager to be closed alongside.
*/
- def close(): Unit = HttpClientUtils.closeQuietly(connection)
+ def close(): Future[Unit] = Future.successful(HttpClientUtils.closeQuietly(connection))
/**
* Posts to hostname/endpoint the given JSON object.
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 54c8a1b..1dfe66a 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -75,7 +75,13 @@ trait Container {
protected var httpConnection: Option[ContainerClient] = None
/** Stops the container from consuming CPU cycles. */
- def suspend()(implicit transid: TransactionId): Future[Unit]
+ def suspend()(implicit transid: TransactionId): Future[Unit] = {
+ //close connection first, then close connection pool
+ //(in testing, pool recreation and connection closing took similar time, so the simpler recreation approach is used)
+ val toClose = httpConnection
+ httpConnection = None
+ closeConnections(toClose)
+ }
/** Dual of halt. */
def resume()(implicit transid: TransactionId): Future[Unit]
@@ -85,7 +91,7 @@ trait Container {
/** Completely destroys this instance of the container. */
def destroy()(implicit transid: TransactionId): Future[Unit] = {
- Future.successful(httpConnection.foreach(_.close()))
+ closeConnections(httpConnection)
}
/** Initializes code in the container. */
@@ -194,6 +200,9 @@ trait Container {
RunResult(Interval(started, finished), response)
}
}
+ private def closeConnections(toClose: Option[ContainerClient]): Future[Unit] = {
+ toClose.map(_.close()).getOrElse(Future.successful(()))
+ }
}
/** Indicates a general error with the container */
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala
index dfcd231..b816119 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala
@@ -23,8 +23,8 @@ import whisk.common.TransactionId
import whisk.core.entity.ActivationResponse.ContainerHttpError
import whisk.core.entity.ActivationResponse._
-trait ContainerClient extends AutoCloseable {
+trait ContainerClient {
def post(endpoint: String, body: JsValue, retry: Boolean)(
implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]]
-
+ def close(): Future[Unit]
}
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
index 1c0dc1c..9b21903 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
@@ -142,8 +142,8 @@ class MesosTask(override protected val id: ContainerId,
/** Stops the container from consuming CPU cycles. */
override def suspend()(implicit transid: TransactionId): Future[Unit] = {
- // suspend not supported
- Future.successful(Unit)
+ super.suspend()
+ // suspend not supported (just return result from super)
}
/** Dual of halt. */
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index 1edade1..146592e 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -24,7 +24,6 @@ import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.unmarshalling._
-import akka.io.Tcp.SO
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, _}
import spray.json._
@@ -46,8 +45,7 @@ class PoolingRestClient(
port: Int,
queueSize: Int,
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
- timeout: Option[FiniteDuration] = None,
- keepAlive: Option[Boolean] = None)(implicit system: ActorSystem) {
+ timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
protected implicit val context: ExecutionContext = system.dispatcher
@@ -56,8 +54,7 @@ class PoolingRestClient(
//if specified, override the ClientConnection idle-timeout and keepalive socket option value
private val timeoutSettings = {
ConnectionPoolSettings(system.settings.config).withUpdatedConnectionSettings { s =>
- val t = timeout.map(t => s.withIdleTimeout(t)).getOrElse(s)
- keepAlive.map(k => t.withSocketOptions(SO.KeepAlive(k) :: Nil)).getOrElse(t)
+ timeout.map(t => s.withIdleTimeout(t)).getOrElse(s)
}
}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index fddabd6..538e367 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -171,8 +171,9 @@ class DockerContainer(protected val id: ContainerId,
protected val waitForOomState: FiniteDuration = 2.seconds
protected val filePollInterval: FiniteDuration = 5.milliseconds
- def suspend()(implicit transid: TransactionId): Future[Unit] =
- if (useRunc) { runc.pause(id) } else { docker.pause(id) }
+ override def suspend()(implicit transid: TransactionId): Future[Unit] = {
+ super.suspend().flatMap(_ => if (useRunc) runc.pause(id) else docker.pause(id))
+ }
def resume()(implicit transid: TransactionId): Future[Unit] =
if (useRunc) { runc.resume(id) } else { docker.unpause(id) }
override def destroy()(implicit transid: TransactionId): Future[Unit] = {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 7c86c11..5ccec76 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -100,7 +100,9 @@ class KubernetesContainer(protected[core] val id: ContainerId,
protected val waitForLogs: FiniteDuration = 2.seconds
- def suspend()(implicit transid: TransactionId): Future[Unit] = kubernetes.suspend(this)
+ override def suspend()(implicit transid: TransactionId): Future[Unit] = {
+ super.suspend().flatMap(_ => kubernetes.suspend(this))
+ }
def resume()(implicit transid: TransactionId): Future[Unit] = kubernetes.resume(this)
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index 542d1b1..287c20b 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -102,7 +102,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct
val addr: ContainerAddress = ContainerAddress("test", 1234))(implicit val ec: ExecutionContext,
val logging: Logging)
extends Container {
- def suspend()(implicit transid: TransactionId): Future[Unit] = ???
+ override def suspend()(implicit transid: TransactionId): Future[Unit] = ???
def resume()(implicit transid: TransactionId): Future[Unit] = ???
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId) = lines
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index da1d8e9..904daeb 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -814,9 +814,9 @@ class ContainerProxyTests
var runCount = 0
var logsCount = 0
- def suspend()(implicit transid: TransactionId): Future[Unit] = {
+ override def suspend()(implicit transid: TransactionId): Future[Unit] = {
suspendCount += 1
- Future.successful(())
+ super.suspend()
}
def resume()(implicit transid: TransactionId): Future[Unit] = {
resumeCount += 1