You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/10/25 15:23:12 UTC

[GitHub] kpavel closed pull request #4082: Rename prewarmed containers

kpavel closed pull request #4082: Rename prewarmed containers
URL: https://github.com/apache/incubator-openwhisk/pull/4082
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 3167176abd..37a2fbafef 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -64,6 +64,7 @@ object Container {
 
 trait Container {
 
+  protected var _name: String
   implicit protected val as: ActorSystem
   protected val id: ContainerId
   protected val addr: ContainerAddress
@@ -73,6 +74,8 @@ trait Container {
   /** HTTP connection to the container, will be lazily established by callContainer */
   protected var httpConnection: Option[ContainerClient] = None
 
+  def name = _name
+
   /** Stops the container from consuming CPU cycles. */
   def suspend()(implicit transid: TransactionId): Future[Unit] = {
     //close connection first, then close connection pool
@@ -199,6 +202,10 @@ trait Container {
         RunResult(Interval(started, finished), response)
       }
   }
+
+  /** Rename container. */
+  def rename(name: String)(implicit transid: TransactionId): Future[Unit]
+
   private def closeConnections(toClose: Option[ContainerClient]): Future[Unit] = {
     toClose.map(_.close()).getOrElse(Future.successful(()))
   }
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
index 9b21903a24..36f459fcf3 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
@@ -119,7 +119,7 @@ object MesosTask {
       log.info(this, s"launched task with state ${taskDetails.taskStatus.getState} at ${taskHost}:${taskPort}")
       val containerIp = new ContainerAddress(taskHost, taskPort)
       val containerId = new ContainerId(taskId);
-      new MesosTask(containerId, containerIp, ec, log, as, taskId, mesosClientActor, mesosConfig)
+      new MesosTask(name.get, containerId, containerIp, ec, log, as, taskId, mesosClientActor, mesosConfig)
     })
 
   }
@@ -130,7 +130,8 @@ object JsonFormatters extends DefaultJsonProtocol {
   implicit val createContainerJson = jsonFormat3(CreateContainer)
 }
 
-class MesosTask(override protected val id: ContainerId,
+class MesosTask(override protected var _name: String,
+                override protected val id: ContainerId,
                 override protected val addr: ContainerAddress,
                 override protected val ec: ExecutionContext,
                 override protected val logging: Logging,
@@ -182,4 +183,9 @@ class MesosTask(override protected val id: ContainerId,
   override def logs(limit: ByteSize, waitForSentinel: Boolean)(
     implicit transid: TransactionId): Source[ByteString, Any] =
     Source.single(ByteString(LogLine(logMsg, "stdout", Instant.now.toString).toJson.compactPrint))
+
+  /** Rename container: a no-op here, since renaming is currently not supported. */
+  override def rename(name: String)(implicit transid: TransactionId): Future[Unit] =
+    // pass the unit value (), not the `Unit` companion object
+    Future.successful(())
 }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index b34ce587d4..b553ef9e5d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -330,6 +330,19 @@ class ContainerProxy(
     goto(Removing)
   }
 
+  /**
+   * Checks whether a container name follows the generic "prewarm" naming pattern.
+   * @param name the container name to check
+   * @param instance the invoker instance whose container name prefix is expected
+   * @param suffix the expected name suffix (sanitized with ContainerProxy.isAllowed)
+   * @return true if name has that prefix and ends with "_prewarm_" plus the sanitized suffix
+   */
+  def isPrewarmed(name: String, instance: InvokerInstanceId, suffix: String): Boolean = {
+    val sanitizedSuffix = suffix.filter(ContainerProxy.isAllowed)
+    name.startsWith(s"${ContainerFactory.containerNamePrefix(instance)}_") &&
+    name.endsWith(s"_prewarm_${sanitizedSuffix}")
+  }
+
   /**
    * Runs the job, initialize first if necessary.
    * Completes the job by:
@@ -349,7 +362,15 @@ class ContainerProxy(
     // Only initialize iff we haven't yet warmed the container
     val initialize = stateData match {
       case data: WarmedData => Future.successful(None)
-      case _                => container.initialize(job.action.containerInitializer, actionTimeout).map(Some(_))
+      case _ =>
+        // check if container has generic "prewarm" name
+        if (isPrewarmed(container.name, instance, job.action.exec.kind)) {
+          // now rename container to action based name
+          val newName =
+            ContainerProxy.containerName(instance, job.msg.user.namespace.name.asString, job.action.name.asString)
+          container.rename(newName)
+        }
+        container.initialize(job.action.containerInitializer, actionTimeout).map(Some(_))
     }
 
     val activation: Future[WhiskActivation] = initialize
@@ -450,6 +471,8 @@ object ContainerProxy {
 
   val timeouts = loadConfigOrThrow[ContainerProxyTimeoutConfig](ConfigKeys.containerProxyTimeouts)
 
+  def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_'
+
   /**
    * Generates a unique container name.
    *
@@ -458,8 +481,6 @@ object ContainerProxy {
    * @return a unique container name
    */
   def containerName(instance: InvokerInstanceId, prefix: String, suffix: String): String = {
-    def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_'
-
     val sanitizedPrefix = prefix.filter(isAllowed)
     val sanitizedSuffix = suffix.filter(isAllowed)
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 8a87e379e8..87266599c1 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -192,6 +192,9 @@ class DockerClient(dockerHost: Option[String] = None,
       case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
     }
   }
+
+  def rename(id: ContainerId, name: String)(implicit transid: TransactionId): Future[Unit] =
+    runCmd(Seq("rename", id.asString, name), config.timeouts.pause).map(_ => ())
 }
 
 trait DockerApi {
@@ -268,6 +271,15 @@ trait DockerApi {
    * @return a Future containing whether the container was killed or not
    */
   def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean]
+
+  /**
+   * Rename container.
+   *
+   * @param id the id of the container to rename
+   * @param name new container name
+   * @return a Future completing once the rename is complete
+   */
+  def rename(id: ContainerId, name: String)(implicit transid: TransactionId): Future[Unit]
 }
 
 /** Indicates any error while starting a container that leaves a broken container behind that needs to be removed */
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 47b6d2439f..5a94cd1a2b 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -39,6 +39,7 @@ import spray.json._
 import whisk.core.containerpool.logging.LogLine
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.http.Messages
+import scala.util.{Failure, Success}
 
 object DockerContainer {
 
@@ -141,7 +142,7 @@ object DockerContainer {
           docker.rm(id)
           Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
       }
-    } yield new DockerContainer(id, ip, useRunc)
+    } yield new DockerContainer(name.get, id, ip, useRunc)
   }
 }
 
