You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/03/01 11:59:39 UTC

[incubator-openwhisk] branch master updated: Implement suspend/resume for KubernetesContainer through an InvokerAgent. (#3338)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bc6d1f  Implement suspend/resume for KubernetesContainer through an InvokerAgent. (#3338)
3bc6d1f is described below

commit 3bc6d1fa99360f6afcdb05f22a6b531f43b0c4c5
Author: David Grove <dg...@users.noreply.github.com>
AuthorDate: Thu Mar 1 06:59:36 2018 -0500

    Implement suspend/resume for KubernetesContainer through an InvokerAgent. (#3338)
---
 core/invoker/src/main/resources/application.conf   |  19 ++-
 .../kubernetes/KubernetesClient.scala              | 182 +++++++++++++++++----
 .../kubernetes/KubernetesContainer.scala           |  58 +++----
 .../kubernetes/KubernetesContainerFactory.scala    |   2 +-
 .../kubernetes/test/KubernetesClientTests.scala    | 165 ++++++++-----------
 .../kubernetes/test/KubernetesContainerTests.scala |  98 ++++-------
 6 files changed, 288 insertions(+), 236 deletions(-)

diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 37d2f5f..c63e1b7 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -14,12 +14,19 @@ whisk {
     unpause: 10 seconds
   }
 
-  # Timeouts for k8s commands. Set to "Inf" to disable timeout.
-  kubernetes.timeouts {
-    run: 1 minute
-    rm: 1 minute
-    inspect: 1 minute
-    logs: 1 minute
+  kubernetes {
+    namespace: openwhisk
+    # Timeouts for k8s commands. Set to "Inf" to disable timeout.
+    timeouts {
+      run: 1 minute
+      rm: 1 minute
+      inspect: 1 minute
+      logs: 1 minute
+    }
+    invoker-agent {
+      enabled: false
+      port: 3233
+    }
   }
 
   # Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
index 39155d1..5f7c7d6 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -26,13 +26,16 @@ import java.time.format.DateTimeFormatterBuilder
 
 import akka.actor.ActorSystem
 import akka.event.Logging.{ErrorLevel, InfoLevel}
-import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
 import akka.http.scaladsl.model.Uri.Path
 import akka.http.scaladsl.model.Uri.Query
 import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.http.scaladsl.Http
+import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.Source
 import akka.stream.stage._
 import akka.util.ByteString
+import io.fabric8.kubernetes.api.model._
 import pureconfig.loadConfigOrThrow
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
@@ -41,17 +44,19 @@ import whisk.core.ConfigKeys
 import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.ContainerAddress
 import whisk.core.containerpool.docker.ProcessRunner
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size._
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.blocking
-import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+import collection.JavaConverters._
 import io.fabric8.kubernetes.client.ConfigBuilder
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import okhttp3.{Call, Callback, Request, Response}
@@ -67,6 +72,18 @@ import scala.util.control.NonFatal
 case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: Duration, logs: Duration)
 
 /**
+ * Configuration for kubernetes invoker-agent
+ */
+case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int)
+
+/**
+ * General configuration for kubernetes client
+ */
+case class KubernetesClientConfig(namespace: String,
+                                  timeouts: KubernetesClientTimeoutConfig,
+                                  invokerAgent: KubernetesInvokerAgentConfig)
+
+/**
  * Serves as interface to the kubectl CLI tool.
  *
  * Be cautious with the ExecutionContext passed to this, as the
@@ -75,15 +92,16 @@ case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: D
  * You only need one instance (and you shouldn't get more).
  */
 class KubernetesClient(
-  timeouts: KubernetesClientTimeoutConfig = loadConfigOrThrow[KubernetesClientTimeoutConfig](
-    ConfigKeys.kubernetesTimeouts))(executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+  config: KubernetesClientConfig = loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
+  executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
     extends KubernetesApi
     with ProcessRunner {
   implicit private val ec = executionContext
+  implicit private val am = ActorMaterializer()
   implicit private val kubeRestClient = new DefaultKubernetesClient(
     new ConfigBuilder()
-      .withConnectionTimeout(timeouts.logs.toMillis.toInt)
-      .withRequestTimeout(timeouts.logs.toMillis.toInt)
+      .withConnectionTimeout(config.timeouts.logs.toMillis.toInt)
+      .withRequestTimeout(config.timeouts.logs.toMillis.toInt)
       .build())
 
   // Determines how to run kubectl. Failure to find a kubectl binary implies
@@ -99,43 +117,144 @@ class KubernetesClient(
   }
   protected val kubectlCmd = Seq(findKubectlCmd)
 
-  def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
-    implicit transid: TransactionId): Future[ContainerId] = {
-    runCmd(Seq("run", name, s"--image=$image") ++ args, timeouts.run)
-      .map(_ => ContainerId(name))
-  }
+  def run(name: String,
+          image: String,
+          memory: ByteSize = 256.MB,
+          environment: Map[String, String] = Map.empty,
+          labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
+
+    val envVars = environment.map {
+      case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
+    }.toSeq
+
+    val pod = new PodBuilder()
+      .withNewMetadata()
+      .withName(name)
+      .addToLabels("name", name)
+      .addToLabels(labels.asJava)
+      .endMetadata()
+      .withNewSpec()
+      .withRestartPolicy("Always")
+      .addNewContainer()
+      .withNewResources()
+      .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
+      .endResources()
+      .withName("user-action")
+      .withImage(image)
+      .withEnv(envVars.asJava)
+      .addNewPort()
+      .withContainerPort(8080)
+      .withName("action")
+      .endPort()
+      .endContainer()
+      .endSpec()
+      .build()
+
+    kubeRestClient.pods.inNamespace(config.namespace).create(pod)
 
-  def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
     Future {
       blocking {
-        val pod =
-          kubeRestClient.pods().withName(id.asString).waitUntilReady(timeouts.inspect.length, timeouts.inspect.unit)
-        ContainerAddress(pod.getStatus().getPodIP())
+        val createdPod = kubeRestClient.pods
+          .inNamespace(config.namespace)
+          .withName(name)
+          .waitUntilReady(config.timeouts.run.length, config.timeouts.run.unit)
+        toContainer(createdPod)
       }
     }.recoverWith {
       case e =>
-        log.error(this, s"Failed to get IP of Pod '${id.asString}' within timeout: ${e.getClass} - ${e.getMessage}")
-        Future.failed(new Exception(s"Failed to get IP of Pod '${id.asString}'"))
+        log.error(this, s"Failed create pod for '$name': ${e.getClass} - ${e.getMessage}")
+        Future.failed(new Exception(s"Failed to create pod '$name'"))
+    }
+  }
+
+  def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+    runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ())
+  }
+
+  def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
+    if (ensureUnpaused && config.invokerAgent.enabled) {
+      // The caller can't guarantee that every container with the label key=value is already unpaused.
+      // Therefore we must enumerate them and ensure they are unpaused before we attempt to delete them.
+      Future {
+        blocking {
+          kubeRestClient
+            .inNamespace(config.namespace)
+            .pods()
+            .withLabel(key, value)
+            .list()
+            .getItems
+            .asScala
+            .map { pod =>
+              val container = toContainer(pod)
+              container
+                .resume()
+                .recover { case _ => () } // Ignore errors; it is possible the container was not actually suspended.
+                .map(_ => rm(container))
+            }
+        }
+      }.flatMap(futures =>
+        Future
+          .sequence(futures)
+          .map(_ => ()))
+    } else {
+      runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ())
     }
   }
 
