You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/03/01 11:59:39 UTC
[incubator-openwhisk] branch master updated: Implement suspend/resume for KubernetesContainer through an InvokerAgent. (#3338)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 3bc6d1f Implement suspend/resume for KubernetesContainer through an InvokerAgent. (#3338)
3bc6d1f is described below
commit 3bc6d1fa99360f6afcdb05f22a6b531f43b0c4c5
Author: David Grove <dg...@users.noreply.github.com>
AuthorDate: Thu Mar 1 06:59:36 2018 -0500
Implement suspend/resume for KubernetesContainer through an InvokerAgent. (#3338)
---
core/invoker/src/main/resources/application.conf | 19 ++-
.../kubernetes/KubernetesClient.scala | 182 +++++++++++++++++----
.../kubernetes/KubernetesContainer.scala | 58 +++----
.../kubernetes/KubernetesContainerFactory.scala | 2 +-
.../kubernetes/test/KubernetesClientTests.scala | 165 ++++++++-----------
.../kubernetes/test/KubernetesContainerTests.scala | 98 ++++-------
6 files changed, 288 insertions(+), 236 deletions(-)
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 37d2f5f..c63e1b7 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -14,12 +14,19 @@ whisk {
unpause: 10 seconds
}
- # Timeouts for k8s commands. Set to "Inf" to disable timeout.
- kubernetes.timeouts {
- run: 1 minute
- rm: 1 minute
- inspect: 1 minute
- logs: 1 minute
+ kubernetes {
+ namespace: openwhisk
+ # Timeouts for k8s commands. Set to "Inf" to disable timeout.
+ timeouts {
+ run: 1 minute
+ rm: 1 minute
+ inspect: 1 minute
+ logs: 1 minute
+ }
+ invoker-agent {
+ enabled: false
+ port: 3233
+ }
}
# Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
index 39155d1..5f7c7d6 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -26,13 +26,16 @@ import java.time.format.DateTimeFormatterBuilder
import akka.actor.ActorSystem
import akka.event.Logging.{ErrorLevel, InfoLevel}
-import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.Uri.Query
import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.http.scaladsl.Http
+import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.util.ByteString
+import io.fabric8.kubernetes.api.model._
import pureconfig.loadConfigOrThrow
import whisk.common.Logging
import whisk.common.LoggingMarkers
@@ -41,17 +44,19 @@ import whisk.core.ConfigKeys
import whisk.core.containerpool.ContainerId
import whisk.core.containerpool.ContainerAddress
import whisk.core.containerpool.docker.ProcessRunner
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size._
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.blocking
-import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import spray.json._
import spray.json.DefaultJsonProtocol._
+import collection.JavaConverters._
import io.fabric8.kubernetes.client.ConfigBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import okhttp3.{Call, Callback, Request, Response}
@@ -67,6 +72,18 @@ import scala.util.control.NonFatal
case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: Duration, logs: Duration)
/**
+ * Configuration for kubernetes invoker-agent
+ */
+case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int)
+
+/**
+ * General configuration for kubernetes client
+ */
+case class KubernetesClientConfig(namespace: String,
+ timeouts: KubernetesClientTimeoutConfig,
+ invokerAgent: KubernetesInvokerAgentConfig)
+
+/**
* Serves as interface to the kubectl CLI tool.
*
* Be cautious with the ExecutionContext passed to this, as the
@@ -75,15 +92,16 @@ case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: D
* You only need one instance (and you shouldn't get more).
*/
class KubernetesClient(
- timeouts: KubernetesClientTimeoutConfig = loadConfigOrThrow[KubernetesClientTimeoutConfig](
- ConfigKeys.kubernetesTimeouts))(executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+ config: KubernetesClientConfig = loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
+ executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
extends KubernetesApi
with ProcessRunner {
implicit private val ec = executionContext
+ implicit private val am = ActorMaterializer()
implicit private val kubeRestClient = new DefaultKubernetesClient(
new ConfigBuilder()
- .withConnectionTimeout(timeouts.logs.toMillis.toInt)
- .withRequestTimeout(timeouts.logs.toMillis.toInt)
+ .withConnectionTimeout(config.timeouts.logs.toMillis.toInt)
+ .withRequestTimeout(config.timeouts.logs.toMillis.toInt)
.build())
// Determines how to run kubectl. Failure to find a kubectl binary implies
@@ -99,43 +117,144 @@ class KubernetesClient(
}
protected val kubectlCmd = Seq(findKubectlCmd)
- def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
- implicit transid: TransactionId): Future[ContainerId] = {
- runCmd(Seq("run", name, s"--image=$image") ++ args, timeouts.run)
- .map(_ => ContainerId(name))
- }
+ def run(name: String,
+ image: String,
+ memory: ByteSize = 256.MB,
+ environment: Map[String, String] = Map.empty,
+ labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
+
+ val envVars = environment.map {
+ case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
+ }.toSeq
+
+ val pod = new PodBuilder()
+ .withNewMetadata()
+ .withName(name)
+ .addToLabels("name", name)
+ .addToLabels(labels.asJava)
+ .endMetadata()
+ .withNewSpec()
+ .withRestartPolicy("Always")
+ .addNewContainer()
+ .withNewResources()
+ .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
+ .endResources()
+ .withName("user-action")
+ .withImage(image)
+ .withEnv(envVars.asJava)
+ .addNewPort()
+ .withContainerPort(8080)
+ .withName("action")
+ .endPort()
+ .endContainer()
+ .endSpec()
+ .build()
+
+ kubeRestClient.pods.inNamespace(config.namespace).create(pod)
- def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
Future {
blocking {
- val pod =
- kubeRestClient.pods().withName(id.asString).waitUntilReady(timeouts.inspect.length, timeouts.inspect.unit)
- ContainerAddress(pod.getStatus().getPodIP())
+ val createdPod = kubeRestClient.pods
+ .inNamespace(config.namespace)
+ .withName(name)
+ .waitUntilReady(config.timeouts.run.length, config.timeouts.run.unit)
+ toContainer(createdPod)
}
}.recoverWith {
case e =>
- log.error(this, s"Failed to get IP of Pod '${id.asString}' within timeout: ${e.getClass} - ${e.getMessage}")
- Future.failed(new Exception(s"Failed to get IP of Pod '${id.asString}'"))
+ log.error(this, s"Failed create pod for '$name': ${e.getClass} - ${e.getMessage}")
+ Future.failed(new Exception(s"Failed to create pod '$name'"))
+ }
+ }
+
+ def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+ runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ())
+ }
+
+ def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
+ if (ensureUnpaused && config.invokerAgent.enabled) {
+ // The caller can't guarantee that every container with the label key=value is already unpaused.
+ // Therefore we must enumerate them and ensure they are unpaused before we attempt to delete them.
+ Future {
+ blocking {
+ kubeRestClient
+ .inNamespace(config.namespace)
+ .pods()
+ .withLabel(key, value)
+ .list()
+ .getItems
+ .asScala
+ .map { pod =>
+ val container = toContainer(pod)
+ container
+ .resume()
+ .recover { case _ => () } // Ignore errors; it is possible the container was not actually suspended.
+ .map(_ => rm(container))
+ }
+ }
+ }.flatMap(futures =>
+ Future
+ .sequence(futures)
+ .map(_ => ()))
+ } else {
+ runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ())
}
}
- def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd(Seq("delete", "--now", "pod", id.asString), timeouts.rm).map(_ => ())
+ def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+ if (config.invokerAgent.enabled) {
+ agentCommand("suspend", container)
+ .map { response =>
+ response.discardEntityBytes()
+ }
+ } else {
+ Future.successful({})
+ }
+ }
- def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit] =
- runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), timeouts.rm).map(_ => ())
+ def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+ if (config.invokerAgent.enabled) {
+ agentCommand("resume", container)
+ .map { response =>
+ response.discardEntityBytes()
+ }
+ } else {
+ Future.successful({})
+ }
+ }
- def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+ def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
log.debug(this, "Parsing logs from Kubernetes Graph Stage…")
Source
- .fromGraph(new KubernetesRestLogSourceStage(id, sinceTime, waitForSentinel))
+ .fromGraph(new KubernetesRestLogSourceStage(container.id, sinceTime, waitForSentinel))
.log("foobar")
}
+ private def toContainer(pod: Pod): KubernetesContainer = {
+ val id = ContainerId(pod.getMetadata.getName)
+ val addr = ContainerAddress(pod.getStatus.getPodIP)
+ val workerIP = pod.getStatus.getHostIP
+ // Extract the native (docker or containerd) containerId for the container
+ // By convention, kubernetes adds a docker:// prefix when using docker as the low-level container engine
+ val nativeContainerId = pod.getStatus.getContainerStatuses.get(0).getContainerID.stripPrefix("docker://")
+ implicit val kubernetes = this
+ new KubernetesContainer(id, addr, workerIP, nativeContainerId)
+ }
+
+ // Forward a command to invoker-agent daemonset instance on container's worker node
+ private def agentCommand(command: String, container: KubernetesContainer): Future[HttpResponse] = {
+ val uri = Uri()
+ .withScheme("http")
+ .withHost(container.workerIP)
+ .withPort(config.invokerAgent.port)
+ .withPath(Path / command / container.nativeContainerId)
+ Http().singleRequest(HttpRequest(uri = uri))
+ }
+
private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
val cmd = kubectlCmd ++ args
val start = transid.started(
@@ -171,16 +290,21 @@ object KubernetesClient {
}
trait KubernetesApi {
- def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
- implicit transid: TransactionId): Future[ContainerId]
+ def run(name: String,
+ image: String,
+ memory: ByteSize,
+ environment: Map[String, String] = Map.empty,
+ labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer]
+
+ def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
- def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress]
+ def rm(key: String, value: String, ensureUnpaused: Boolean)(implicit transid: TransactionId): Future[Unit]
- def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit]
+ def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
- def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit]
+ def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
- def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+ def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
implicit transid: TransactionId): Source[TypedLogLine, Any]
}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 4c4ccdc..f1d03d1 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -50,46 +50,26 @@ object KubernetesContainer {
* or is an OpenWhisk provided image
* @param labels labels to set on the container
* @param name optional name for the container
- * @return a Future which either completes with a KubernetesContainer or one of two specific failures
+ * @return a Future which either completes with a KubernetesContainer or a failure to create a container
*/
def create(transid: TransactionId,
name: String,
image: String,
userProvidedImage: Boolean = false,
memory: ByteSize = 256.MB,
- environment: Map[String, String] = Map(),
- labels: Map[String, String] = Map())(implicit kubernetes: KubernetesApi,
- ec: ExecutionContext,
- log: Logging): Future[KubernetesContainer] = {
+ environment: Map[String, String] = Map.empty,
+ labels: Map[String, String] = Map.empty)(implicit kubernetes: KubernetesApi,
+ ec: ExecutionContext,
+ log: Logging): Future[KubernetesContainer] = {
implicit val tid = transid
val podName = name.replace("_", "-").replaceAll("[()]", "").toLowerCase()
- val environmentArgs = environment.flatMap {
- case (key, value) => Seq("--env", s"$key=$value")
- }.toSeq
-
- val labelArgs = labels.map {
- case (key, value) => s"$key=$value"
- } match {
- case Seq() => Seq()
- case pairs => Seq("-l") ++ pairs
- }
-
- val args = Seq("--generator", "run-pod/v1", "--restart", "Always", "--limits", s"memory=${memory.toMB}Mi") ++ environmentArgs ++ labelArgs
-
for {
- id <- kubernetes.run(podName, image, args).recoverWith {
- case _ => Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
- }
- ip <- kubernetes.inspectIPAddress(id).recoverWith {
- // remove the container immediately if inspect failed as
- // we cannot recover that case automatically
- case _ =>
- kubernetes.rm(id)
- Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
+ container <- kubernetes.run(podName, image, memory, environment, labels).recoverWith {
+ case _ => Future.failed(WhiskContainerStartupError(s"Failed to run container with image '${image}'."))
}
- } yield new KubernetesContainer(id, ip)
+ } yield container
}
}
@@ -103,11 +83,15 @@ object KubernetesContainer {
* @constructor
* @param id the id of the container
* @param addr the ip & port of the container
+ * @param workerIP the ip of the workernode on which the container is executing
+ * @param nativeContainerId the docker/containerd lowlevel id for the container
*/
-class KubernetesContainer(protected val id: ContainerId, protected val addr: ContainerAddress)(
- implicit kubernetes: KubernetesApi,
- protected val ec: ExecutionContext,
- protected val logging: Logging)
+class KubernetesContainer(protected[core] val id: ContainerId,
+ protected[core] val addr: ContainerAddress,
+ protected[core] val workerIP: String,
+ protected[core] val nativeContainerId: String)(implicit kubernetes: KubernetesApi,
+ protected val ec: ExecutionContext,
+ protected val logging: Logging)
extends Container {
/** The last read timestamp in the log file */
@@ -115,15 +99,13 @@ class KubernetesContainer(protected val id: ContainerId, protected val addr: Con
protected val waitForLogs: FiniteDuration = 2.seconds
- // no-op under Kubernetes
- def suspend()(implicit transid: TransactionId): Future[Unit] = Future.successful({})
+ def suspend()(implicit transid: TransactionId): Future[Unit] = kubernetes.suspend(this)
- // no-op under Kubernetes
- def resume()(implicit transid: TransactionId): Future[Unit] = Future.successful({})
+ def resume()(implicit transid: TransactionId): Future[Unit] = kubernetes.resume(this)
override def destroy()(implicit transid: TransactionId): Future[Unit] = {
super.destroy()
- kubernetes.rm(id)
+ kubernetes.rm(this)
}
private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
@@ -131,7 +113,7 @@ class KubernetesContainer(protected val id: ContainerId, protected val addr: Con
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
kubernetes
- .logs(id, lastTimestamp.get, waitForSentinel)
+ .logs(this, lastTimestamp.get, waitForSentinel)
.limitWeighted(limit.toBytes) { obj =>
// Adding + 1 since we know there's a newline byte being read
obj.jsonSize.toLong + 1
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index 219c8a3..b06ec5c 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -46,7 +46,7 @@ class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit ac
override def cleanup() = {
logging.info(this, "Cleaning up function runtimes")
- val cleaning = kubernetes.rm("invoker", label)(TransactionId.invokerNanny)
+ val cleaning = kubernetes.rm("invoker", label, true)(TransactionId.invokerNanny)
Await.ready(cleaning, 30.seconds)
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 0b14042..0eb4f95 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -37,12 +37,17 @@ import org.scalatest.Matchers
import org.scalatest.time.{Seconds, Span}
import common.{StreamLogging, WskActorSystem}
import okio.Buffer
-import whisk.common.LogMarker
-import whisk.common.LoggingMarkers.INVOKER_KUBECTL_CMD
import whisk.common.TransactionId
import whisk.core.containerpool.{ContainerAddress, ContainerId}
-import whisk.core.containerpool.kubernetes.{KubernetesApi, KubernetesClient, KubernetesRestLogSourceStage, TypedLogLine}
-import whisk.core.containerpool.docker.ProcessRunningException
+import whisk.core.containerpool.kubernetes.{
+ KubernetesApi,
+ KubernetesClient,
+ KubernetesContainer,
+ KubernetesRestLogSourceStage,
+ TypedLogLine
+}
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size._
import scala.collection.mutable
import scala.collection.immutable
@@ -70,6 +75,7 @@ class KubernetesClientTests
implicit val transid = TransactionId.testing
val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
+ val container = kubernetesContainer(id)
val commandTimeout = 500.milliseconds
def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = Await.result(f, timeout)
@@ -83,73 +89,12 @@ class KubernetesClientTests
fixture
}
- behavior of "KubernetesClient"
-
- it should "write proper log markers on a successful command" in {
- // a dummy string works here as we do not assert any output
- // from the methods below
- val stdout = "stdout"
- val client = kubernetesClient { Future.successful(stdout) }
-
- /** Awaits the command and checks for proper logging. */
- def runAndVerify(f: Future[_], cmd: String, args: Seq[String]) = {
- val result = await(f)
-
- logLines.head should include((Seq(kubectlCommand, cmd) ++ args).mkString(" "))
-
- val start = LogMarker.parse(logLines.head)
- start.token shouldBe INVOKER_KUBECTL_CMD(cmd)
-
- val end = LogMarker.parse(logLines.last)
- end.token shouldBe INVOKER_KUBECTL_CMD(cmd).asFinish
-
- stream.reset()
- result
- }
-
- runAndVerify(client.rm(id), "delete", Seq("--now", "pod", id.asString))
+ def kubernetesContainer(id: ContainerId) =
+ new KubernetesContainer(id, ContainerAddress("ip"), "ip", "docker://" + id.asString)(kubernetesClient {
+ Future.successful("")
+ }, global, logging)
- val image = "image"
- val name = "name"
- val expected = Seq(name, s"--image=$image")
- runAndVerify(client.run(name, image), "run", expected) shouldBe ContainerId(name)
- }
-
- it should "write proper log markers on a failing command" in {
- val client = kubernetesClient { Future.failed(new RuntimeException()) }
-
- /** Awaits the command, asserts the exception and checks for proper logging. */
- def runAndVerify(f: Future[_], cmd: String) = {
- a[RuntimeException] should be thrownBy await(f)
-
- val start = LogMarker.parse(logLines.head)
- start.token shouldBe INVOKER_KUBECTL_CMD(cmd)
-
- val end = LogMarker.parse(logLines.last)
- end.token shouldBe INVOKER_KUBECTL_CMD(cmd).asError
-
- stream.reset()
- }
-
- runAndVerify(client.rm(id), "delete")
- runAndVerify(client.run("name", "image"), "run")
- }
-
- it should "fail with ProcessRunningException when run returns with exit code !=125 or no container ID" in {
- def runAndVerify(pre: ProcessRunningException, clue: String) = {
- val client = kubernetesClient { Future.failed(pre) }
- withClue(s"${clue} - exitCode = ${pre.exitCode}, stdout = '${pre.stdout}', stderr = '${pre.stderr}': ") {
- the[ProcessRunningException] thrownBy await(client.run("name", "image")) shouldBe pre
- }
- }
-
- Seq[(ProcessRunningException, String)](
- (ProcessRunningException(126, id.asString, "Unknown command"), "Exit code not 125"),
- (ProcessRunningException(125, "", "Unknown flag: --foo"), "No container ID"),
- (ProcessRunningException(1, "", ""), "Exit code not 125 and no container ID")).foreach {
- case (pre, clue) => runAndVerify(pre, clue)
- }
- }
+ behavior of "KubernetesClient"
val firstLog = """2018-02-06T00:00:18.419889342Z first activation
|2018-02-06T00:00:18.419929471Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
@@ -172,14 +117,32 @@ class KubernetesClientTests
.readLines(new Buffer().writeUtf8(secondLog), lastTimestamp, List.empty)
.to[immutable.Seq])
+ it should "forward suspend commands to the client" in {
+ implicit val kubernetes = new TestKubernetesClient
+ val id = ContainerId("id")
+ val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
+ container.suspend()
+ kubernetes.suspends should have size 1
+ kubernetes.suspends(0) shouldBe id
+ }
+
+ it should "forward resume commands to the client" in {
+ implicit val kubernetes = new TestKubernetesClient
+ val id = ContainerId("id")
+ val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
+ container.resume()
+ kubernetes.resumes should have size 1
+ kubernetes.resumes(0) shouldBe id
+ }
+
it should "return all logs when no sinceTime passed" in {
val client = new TestKubernetesClient {
- override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
firstSource()
}
}
- val logs = awaitLogs(client.logs(id, None))
+ val logs = awaitLogs(client.logs(container, None))
logs should have size 3
logs(0) shouldBe TypedLogLine("2018-02-06T00:00:18.419889342Z", "stdout", "first activation")
logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
@@ -189,12 +152,12 @@ class KubernetesClientTests
val testDate: Option[Instant] = "2018-02-06T00:00:18.419988733Z"
val client = new TestKubernetesClient {
- override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
Source.combine(firstSource(testDate), secondSource(testDate))(Concat(_))
}
}
- val logs = awaitLogs(client.logs(id, testDate))
+ val logs = awaitLogs(client.logs(container, testDate))
logs should have size 3
logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
@@ -203,12 +166,12 @@ class KubernetesClientTests
it should "return all logs if none match sinceTime" in {
val testDate: Option[Instant] = "2018-02-06T00:00:18.419988733Z"
val client = new TestKubernetesClient {
- override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
secondSource(testDate)
}
}
- val logs = awaitLogs(client.logs(id, testDate))
+ val logs = awaitLogs(client.logs(container, testDate))
logs should have size 3
logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
@@ -225,36 +188,52 @@ object KubernetesClientTests {
implicit def strToInstant(str: String): Instant =
strToDate(str).get
- class TestKubernetesClient extends KubernetesApi {
- var runs = mutable.Buffer.empty[(String, String, Seq[String])]
- var inspects = mutable.Buffer.empty[ContainerId]
+ class TestKubernetesClient extends KubernetesApi with StreamLogging {
+ var runs = mutable.Buffer.empty[(String, String, Map[String, String], Map[String, String])]
var rms = mutable.Buffer.empty[ContainerId]
var rmByLabels = mutable.Buffer.empty[(String, String)]
+ var resumes = mutable.Buffer.empty[ContainerId]
+ var suspends = mutable.Buffer.empty[ContainerId]
var logCalls = mutable.Buffer.empty[(ContainerId, Option[Instant])]
- def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
- implicit transid: TransactionId): Future[ContainerId] = {
- runs += ((name, image, args))
- Future.successful(ContainerId("testId"))
+ def run(name: String,
+ image: String,
+ memory: ByteSize = 256.MB,
+ env: Map[String, String] = Map.empty,
+ labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
+ runs += ((name, image, env, labels))
+ implicit val kubernetes = this
+ val containerId = ContainerId("id")
+ val addr: ContainerAddress = ContainerAddress("ip")
+ val workerIP: String = "127.0.0.1"
+ val nativeContainerId: String = "docker://" + containerId.asString
+ Future.successful(new KubernetesContainer(containerId, addr, workerIP, nativeContainerId))
}
- def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
- inspects += id
- Future.successful(ContainerAddress("testIp"))
- }
-
- def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
- rms += id
+ def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+ rms += container.id
Future.successful(())
}
- def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit] = {
+ def rm(key: String, value: String, ensureUnpause: Boolean = false)(
+ implicit transid: TransactionId): Future[Unit] = {
rmByLabels += ((key, value))
Future.successful(())
}
- def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+
+ def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+ resumes += (container.id)
+ Future.successful({})
+ }
+
+ def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+ suspends += (container.id)
+ Future.successful({})
+ }
+
+ def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
- logCalls += ((id, sinceTime))
+ logCalls += ((container.id, sinceTime))
Source(List.empty[TypedLogLine])
}
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 80dc0a3..3415b68 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -43,7 +43,8 @@ import whisk.common.TransactionId
import whisk.core.containerpool._
import whisk.core.containerpool.kubernetes._
import whisk.core.containerpool.docker._
-import whisk.core.entity.ActivationResponse
+import whisk.core.entity.{ActivationResponse, ByteSize}
+import whisk.core.entity.size._
import whisk.core.entity.ActivationResponse.ContainerResponse
import whisk.core.entity.ActivationResponse.Timeout
import whisk.core.entity.size._
@@ -98,7 +99,7 @@ class KubernetesContainerTests
Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))),
awaitLogs: FiniteDuration = 2.seconds)(implicit kubernetes: KubernetesApi): KubernetesContainer = {
- new KubernetesContainer(id, addr) {
+ new KubernetesContainer(id, addr, addr.host, "docker://" + id.asString) {
override protected def callContainer(
path: String,
body: JsObject,
@@ -140,23 +141,13 @@ class KubernetesContainerTests
await(container)
kubernetes.runs should have size 1
- kubernetes.inspects should have size 1
kubernetes.rms should have size 0
- val (testName, testImage, testArgs) = kubernetes.runs.head
+ val (testName, testImage, testEnv, testLabel) = kubernetes.runs.head
testName shouldBe "my-container1"
testImage shouldBe image
- testArgs shouldBe Seq(
- "--generator",
- "run-pod/v1",
- "--restart",
- "Always",
- "--limits",
- "memory=256Mi",
- "--env",
- "test=hi",
- "-l",
- "invoker=0")
+ testEnv shouldBe environment
+ testLabel shouldBe labels
}
it should "pull a user provided image before creating the container" in {
@@ -167,31 +158,18 @@ class KubernetesContainerTests
await(container)
kubernetes.runs should have size 1
- kubernetes.inspects should have size 1
kubernetes.rms should have size 0
}
- it should "remove the container if inspect fails" in {
- implicit val kubernetes = new TestKubernetesClient {
- override def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
- inspects += id
- Future.failed(new RuntimeException())
- }
- }
-
- val container = KubernetesContainer.create(transid = transid, name = "name", image = "image")
- a[WhiskContainerStartupError] should be thrownBy await(container)
-
- kubernetes.runs should have size 1
- kubernetes.inspects should have size 1
- kubernetes.rms should have size 1
- }
-
it should "provide a proper error if run fails for blackbox containers" in {
implicit val kubernetes = new TestKubernetesClient {
- override def run(name: String, image: String, args: Seq[String])(
- implicit transid: TransactionId): Future[ContainerId] = {
- runs += ((name, image, args))
+ override def run(
+ name: String,
+ image: String,
+ memory: ByteSize = 256.MB,
+ env: Map[String, String] = Map.empty,
+ labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
+ runs += ((name, image, env, labels))
Future.failed(ProcessRunningException(1, "", ""))
}
}
@@ -201,27 +179,9 @@ class KubernetesContainerTests
a[WhiskContainerStartupError] should be thrownBy await(container)
kubernetes.runs should have size 1
- kubernetes.inspects should have size 0
kubernetes.rms should have size 0
}
- it should "provide a proper error if inspect fails for blackbox containers" in {
- implicit val kubernetes = new TestKubernetesClient {
- override def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
- inspects += id
- Future.failed(new RuntimeException())
- }
- }
-
- val container =
- KubernetesContainer.create(transid = transid, name = "name", image = "image", userProvidedImage = true)
- a[WhiskContainerStartupError] should be thrownBy await(container)
-
- kubernetes.runs should have size 1
- kubernetes.inspects should have size 1
- kubernetes.rms should have size 1
- }
-
/*
* KUBERNETES COMMANDS
*/
@@ -229,11 +189,11 @@ class KubernetesContainerTests
implicit val kubernetes = stub[KubernetesApi]
val id = ContainerId("id")
- val container = new KubernetesContainer(id, ContainerAddress("ip"))
+ val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
container.destroy()
- (kubernetes.rm(_: ContainerId)(_: TransactionId)).verify(id, transid)
+ (kubernetes.rm(_: KubernetesContainer)(_: TransactionId)).verify(container, transid)
}
/*
@@ -340,9 +300,9 @@ class KubernetesContainerTests
val logSrc = logSource(expectedLogEntry, appendSentinel = true)
implicit val kubernetes = new TestKubernetesClient {
- override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
- logCalls += ((id, sinceTime))
+ logCalls += ((container.id, sinceTime))
logSrc
}
}
@@ -365,9 +325,9 @@ class KubernetesContainerTests
val logSrc = logSource(expectedLogEntry, appendSentinel = false)
implicit val kubernetes = new TestKubernetesClient {
- override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
- logCalls += ((id, sinceTime))
+ logCalls += ((container.id, sinceTime))
logSrc
}
}
@@ -387,9 +347,9 @@ class KubernetesContainerTests
it should "fail log reading if error occurs during file reading" in {
implicit val kubernetes = new TestKubernetesClient {
- override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
- logCalls += ((containerId, sinceTime))
+ logCalls += ((container.id, sinceTime))
Source.failed(new IOException)
}
}
@@ -409,9 +369,9 @@ class KubernetesContainerTests
val logSources = mutable.Queue(logSource(firstLog, true), logSource(secondLog, true))
implicit val kubernetes = new TestKubernetesClient {
- override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
- logCalls += ((id, sinceTime))
+ logCalls += ((container.id, sinceTime))
logSources.dequeue()
}
}
@@ -441,9 +401,9 @@ class KubernetesContainerTests
rawLog should have size 1
implicit val kubernetes = new TestKubernetesClient {
- override def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
- logCalls += ((containerId, sinceTime))
+ logCalls += ((container.id, sinceTime))
// "Fakes" an infinite source with only 1 entry
Source.tick(0.milliseconds, 10.seconds, rawLog.head)
}
@@ -468,9 +428,9 @@ class KubernetesContainerTests
TypedLogLine(currentTsp, "stdout", "This is a log entry.")
implicit val kubernetes = new TestKubernetesClient {
- override def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
- logCalls += ((containerId, sinceTime))
+ logCalls += ((container.id, sinceTime))
logSource(Seq(expectedLogEntry, expectedLogEntry), appendSentinel = false)
}
}
@@ -501,9 +461,9 @@ class KubernetesContainerTests
TypedLogLine(currentTsp, "stdout", "This is a log entry.")
implicit val kubernetes = new TestKubernetesClient {
- override def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+ override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
- logCalls += ((containerId, sinceTime))
+ logCalls += ((container.id, sinceTime))
logSource(expectedLogEntry, appendSentinel = true)
}
}
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.