@@ -155,7 +156,8 @@ object DockerContainer {
  * @param id the id of the container
  * @param addr the ip of the container
  */
-class DockerContainer(protected val id: ContainerId,
+class DockerContainer(protected var _name: String,
+                      protected val id: ContainerId,
                       protected val addr: ContainerAddress,
                       protected val useRunc: Boolean)(implicit docker: DockerApiWithFileAccess,
                                                       runc: RuncApi,
@@ -181,6 +183,15 @@ class DockerContainer(protected val id: ContainerId,
     docker.rm(id)
   }
 
+  /** Renames the container through docker; the cached name is updated only if that succeeds. */
+  def rename(newName: String)(implicit transid: TransactionId): Future[Unit] = {
+    docker.rename(id, newName).andThen {
+      case Success(_) => _name = newName
+      // andThen only observes the result: a failed rename is logged and still fails the returned future
+      case Failure(e) => logging.error(this, s"Failed to rename container: $e")
+    }
+  }
+
   /**
    * Was the container killed due to memory exhaustion?
    *
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
index c6e5233062..5f08020615 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -224,7 +224,7 @@ class KubernetesClient(
     // By convention, kubernetes adds a docker:// prefix when using docker as the low-level container engine
     val nativeContainerId = pod.getStatus.getContainerStatuses.get(0).getContainerID.stripPrefix("docker://")
     implicit val kubernetes = this
-    new KubernetesContainer(id, addr, workerIP, nativeContainerId)
+    new KubernetesContainer(id.asString, id, addr, workerIP, nativeContainerId)
   }
 }
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 5ccec760f9..7898c86921 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -86,7 +86,8 @@ object KubernetesContainer {
  * @param workerIP the ip of the workernode on which the container is executing
  * @param nativeContainerId the docker/containerd lowlevel id for the container
  */
-class KubernetesContainer(protected[core] val id: ContainerId,
+class KubernetesContainer(protected[core] var _name: String,
+                          protected[core] val id: ContainerId,
                           protected[core] val addr: ContainerAddress,
                           protected[core] val workerIP: String,
                           protected[core] val nativeContainerId: String)(implicit kubernetes: KubernetesApi,
@@ -138,4 +139,9 @@ class KubernetesContainer(protected[core] val id: ContainerId,
       .takeWithin(waitForLogs)
       .map { _.toByteString }
   }
+
+  /** Rename container: a no-op here, since renaming is currently not supported. */
+  override def rename(name: String)(implicit transid: TransactionId): Future[Unit] =
+    // pass the unit value (), not the `Unit` companion object
+    Future.successful(())
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 6730e8caf5..2918c0d0a3 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -106,7 +106,7 @@ class DockerContainerTests
       Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))),
     awaitLogs: FiniteDuration = 2.seconds)(implicit docker: DockerApiWithFileAccess, runc: RuncApi): DockerContainer = {
 
-    new DockerContainer(id, addr, true) {
+    new DockerContainer("test", id, addr, true) {
       override protected def callContainer(
         path: String,
         body: JsObject,
@@ -347,7 +347,7 @@ class DockerContainerTests
     implicit val runc = new TestRuncClient
 
     val id = ContainerId("id")
-    val container = new DockerContainer(id, ContainerAddress("ip"), true)
+    val container = new DockerContainer("test", id, ContainerAddress("ip"), true)
 
     val suspend = container.suspend()
     val resume = container.resume()
@@ -367,7 +367,7 @@ class DockerContainerTests
     implicit val runc = new TestRuncClient
 
     val id = ContainerId("id")
-    val container = new DockerContainer(id, ContainerAddress("ip"), false)
+    val container = new DockerContainer("test", id, ContainerAddress("ip"), false)
 
     val suspend = container.suspend()
     val resume = container.resume()
@@ -387,7 +387,7 @@ class DockerContainerTests
     implicit val runc = stub[RuncApi]
 
     val id = ContainerId("id")
-    val container = new DockerContainer(id, ContainerAddress("ip"), true)
+    val container = new DockerContainer("test", id, ContainerAddress("ip"), true)
 
     container.destroy()
 
@@ -849,5 +849,7 @@ class DockerContainerTests
       rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
       Source.single(ByteString.empty)
     }
+
+    def rename(id: ContainerId, name: String)(implicit transid: TransactionId): Future[Unit] = Future.successful(())
   }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 3336e831b1..d13a3f6093 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -87,7 +87,7 @@ class KubernetesClientTests
   }
 
   def kubernetesContainer(id: ContainerId) =
-    new KubernetesContainer(id, ContainerAddress("ip"), "ip", "docker://" + id.asString)(kubernetesClient {
+    new KubernetesContainer("test", id, ContainerAddress("ip"), "ip", "docker://" + id.asString)(kubernetesClient {
       Future.successful("")
     }, actorSystem, global, logging)
 
@@ -117,7 +117,7 @@ class KubernetesClientTests
   it should "forward suspend commands to the client" in {
     implicit val kubernetes = new TestKubernetesClient
     val id = ContainerId("id")
-    val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
+    val container = new KubernetesContainer("test", id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
     await(container.suspend())
     kubernetes.suspends should have size 1
     kubernetes.suspends(0) shouldBe id
@@ -126,7 +126,7 @@ class KubernetesClientTests
   it should "forward resume commands to the client" in {
     implicit val kubernetes = new TestKubernetesClient
     val id = ContainerId("id")
-    val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
+    val container = new KubernetesContainer("test", id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
     await(container.resume())
     kubernetes.resumes should have size 1
     kubernetes.resumes(0) shouldBe id
@@ -204,7 +204,7 @@ object KubernetesClientTests {
       val addr: ContainerAddress = ContainerAddress("ip")
       val workerIP: String = "127.0.0.1"
       val nativeContainerId: String = "docker://" + containerId.asString
-      Future.successful(new KubernetesContainer(containerId, addr, workerIP, nativeContainerId))
+      Future.successful(new KubernetesContainer("test", containerId, addr, workerIP, nativeContainerId))
     }
 
     def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 227d3c9089..2cbfa87c37 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -102,7 +102,7 @@ class KubernetesContainerTests
       Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))),
     awaitLogs: FiniteDuration = 2.seconds)(implicit kubernetes: KubernetesApi): KubernetesContainer = {
 
-    new KubernetesContainer(id, addr, addr.host, "docker://" + id.asString) {
+    new KubernetesContainer("test", id, addr, addr.host, "docker://" + id.asString) {
       override protected def callContainer(
         path: String,
         body: JsObject,
@@ -192,7 +192,7 @@ class KubernetesContainerTests
     implicit val kubernetes = stub[KubernetesApi]
 
     val id = ContainerId("id")
-    val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
+    val container = new KubernetesContainer("test", id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
 
     container.destroy()
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index 287c20b61a..6fb0754c8a 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -98,6 +98,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct
   }
 
   class TestContainer(lines: Source[ByteString, Any],
+                      var _name: String = "test",
                       val id: ContainerId = ContainerId("test"),
                       val addr: ContainerAddress = ContainerAddress("test", 1234))(implicit val ec: ExecutionContext,
                                                                                    val logging: Logging)
@@ -108,5 +109,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct
     def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId) = lines
 
     override implicit protected val as: ActorSystem = actorSystem
+
+    def rename(name: String)(implicit transid: TransactionId): Future[Unit] = ???
   }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 920d5358b6..d2d9497aa8 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -908,6 +908,7 @@ class ContainerProxyTests
    * Implements all the good cases of a perfect run to facilitate error case overriding.
    */
   class TestContainer extends Container {
+    protected var _name: String = "test"
     protected val id = ContainerId("testcontainer")
     protected val addr = ContainerAddress("0.0.0.0")
     protected implicit val logging: Logging = log
@@ -957,5 +958,8 @@ class ContainerProxyTests
       Future.successful((runInterval, ActivationResponse.success()))
     }
     def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = ???
+    override def rename(name: String)(implicit transid: TransactionId): Future[Unit] =
+      // rename currently not supported; complete with the unit value (), not the `Unit` companion object
+      Future.successful(())
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services