-  def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("delete", "--now", "pod", id.asString), timeouts.rm).map(_ => ())
+  def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+    if (config.invokerAgent.enabled) {
+      agentCommand("suspend", container)
+        .map { response =>
+          response.discardEntityBytes()
+        }
+    } else {
+      Future.successful({})
+    }
+  }
 
-  def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), timeouts.rm).map(_ => ())
+  def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+    if (config.invokerAgent.enabled) {
+      agentCommand("resume", container)
+        .map { response =>
+          response.discardEntityBytes()
+        }
+    } else {
+      Future.successful({})
+    }
+  }
 
-  def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+  def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
     implicit transid: TransactionId): Source[TypedLogLine, Any] = {
 
     log.debug(this, "Parsing logs from Kubernetes Graph Stage…")
 
     Source
-      .fromGraph(new KubernetesRestLogSourceStage(id, sinceTime, waitForSentinel))
+      .fromGraph(new KubernetesRestLogSourceStage(container.id, sinceTime, waitForSentinel))
       .log("foobar")
 
   }
 
+  private def toContainer(pod: Pod): KubernetesContainer = {
+    val id = ContainerId(pod.getMetadata.getName)
+    val addr = ContainerAddress(pod.getStatus.getPodIP)
+    val workerIP = pod.getStatus.getHostIP
+    // Extract the native (docker or containerd) containerId for the container
+    // By convention, kubernetes adds a docker:// prefix when using docker as the low-level container engine
+    val nativeContainerId = pod.getStatus.getContainerStatuses.get(0).getContainerID.stripPrefix("docker://")
+    implicit val kubernetes = this
+    new KubernetesContainer(id, addr, workerIP, nativeContainerId)
+  }
+
+  // Forward a command to invoker-agent daemonset instance on container's worker node
+  private def agentCommand(command: String, container: KubernetesContainer): Future[HttpResponse] = {
+    val uri = Uri()
+      .withScheme("http")
+      .withHost(container.workerIP)
+      .withPort(config.invokerAgent.port)
+      .withPath(Path / command / container.nativeContainerId)
+    Http().singleRequest(HttpRequest(uri = uri))
+  }
+
   private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
     val cmd = kubectlCmd ++ args
     val start = transid.started(
@@ -171,16 +290,21 @@ object KubernetesClient {
 }
 
 trait KubernetesApi {
-  def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
-    implicit transid: TransactionId): Future[ContainerId]
+  def run(name: String,
+          image: String,
+          memory: ByteSize,
+          environment: Map[String, String] = Map.empty,
+          labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer]
+
+  def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
 
-  def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress]
+  def rm(key: String, value: String, ensureUnpaused: Boolean)(implicit transid: TransactionId): Future[Unit]
 
