You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2018/09/06 15:39:23 UTC

[incubator-openwhisk] branch master updated: close connections and reset pool on Container.pause (#3976)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 569b51b  close connections and reset pool on Container.pause (#3976)
569b51b is described below

commit 569b51b819ebc513d35ff3c139f4c47764d765bf
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Thu Sep 6 08:39:14 2018 -0700

    close connections and reset pool on Container.pause (#3976)
    
    * close connections and reset pool on Container.pause
---
 .../whisk/core/containerpool/AkkaContainerClient.scala      |  4 ++--
 .../core/containerpool/ApacheBlockingContainerClient.scala  |  2 +-
 .../src/main/scala/whisk/core/containerpool/Container.scala | 13 +++++++++++--
 .../scala/whisk/core/containerpool/ContainerClient.scala    |  4 ++--
 .../scala/src/main/scala/whisk/core/mesos/MesosTask.scala   |  4 ++--
 .../scala/src/main/scala/whisk/http/PoolingRestClient.scala |  7 ++-----
 .../whisk/core/containerpool/docker/DockerContainer.scala   |  5 +++--
 .../core/containerpool/kubernetes/KubernetesContainer.scala |  4 +++-
 .../logging/test/DockerToActivationLogStoreTests.scala      |  2 +-
 .../whisk/core/containerpool/test/ContainerProxyTests.scala |  4 ++--
 10 files changed, 29 insertions(+), 20 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
index 1b53931..4462712 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala
@@ -71,10 +71,10 @@ protected class AkkaContainerClient(
   maxResponse: ByteSize,
   queueSize: Int,
   retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
-    extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout), keepAlive = Some(false))
+    extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
     with ContainerClient {
 
-  def close() = Await.result(shutdown(), 30.seconds)
+  def close() = shutdown()
 
   /**
    * Posts to hostname/endpoint the given JSON object.
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala
index a6d71c3..9db36a3 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala
@@ -72,7 +72,7 @@ protected class ApacheBlockingContainerClient(hostname: String,
    * This will close the HttpClient that is generated for this instance of ApacheBlockingContainerClient. That will also cause the
    * ConnectionManager to be closed alongside.
    */
-  def close(): Unit = HttpClientUtils.closeQuietly(connection)
+  def close(): Future[Unit] = Future.successful(HttpClientUtils.closeQuietly(connection))
 
   /**
    * Posts to hostname/endpoint the given JSON object.
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 54c8a1b..1dfe66a 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -75,7 +75,13 @@ trait Container {
   protected var httpConnection: Option[ContainerClient] = None
 
   /** Stops the container from consuming CPU cycles. */
-  def suspend()(implicit transid: TransactionId): Future[Unit]
+  def suspend()(implicit transid: TransactionId): Future[Unit] = {
+    //close connection first, then close connection pool
+    //(testing pool recreation vs connection closing, time was similar - so using the simpler recreation approach)
+    val toClose = httpConnection
+    httpConnection = None
+    closeConnections(toClose)
+  }
 
   /** Dual of halt. */
   def resume()(implicit transid: TransactionId): Future[Unit]
@@ -85,7 +91,7 @@ trait Container {
 
   /** Completely destroys this instance of the container. */
   def destroy()(implicit transid: TransactionId): Future[Unit] = {
-    Future.successful(httpConnection.foreach(_.close()))
+    closeConnections(httpConnection)
   }
 
   /** Initializes code in the container. */
@@ -194,6 +200,9 @@ trait Container {
         RunResult(Interval(started, finished), response)
       }
   }
+  private def closeConnections(toClose: Option[ContainerClient]): Future[Unit] = {
+    toClose.map(_.close()).getOrElse(Future.successful(()))
+  }
 }
 
 /** Indicates a general error with the container */
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala
index dfcd231..b816119 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala
@@ -23,8 +23,8 @@ import whisk.common.TransactionId
 import whisk.core.entity.ActivationResponse.ContainerHttpError
 import whisk.core.entity.ActivationResponse._
 
-trait ContainerClient extends AutoCloseable {
+trait ContainerClient {
   def post(endpoint: String, body: JsValue, retry: Boolean)(
     implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]]
-
+  def close(): Future[Unit]
 }
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
index 1c0dc1c..9b21903 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
@@ -142,8 +142,8 @@ class MesosTask(override protected val id: ContainerId,
 
   /** Stops the container from consuming CPU cycles. */
   override def suspend()(implicit transid: TransactionId): Future[Unit] = {
-    // suspend not supported
-    Future.successful(Unit)
+    super.suspend()
+    // suspend not supported (just return result from super)
   }
 
   /** Dual of halt. */
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index 1edade1..146592e 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -24,7 +24,6 @@ import akka.http.scaladsl.marshalling._
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.settings.ConnectionPoolSettings
 import akka.http.scaladsl.unmarshalling._
-import akka.io.Tcp.SO
 import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
 import akka.stream.scaladsl.{Flow, _}
 import spray.json._
@@ -46,8 +45,7 @@ class PoolingRestClient(
   port: Int,
   queueSize: Int,
   httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
-  timeout: Option[FiniteDuration] = None,
-  keepAlive: Option[Boolean] = None)(implicit system: ActorSystem) {
+  timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
   require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
 
   protected implicit val context: ExecutionContext = system.dispatcher
@@ -56,8 +54,7 @@ class PoolingRestClient(
   //if specified, override the ClientConnection idle-timeout and keepalive socket option value
   private val timeoutSettings = {
     ConnectionPoolSettings(system.settings.config).withUpdatedConnectionSettings { s =>
-      val t = timeout.map(t => s.withIdleTimeout(t)).getOrElse(s)
-      keepAlive.map(k => t.withSocketOptions(SO.KeepAlive(k) :: Nil)).getOrElse(t)
+      timeout.map(t => s.withIdleTimeout(t)).getOrElse(s)
     }
   }
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index fddabd6..538e367 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -171,8 +171,9 @@ class DockerContainer(protected val id: ContainerId,
   protected val waitForOomState: FiniteDuration = 2.seconds
   protected val filePollInterval: FiniteDuration = 5.milliseconds
 
-  def suspend()(implicit transid: TransactionId): Future[Unit] =
-    if (useRunc) { runc.pause(id) } else { docker.pause(id) }
+  override def suspend()(implicit transid: TransactionId): Future[Unit] = {
+    super.suspend().flatMap(_ => if (useRunc) runc.pause(id) else docker.pause(id))
+  }
   def resume()(implicit transid: TransactionId): Future[Unit] =
     if (useRunc) { runc.resume(id) } else { docker.unpause(id) }
   override def destroy()(implicit transid: TransactionId): Future[Unit] = {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 7c86c11..5ccec76 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -100,7 +100,9 @@ class KubernetesContainer(protected[core] val id: ContainerId,
 
   protected val waitForLogs: FiniteDuration = 2.seconds
 
-  def suspend()(implicit transid: TransactionId): Future[Unit] = kubernetes.suspend(this)
+  override def suspend()(implicit transid: TransactionId): Future[Unit] = {
+    super.suspend().flatMap(_ => kubernetes.suspend(this))
+  }
 
   def resume()(implicit transid: TransactionId): Future[Unit] = kubernetes.resume(this)
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index 542d1b1..287c20b 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -102,7 +102,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct
                       val addr: ContainerAddress = ContainerAddress("test", 1234))(implicit val ec: ExecutionContext,
                                                                                    val logging: Logging)
       extends Container {
-    def suspend()(implicit transid: TransactionId): Future[Unit] = ???
+    override def suspend()(implicit transid: TransactionId): Future[Unit] = ???
     def resume()(implicit transid: TransactionId): Future[Unit] = ???
 
     def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId) = lines
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index da1d8e9..904daeb 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -814,9 +814,9 @@ class ContainerProxyTests
     var runCount = 0
     var logsCount = 0
 
-    def suspend()(implicit transid: TransactionId): Future[Unit] = {
+    override def suspend()(implicit transid: TransactionId): Future[Unit] = {
       suspendCount += 1
-      Future.successful(())
+      super.suspend()
     }
     def resume()(implicit transid: TransactionId): Future[Unit] = {
       resumeCount += 1