-  def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit]
+  def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
 
-  def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit]
+  def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
 
-  def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+  def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
     implicit transid: TransactionId): Source[TypedLogLine, Any]
 }
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 4c4ccdc..f1d03d1 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -50,46 +50,26 @@ object KubernetesContainer {
    *     or is an OpenWhisk provided image
    * @param labels labels to set on the container
    * @param name optional name for the container
-   * @return a Future which either completes with a KubernetesContainer or one of two specific failures
+   * @return a Future which either completes with a KubernetesContainer or a failure to create a container
    */
   def create(transid: TransactionId,
              name: String,
              image: String,
              userProvidedImage: Boolean = false,
              memory: ByteSize = 256.MB,
-             environment: Map[String, String] = Map(),
-             labels: Map[String, String] = Map())(implicit kubernetes: KubernetesApi,
-                                                  ec: ExecutionContext,
-                                                  log: Logging): Future[KubernetesContainer] = {
+             environment: Map[String, String] = Map.empty,
+             labels: Map[String, String] = Map.empty)(implicit kubernetes: KubernetesApi,
+                                                      ec: ExecutionContext,
+                                                      log: Logging): Future[KubernetesContainer] = {
     implicit val tid = transid
 
     val podName = name.replace("_", "-").replaceAll("[()]", "").toLowerCase()
 
-    val environmentArgs = environment.flatMap {
-      case (key, value) => Seq("--env", s"$key=$value")
-    }.toSeq
-
-    val labelArgs = labels.map {
-      case (key, value) => s"$key=$value"
-    } match {
-      case Seq() => Seq()
-      case pairs => Seq("-l") ++ pairs
-    }
-
-    val args = Seq("--generator", "run-pod/v1", "--restart", "Always", "--limits", s"memory=${memory.toMB}Mi") ++ environmentArgs ++ labelArgs
-
     for {
-      id <- kubernetes.run(podName, image, args).recoverWith {
-        case _ => Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
-      }
-      ip <- kubernetes.inspectIPAddress(id).recoverWith {
-        // remove the container immediately if inspect failed as
-        // we cannot recover that case automatically
-        case _ =>
-          kubernetes.rm(id)
-          Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
+      container <- kubernetes.run(podName, image, memory, environment, labels).recoverWith {
+        case _ => Future.failed(WhiskContainerStartupError(s"Failed to run container with image '${image}'."))
       }
-    } yield new KubernetesContainer(id, ip)
+    } yield container
   }
 
 }
@@ -103,11 +83,15 @@ object KubernetesContainer {
  * @constructor
  * @param id the id of the container
  * @param addr the ip & port of the container
+ * @param workerIP the ip of the workernode on which the container is executing
+ * @param nativeContainerId the docker/containerd lowlevel id for the container
  */
-class KubernetesContainer(protected val id: ContainerId, protected val addr: ContainerAddress)(
-  implicit kubernetes: KubernetesApi,
-  protected val ec: ExecutionContext,
-  protected val logging: Logging)
+class KubernetesContainer(protected[core] val id: ContainerId,
+                          protected[core] val addr: ContainerAddress,
+                          protected[core] val workerIP: String,
+                          protected[core] val nativeContainerId: String)(implicit kubernetes: KubernetesApi,
+                                                                         protected val ec: ExecutionContext,
+                                                                         protected val logging: Logging)
     extends Container {
 
   /** The last read timestamp in the log file */
@@ -115,15 +99,13 @@ class KubernetesContainer(protected val id: ContainerId, protected val addr: Con
 
   protected val waitForLogs: FiniteDuration = 2.seconds
 
-  // no-op under Kubernetes
-  def suspend()(implicit transid: TransactionId): Future[Unit] = Future.successful({})
+  def suspend()(implicit transid: TransactionId): Future[Unit] = kubernetes.suspend(this)
 
-  // no-op under Kubernetes
-  def resume()(implicit transid: TransactionId): Future[Unit] = Future.successful({})
+  def resume()(implicit transid: TransactionId): Future[Unit] = kubernetes.resume(this)
 
   override def destroy()(implicit transid: TransactionId): Future[Unit] = {
     super.destroy()
-    kubernetes.rm(id)
+    kubernetes.rm(this)
   }
 
   private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
@@ -131,7 +113,7 @@ class KubernetesContainer(protected val id: ContainerId, protected val addr: Con
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
 
     kubernetes
-      .logs(id, lastTimestamp.get, waitForSentinel)
+      .logs(this, lastTimestamp.get, waitForSentinel)
       .limitWeighted(limit.toBytes) { obj =>
         // Adding + 1 since we know there's a newline byte being read
         obj.jsonSize.toLong + 1
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index 219c8a3..b06ec5c 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -46,7 +46,7 @@ class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit ac
 
   override def cleanup() = {
     logging.info(this, "Cleaning up function runtimes")
-    val cleaning = kubernetes.rm("invoker", label)(TransactionId.invokerNanny)
+    val cleaning = kubernetes.rm("invoker", label, true)(TransactionId.invokerNanny)
     Await.ready(cleaning, 30.seconds)
   }
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 0b14042..0eb4f95 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -37,12 +37,17 @@ import org.scalatest.Matchers
 import org.scalatest.time.{Seconds, Span}
 import common.{StreamLogging, WskActorSystem}
 import okio.Buffer
-import whisk.common.LogMarker
-import whisk.common.LoggingMarkers.INVOKER_KUBECTL_CMD
 import whisk.common.TransactionId
 import whisk.core.containerpool.{ContainerAddress, ContainerId}
-import whisk.core.containerpool.kubernetes.{KubernetesApi, KubernetesClient, KubernetesRestLogSourceStage, TypedLogLine}
-import whisk.core.containerpool.docker.ProcessRunningException
+import whisk.core.containerpool.kubernetes.{
+  KubernetesApi,
+  KubernetesClient,
+  KubernetesContainer,
+  KubernetesRestLogSourceStage,
+  TypedLogLine
+}
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size._
 
 import scala.collection.mutable
 import scala.collection.immutable
@@ -70,6 +75,7 @@ class KubernetesClientTests
 
   implicit val transid = TransactionId.testing
   val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
+  val container = kubernetesContainer(id)
 
   val commandTimeout = 500.milliseconds
   def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = Await.result(f, timeout)
@@ -83,73 +89,12 @@ class KubernetesClientTests
       fixture
   }
 
-  behavior of "KubernetesClient"
-
-  it should "write proper log markers on a successful command" in {
-    // a dummy string works here as we do not assert any output
-    // from the methods below
-    val stdout = "stdout"
-    val client = kubernetesClient { Future.successful(stdout) }
-
-    /** Awaits the command and checks for proper logging. */
-    def runAndVerify(f: Future[_], cmd: String, args: Seq[String]) = {
-      val result = await(f)
-
-      logLines.head should include((Seq(kubectlCommand, cmd) ++ args).mkString(" "))
-
-      val start = LogMarker.parse(logLines.head)
-      start.token shouldBe INVOKER_KUBECTL_CMD(cmd)
-
-      val end = LogMarker.parse(logLines.last)
-      end.token shouldBe INVOKER_KUBECTL_CMD(cmd).asFinish
-
-      stream.reset()
-      result
-    }
-
-    runAndVerify(client.rm(id), "delete", Seq("--now", "pod", id.asString))
+  def kubernetesContainer(id: ContainerId) =
+    new KubernetesContainer(id, ContainerAddress("ip"), "ip", "docker://" + id.asString)(kubernetesClient {
+      Future.successful("")
+    }, global, logging)
 
-    val image = "image"
-    val name = "name"
-    val expected = Seq(name, s"--image=$image")
-    runAndVerify(client.run(name, image), "run", expected) shouldBe ContainerId(name)
-  }
-
-  it should "write proper log markers on a failing command" in {
-    val client = kubernetesClient { Future.failed(new RuntimeException()) }
-
-    /** Awaits the command, asserts the exception and checks for proper logging. */
-    def runAndVerify(f: Future[_], cmd: String) = {
-      a[RuntimeException] should be thrownBy await(f)
-
-      val start = LogMarker.parse(logLines.head)
-      start.token shouldBe INVOKER_KUBECTL_CMD(cmd)
-
-      val end = LogMarker.parse(logLines.last)
-      end.token shouldBe INVOKER_KUBECTL_CMD(cmd).asError
-
-      stream.reset()
-    }
-
-    runAndVerify(client.rm(id), "delete")
-    runAndVerify(client.run("name", "image"), "run")
-  }
-
-  it should "fail with ProcessRunningException when run returns with exit code !=125 or no container ID" in {
-    def runAndVerify(pre: ProcessRunningException, clue: String) = {
-      val client = kubernetesClient { Future.failed(pre) }
-      withClue(s"${clue} - exitCode = ${pre.exitCode}, stdout = '${pre.stdout}', stderr = '${pre.stderr}': ") {
-        the[ProcessRunningException] thrownBy await(client.run("name", "image")) shouldBe pre
-      }
-    }
-
-    Seq[(ProcessRunningException, String)](
-      (ProcessRunningException(126, id.asString, "Unknown command"), "Exit code not 125"),
-      (ProcessRunningException(125, "", "Unknown flag: --foo"), "No container ID"),
-      (ProcessRunningException(1, "", ""), "Exit code not 125 and no container ID")).foreach {
-      case (pre, clue) => runAndVerify(pre, clue)
-    }
-  }
+  behavior of "KubernetesClient"
 
   val firstLog = """2018-02-06T00:00:18.419889342Z first activation
                    |2018-02-06T00:00:18.419929471Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
@@ -172,14 +117,32 @@ class KubernetesClientTests
         .readLines(new Buffer().writeUtf8(secondLog), lastTimestamp, List.empty)
         .to[immutable.Seq])
 
+  it should "forward suspend commands to the client" in {
+    implicit val kubernetes = new TestKubernetesClient
+    val id = ContainerId("id")
+    val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
+    container.suspend()
+    kubernetes.suspends should have size 1
+    kubernetes.suspends(0) shouldBe id
+  }
+
+  it should "forward resume commands to the client" in {
+    implicit val kubernetes = new TestKubernetesClient
+    val id = ContainerId("id")
+    val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
+    container.resume()
+    kubernetes.resumes should have size 1
+    kubernetes.resumes(0) shouldBe id
+  }
+
   it should "return all logs when no sinceTime passed" in {
     val client = new TestKubernetesClient {
-      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
         firstSource()
       }
     }
-    val logs = awaitLogs(client.logs(id, None))
+    val logs = awaitLogs(client.logs(container, None))
     logs should have size 3
     logs(0) shouldBe TypedLogLine("2018-02-06T00:00:18.419889342Z", "stdout", "first activation")
     logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
@@ -189,12 +152,12 @@ class KubernetesClientTests
 
     val testDate: Option[Instant] = "2018-02-06T00:00:18.419988733Z"
     val client = new TestKubernetesClient {
-      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
         Source.combine(firstSource(testDate), secondSource(testDate))(Concat(_))
       }
     }
-    val logs = awaitLogs(client.logs(id, testDate))
+    val logs = awaitLogs(client.logs(container, testDate))
     logs should have size 3
     logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
     logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
@@ -203,12 +166,12 @@ class KubernetesClientTests
   it should "return all logs if none match sinceTime" in {
     val testDate: Option[Instant] = "2018-02-06T00:00:18.419988733Z"
     val client = new TestKubernetesClient {
-      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
         secondSource(testDate)
       }
     }
-    val logs = awaitLogs(client.logs(id, testDate))
+    val logs = awaitLogs(client.logs(container, testDate))
     logs should have size 3
     logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
     logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
@@ -225,36 +188,52 @@ object KubernetesClientTests {
   implicit def strToInstant(str: String): Instant =
     strToDate(str).get
 
-  class TestKubernetesClient extends KubernetesApi {
-    var runs = mutable.Buffer.empty[(String, String, Seq[String])]
-    var inspects = mutable.Buffer.empty[ContainerId]
+  class TestKubernetesClient extends KubernetesApi with StreamLogging {
+    var runs = mutable.Buffer.empty[(String, String, Map[String, String], Map[String, String])]
     var rms = mutable.Buffer.empty[ContainerId]
     var rmByLabels = mutable.Buffer.empty[(String, String)]
+    var resumes = mutable.Buffer.empty[ContainerId]
+    var suspends = mutable.Buffer.empty[ContainerId]
     var logCalls = mutable.Buffer.empty[(ContainerId, Option[Instant])]
 
-    def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
-      implicit transid: TransactionId): Future[ContainerId] = {
-      runs += ((name, image, args))
-      Future.successful(ContainerId("testId"))
+    def run(name: String,
+            image: String,
+            memory: ByteSize = 256.MB,
+            env: Map[String, String] = Map.empty,
+            labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
+      runs += ((name, image, env, labels))
+      implicit val kubernetes = this
+      val containerId = ContainerId("id")
+      val addr: ContainerAddress = ContainerAddress("ip")
+      val workerIP: String = "127.0.0.1"
+      val nativeContainerId: String = "docker://" + containerId.asString
+      Future.successful(new KubernetesContainer(containerId, addr, workerIP, nativeContainerId))
     }
 
-    def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
-      inspects += id
-      Future.successful(ContainerAddress("testIp"))
-    }
-
-    def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
-      rms += id
+    def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+      rms += container.id
       Future.successful(())
     }
 
-    def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit] = {
+    def rm(key: String, value: String, ensureUnpause: Boolean = false)(
+      implicit transid: TransactionId): Future[Unit] = {
       rmByLabels += ((key, value))
       Future.successful(())
     }
-    def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+
+    def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+      resumes += (container.id)
+      Future.successful({})
+    }
+
+    def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+      suspends += (container.id)
+      Future.successful({})
+    }
+
+    def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
       implicit transid: TransactionId): Source[TypedLogLine, Any] = {
-      logCalls += ((id, sinceTime))
+      logCalls += ((container.id, sinceTime))
       Source(List.empty[TypedLogLine])
     }
   }
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 80dc0a3..3415b68 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -43,7 +43,8 @@ import whisk.common.TransactionId
 import whisk.core.containerpool._
 import whisk.core.containerpool.kubernetes._
 import whisk.core.containerpool.docker._
-import whisk.core.entity.ActivationResponse
+import whisk.core.entity.{ActivationResponse, ByteSize}
+import whisk.core.entity.size._
 import whisk.core.entity.ActivationResponse.ContainerResponse
 import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
@@ -98,7 +99,7 @@ class KubernetesContainerTests
       Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))),
     awaitLogs: FiniteDuration = 2.seconds)(implicit kubernetes: KubernetesApi): KubernetesContainer = {
 
-    new KubernetesContainer(id, addr) {
+    new KubernetesContainer(id, addr, addr.host, "docker://" + id.asString) {
       override protected def callContainer(
         path: String,
         body: JsObject,
@@ -140,23 +141,13 @@ class KubernetesContainerTests
     await(container)
 
     kubernetes.runs should have size 1
-    kubernetes.inspects should have size 1
     kubernetes.rms should have size 0
 
-    val (testName, testImage, testArgs) = kubernetes.runs.head
+    val (testName, testImage, testEnv, testLabel) = kubernetes.runs.head
     testName shouldBe "my-container1"
     testImage shouldBe image
-    testArgs shouldBe Seq(
-      "--generator",
-      "run-pod/v1",
-      "--restart",
-      "Always",
-      "--limits",
-      "memory=256Mi",
-      "--env",
-      "test=hi",
-      "-l",
-      "invoker=0")
+    testEnv shouldBe environment
+    testLabel shouldBe labels
   }
 
   it should "pull a user provided image before creating the container" in {
@@ -167,31 +158,18 @@ class KubernetesContainerTests
     await(container)
 
     kubernetes.runs should have size 1
-    kubernetes.inspects should have size 1
     kubernetes.rms should have size 0
   }
 
-  it should "remove the container if inspect fails" in {
-    implicit val kubernetes = new TestKubernetesClient {
-      override def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
-        inspects += id
-        Future.failed(new RuntimeException())
-      }
-    }
-
-    val container = KubernetesContainer.create(transid = transid, name = "name", image = "image")
-    a[WhiskContainerStartupError] should be thrownBy await(container)
-
-    kubernetes.runs should have size 1
-    kubernetes.inspects should have size 1
-    kubernetes.rms should have size 1
-  }
-
   it should "provide a proper error if run fails for blackbox containers" in {
     implicit val kubernetes = new TestKubernetesClient {
-      override def run(name: String, image: String, args: Seq[String])(
-        implicit transid: TransactionId): Future[ContainerId] = {
-        runs += ((name, image, args))
+      override def run(
+        name: String,
+        image: String,
+        memory: ByteSize = 256.MB,
+        env: Map[String, String] = Map.empty,
+        labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
+        runs += ((name, image, env, labels))
         Future.failed(ProcessRunningException(1, "", ""))
       }
     }
@@ -201,27 +179,9 @@ class KubernetesContainerTests
     a[WhiskContainerStartupError] should be thrownBy await(container)
 
     kubernetes.runs should have size 1
-    kubernetes.inspects should have size 0
     kubernetes.rms should have size 0
   }
 
-  it should "provide a proper error if inspect fails for blackbox containers" in {
-    implicit val kubernetes = new TestKubernetesClient {
-      override def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
-        inspects += id
-        Future.failed(new RuntimeException())
-      }
-    }
-
-    val container =
-      KubernetesContainer.create(transid = transid, name = "name", image = "image", userProvidedImage = true)
-    a[WhiskContainerStartupError] should be thrownBy await(container)
-
-    kubernetes.runs should have size 1
-    kubernetes.inspects should have size 1
-    kubernetes.rms should have size 1
-  }
-
   /*
    * KUBERNETES COMMANDS
    */
@@ -229,11 +189,11 @@ class KubernetesContainerTests
     implicit val kubernetes = stub[KubernetesApi]
 
     val id = ContainerId("id")
-    val container = new KubernetesContainer(id, ContainerAddress("ip"))
+    val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
 
     container.destroy()
 
-    (kubernetes.rm(_: ContainerId)(_: TransactionId)).verify(id, transid)
+    (kubernetes.rm(_: KubernetesContainer)(_: TransactionId)).verify(container, transid)
   }
 
   /*
@@ -340,9 +300,9 @@ class KubernetesContainerTests
     val logSrc = logSource(expectedLogEntry, appendSentinel = true)
 
     implicit val kubernetes = new TestKubernetesClient {
-      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
-        logCalls += ((id, sinceTime))
+        logCalls += ((container.id, sinceTime))
         logSrc
       }
     }
@@ -365,9 +325,9 @@ class KubernetesContainerTests
     val logSrc = logSource(expectedLogEntry, appendSentinel = false)
 
     implicit val kubernetes = new TestKubernetesClient {
-      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
-        logCalls += ((id, sinceTime))
+        logCalls += ((container.id, sinceTime))
         logSrc
       }
     }
@@ -387,9 +347,9 @@ class KubernetesContainerTests
 
   it should "fail log reading if error occurs during file reading" in {
     implicit val kubernetes = new TestKubernetesClient {
-      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
-        logCalls += ((containerId, sinceTime))
+        logCalls += ((container.id, sinceTime))
         Source.failed(new IOException)
       }
     }
@@ -409,9 +369,9 @@ class KubernetesContainerTests
     val logSources = mutable.Queue(logSource(firstLog, true), logSource(secondLog, true))
 
     implicit val kubernetes = new TestKubernetesClient {
-      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
-        logCalls += ((id, sinceTime))
+        logCalls += ((container.id, sinceTime))
         logSources.dequeue()
       }
     }
@@ -441,9 +401,9 @@ class KubernetesContainerTests
     rawLog should have size 1
 
     implicit val kubernetes = new TestKubernetesClient {
-      override def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
-        logCalls += ((containerId, sinceTime))
+        logCalls += ((container.id, sinceTime))
         // "Fakes" an infinite source with only 1 entry
         Source.tick(0.milliseconds, 10.seconds, rawLog.head)
       }
@@ -468,9 +428,9 @@ class KubernetesContainerTests
       TypedLogLine(currentTsp, "stdout", "This is a log entry.")
 
     implicit val kubernetes = new TestKubernetesClient {
-      override def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
-        logCalls += ((containerId, sinceTime))
+        logCalls += ((container.id, sinceTime))
         logSource(Seq(expectedLogEntry, expectedLogEntry), appendSentinel = false)
       }
     }
@@ -501,9 +461,9 @@ class KubernetesContainerTests
       TypedLogLine(currentTsp, "stdout", "This is a log entry.")
 
     implicit val kubernetes = new TestKubernetesClient {
-      override def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+      override def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean)(
         implicit transid: TransactionId): Source[TypedLogLine, Any] = {
-        logCalls += ((containerId, sinceTime))
+        logCalls += ((container.id, sinceTime))
         logSource(expectedLogEntry, appendSentinel = true)
       }
     }

-- 
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.