You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/23 18:38:05 UTC

[incubator-openwhisk] branch master updated: Kubernetes ContainerFactoryProvider implementation (#3219)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 83deb85  Kubernetes ContainerFactoryProvider implementation (#3219)
83deb85 is described below

commit 83deb85c7913e824d29dd9c98e9898d4eff34417
Author: Jim Crossley <ji...@crossleys.org>
AuthorDate: Fri Feb 23 13:37:59 2018 -0500

    Kubernetes ContainerFactoryProvider implementation (#3219)
    
    This is an implementation of the ContainerFactory SPI for Kubernetes,
    essentially mirroring the structure of Docker{Container,Client}
    classes and tests with `docker` commands replaced with either
    `kubectl` or the fabric8 client.
    
    Regarding the activation logs returned from k8s, we're massaging them
    to make them look like the json-formatted files the ActionLogDriver
    expects.
    
    Instead of the offset logic used for Docker, we save the timestamp
    from the last message logged from the previous activation, hence the
    'sinceTime' parameter. This value has sub-second granularity but the
    k8s api does not, so we may see messages older than 'sinceTime' and we
    need to ignore them.
    
    Also, the k8s api can't distinguish between messages logged to stdout
    and stderr, so we assume they all go to stdout.
    See https://github.com/kubernetes/kubernetes/issues/28167
    
    A few of the Docker test helpers were converted to singleton objects
    for re-use in the Kubernetes tests.
    
    Co-authored-by:  Ben Browning <bb...@redhat.com>
    Co-authored-by:  Brendan McAdams <bm...@redhat.com>
    Co-authored-by:  Jim Crossley <ji...@crossleys.org>
---
 common/scala/build.gradle                          |   1 +
 .../src/main/scala/whisk/common/Logging.scala      |   1 +
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   3 +
 core/invoker/Dockerfile                            |  10 +-
 core/invoker/src/main/resources/application.conf   |  10 +-
 .../kubernetes/KubernetesClient.scala              | 380 ++++++++++++++
 .../kubernetes/KubernetesContainer.scala           | 158 ++++++
 .../kubernetes/KubernetesContainerFactory.scala    |  82 +++
 .../docker/test/DockerContainerTests.scala         |  42 +-
 .../kubernetes/test/KubernetesClientTests.scala    | 261 ++++++++++
 .../kubernetes/test/KubernetesContainerTests.scala | 552 +++++++++++++++++++++
 11 files changed, 1479 insertions(+), 21 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 1a44bce..8e0aeed 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -41,6 +41,7 @@ dependencies {
     }
     compile 'com.github.ben-manes.caffeine:caffeine:2.4.0'
     compile 'com.google.code.findbugs:jsr305:3.0.2'
+    compile 'io.fabric8:kubernetes-client:2.5.7'
     compile 'io.kamon:kamon-core_2.11:0.6.7'
     compile 'io.kamon:kamon-statsd_2.11:0.6.7'
 }
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index cce6b91..37cc1e3 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -263,6 +263,7 @@ object LoggingMarkers {
   def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, s"runc.$cmd", start)
   def INVOKER_CONTAINER_START(containerState: String) =
     LogMarkerToken(invoker, s"container_start_${containerState}", count)
+  def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, s"kubectl.$cmd", start)
 
   /*
    * General markers
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 79c6d89..aaa6a89 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -256,6 +256,9 @@ object ConfigKeys {
   val containerFactory = "whisk.container-factory"
   val containerArgs = s"$containerFactory.container-args"
 
+  val kubernetes = "whisk.kubernetes"
+  val kubernetesTimeouts = s"$kubernetes.timeouts"
+
   val transactions = "whisk.transactions"
   val stride = s"$transactions.stride"
 
diff --git a/core/invoker/Dockerfile b/core/invoker/Dockerfile
index da33429..1bc2164 100644
--- a/core/invoker/Dockerfile
+++ b/core/invoker/Dockerfile
@@ -1,6 +1,9 @@
 FROM scala
 
 ENV DOCKER_VERSION 1.12.0
+ENV KUBERNETES_VERSION 1.6.4
+
+RUN apk add --update openssl
 
 # Uncomment to fetch latest version of docker instead: RUN wget -qO- https://get.docker.com | sh
 # Install docker client
@@ -11,6 +14,11 @@ rm -f docker-${DOCKER_VERSION}.tgz && \
 chmod +x /usr/bin/docker && \
 chmod +x /usr/bin/docker-runc
 
+# Install kubernetes client
+RUN wget --no-verbose https://storage.googleapis.com/kubernetes-release/release/v${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && \
+chmod +x kubectl && \
+mv kubectl /usr/bin/kubectl
+
 COPY build/distributions/invoker.tar ./
 RUN tar xf invoker.tar && \
 rm -f invoker.tar
@@ -19,4 +27,4 @@ COPY init.sh /
 RUN chmod +x init.sh
 
 EXPOSE 8080
-CMD ["init.sh", "0"]
\ No newline at end of file
+CMD ["init.sh", "0"]
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 94620dd..37d2f5f 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -14,6 +14,14 @@ whisk {
     unpause: 10 seconds
   }
 
+  # Timeouts for k8s commands. Set to "Inf" to disable timeout.
+  kubernetes.timeouts {
+    run: 1 minute
+    rm: 1 minute
+    inspect: 1 minute
+    logs: 1 minute
+  }
+
   # Timeouts for runc commands. Set to "Inf" to disable timeout.
   runc.timeouts {
     pause: 10 seconds
@@ -27,4 +35,4 @@ whisk {
     extra-args: {}
 
   }
-}
\ No newline at end of file
+}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
new file mode 100644
index 0000000..39155d1
--- /dev/null
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes
+
+import java.io.{FileNotFoundException, IOException}
+import java.net.SocketTimeoutException
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatterBuilder
+
+import akka.actor.ActorSystem
+import akka.event.Logging.{ErrorLevel, InfoLevel}
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.Uri.Query
+import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.stream.scaladsl.Source
+import akka.stream.stage._
+import akka.util.ByteString
+import pureconfig.loadConfigOrThrow
+import whisk.common.Logging
+import whisk.common.LoggingMarkers
+import whisk.common.TransactionId
+import whisk.core.ConfigKeys
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
+import whisk.core.containerpool.docker.ProcessRunner
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.blocking
+import scala.concurrent.duration._
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import io.fabric8.kubernetes.client.ConfigBuilder
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import okhttp3.{Call, Callback, Request, Response}
+import okio.BufferedSource
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+/**
+ * Configuration for kubernetes client command timeouts.
+ */
+case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: Duration, logs: Duration)
+
+/**
+ * Serves as interface to the kubectl CLI tool.
+ *
+ * Be cautious with the ExecutionContext passed to this, as the
+ * calls to the CLI are blocking.
+ *
+ * You only need one instance (and you shouldn't get more).
+ */
+class KubernetesClient(
+  timeouts: KubernetesClientTimeoutConfig = loadConfigOrThrow[KubernetesClientTimeoutConfig](
+    ConfigKeys.kubernetesTimeouts))(executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+    extends KubernetesApi
+    with ProcessRunner {
+  implicit private val ec = executionContext
+  implicit private val kubeRestClient = new DefaultKubernetesClient(
+    new ConfigBuilder()
+      .withConnectionTimeout(timeouts.logs.toMillis.toInt)
+      .withRequestTimeout(timeouts.logs.toMillis.toInt)
+      .build())
+
+  // Determines how to run kubectl. Failure to find a kubectl binary implies
+  // a failure to initialize this instance of KubernetesClient.
+  protected def findKubectlCmd(): String = {
+    val alternatives = List("/usr/bin/kubectl", "/usr/local/bin/kubectl")
+    val kubectlBin = Try {
+      alternatives.find(a => Files.isExecutable(Paths.get(a))).get
+    } getOrElse {
+      throw new FileNotFoundException(s"Couldn't locate kubectl binary (tried: ${alternatives.mkString(", ")}).")
+    }
+    kubectlBin
+  }
+  protected val kubectlCmd = Seq(findKubectlCmd)
+
+  def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
+    implicit transid: TransactionId): Future[ContainerId] = {
+    runCmd(Seq("run", name, s"--image=$image") ++ args, timeouts.run)
+      .map(_ => ContainerId(name))
+  }
+
+  def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
+    Future {
+      blocking {
+        val pod =
+          kubeRestClient.pods().withName(id.asString).waitUntilReady(timeouts.inspect.length, timeouts.inspect.unit)
+        ContainerAddress(pod.getStatus().getPodIP())
+      }
+    }.recoverWith {
+      case e =>
+        log.error(this, s"Failed to get IP of Pod '${id.asString}' within timeout: ${e.getClass} - ${e.getMessage}")
+        Future.failed(new Exception(s"Failed to get IP of Pod '${id.asString}'"))
+    }
+  }
+
+  def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
+    runCmd(Seq("delete", "--now", "pod", id.asString), timeouts.rm).map(_ => ())
+
+  def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit] =
+    runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), timeouts.rm).map(_ => ())
+
+  def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+    implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+
+    log.debug(this, "Parsing logs from Kubernetes Graph Stageā€¦")
+
+    Source
+      .fromGraph(new KubernetesRestLogSourceStage(id, sinceTime, waitForSentinel))
+      .log("foobar")
+
+  }
+
+  private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
+    val cmd = kubectlCmd ++ args
+    val start = transid.started(
+      this,
+      LoggingMarkers.INVOKER_KUBECTL_CMD(args.head),
+      s"running ${cmd.mkString(" ")} (timeout: $timeout)",
+      logLevel = InfoLevel)
+    executeProcess(cmd, timeout).andThen {
+      case Success(_) => transid.finished(this, start)
+      case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
+    }
+  }
+}
+
+object KubernetesClient {
+
+  // Necessary, as Kubernetes uses nanosecond precision in logs, but java.time.Instant toString uses milliseconds
+  //%Y-%m-%dT%H:%M:%S.%N%z
+  val K8STimestampFormat = new DateTimeFormatterBuilder()
+    .parseCaseInsensitive()
+    .appendPattern("u-MM-dd")
+    .appendLiteral('T')
+    .appendPattern("HH:mm:ss[.n]")
+    .appendLiteral('Z')
+    .toFormatter()
+    .withZone(ZoneId.of("UTC"))
+
+  def parseK8STimestamp(ts: String): Try[Instant] =
+    Try(Instant.from(K8STimestampFormat.parse(ts)))
+
+  def formatK8STimestamp(ts: Instant): Try[String] =
+    Try(K8STimestampFormat.format(ts))
+}
+
+trait KubernetesApi {
+  def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
+    implicit transid: TransactionId): Future[ContainerId]
+
+  def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress]
+
+  def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit]
+
+  def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit]
+
+  def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+    implicit transid: TransactionId): Source[TypedLogLine, Any]
+}
+
+object KubernetesRestLogSourceStage {
+
+  import KubernetesClient.{formatK8STimestamp, parseK8STimestamp}
+
+  def constructPath(namespace: String, containerId: String): Path =
+    Path / "api" / "v1" / "namespaces" / namespace / "pods" / containerId / "log"
+
+  def constructQuery(sinceTime: Option[Instant], waitForSentinel: Boolean): Query = {
+
+    val sinceTimestamp = sinceTime.flatMap(time => formatK8STimestamp(time).toOption)
+
+    Query(Map("timestamps" -> "true") ++ sinceTimestamp.map(time => "sinceTime" -> time))
+
+  }
+
+  @tailrec
+  def readLines(src: BufferedSource,
+                lastTimestamp: Option[Instant],
+                lines: Seq[TypedLogLine] = Seq.empty[TypedLogLine]): Seq[TypedLogLine] = {
+
+    if (!src.exhausted()) {
+      (for {
+        line <- Option(src.readUtf8Line()) if !line.isEmpty
+        timestampDelimiter = line.indexOf(" ")
+        // Kubernetes is ignoring nanoseconds in sinceTime, so we have to filter additionally here
+        rawTimestamp = line.substring(0, timestampDelimiter)
+        timestamp <- parseK8STimestamp(rawTimestamp).toOption if isRelevantLogLine(lastTimestamp, timestamp)
+        msg = line.substring(timestampDelimiter + 1)
+        stream = "stdout" // TODO - when we can distinguish stderr: https://github.com/kubernetes/kubernetes/issues/28167
+      } yield {
+        TypedLogLine(timestamp, stream, msg)
+      }) match {
+        case Some(logLine) =>
+          readLines(src, Option(logLine.time), lines :+ logLine)
+        case None =>
+          // we may have skipped a line for filtering conditions only; keep going
+          readLines(src, lastTimestamp, lines)
+      }
+    } else {
+      lines
+    }
+
+  }
+
+  def isRelevantLogLine(lastTimestamp: Option[Instant], newTimestamp: Instant): Boolean =
+    lastTimestamp match {
+      case Some(last) =>
+        newTimestamp.isAfter(last)
+      case None =>
+        true
+    }
+
+}
+
+final class KubernetesRestLogSourceStage(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+  implicit val kubeRestClient: DefaultKubernetesClient)
+    extends GraphStage[SourceShape[TypedLogLine]] { stage =>
+
+  import KubernetesRestLogSourceStage._
+
+  val out = Outlet[TypedLogLine]("K8SHttpLogging.out")
+
+  override val shape: SourceShape[TypedLogLine] = SourceShape.of(out)
+
+  override protected def initialAttributes: Attributes = Attributes.name("KubernetesHttpLogSource")
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogicWithLogging(shape) { logic =>
+
+      private val queue = mutable.Queue.empty[TypedLogLine]
+      private var lastTimestamp = sinceTime
+
+      def fetchLogs(): Unit =
+        try {
+          val path = constructPath(kubeRestClient.getNamespace, id.asString)
+          val query = constructQuery(lastTimestamp, waitForSentinel)
+
+          log.debug("* Fetching K8S HTTP Logs w/ Path: {} Query: {}", path, query)
+
+          val url = Uri(kubeRestClient.getMasterUrl.toString)
+            .withPath(path)
+            .withQuery(query)
+
+          val request = new Request.Builder().get().url(url.toString).build
+
+          kubeRestClient.getHttpClient.newCall(request).enqueue(new LogFetchCallback())
+        } catch {
+          case NonFatal(e) =>
+            onFailure(e)
+            throw e
+        }
+
+      def onFailure(e: Throwable): Unit = e match {
+        case _: SocketTimeoutException =>
+          log.warning("* Logging socket to Kubernetes timed out.") // this should only happen with follow behavior
+        case _ =>
+          log.error(e, "* Retrieving the logs from Kubernetes failed.")
+      }
+
+      val emitCallback: AsyncCallback[Seq[TypedLogLine]] = getAsyncCallback[Seq[TypedLogLine]] {
+        case firstLine +: restOfLines if isAvailable(out) =>
+          pushLine(firstLine)
+          queue ++= restOfLines
+        case lines =>
+          queue ++= lines
+      }
+
+      class LogFetchCallback extends Callback {
+
+        override def onFailure(call: Call, e: IOException): Unit = logic.onFailure(e)
+
+        override def onResponse(call: Call, response: Response): Unit =
+          try {
+            val lines = readLines(response.body.source, lastTimestamp)
+
+            response.body.source.close()
+
+            lines.lastOption.foreach { line =>
+              lastTimestamp = Option(line.time)
+            }
+
+            emitCallback.invoke(lines)
+          } catch {
+            case NonFatal(e) =>
+              log.error(e, "* Reading Kubernetes HTTP Response failed.")
+              logic.onFailure(e)
+              throw e
+          }
+      }
+
+      def pushLine(line: TypedLogLine): Unit = {
+        log.debug("* Pushing a chunk of kubernetes logging: {}", line)
+        push(out, line)
+      }
+
+      setHandler(
+        out,
+        new OutHandler {
+          override def onPull(): Unit = {
+            // if we still have lines queued up, return those; else make a new HTTP read.
+            if (queue.nonEmpty)
+              pushLine(queue.dequeue())
+            else
+              fetchLogs()
+          }
+        })
+    }
+}
+
+protected[core] final case class TypedLogLine(time: Instant, stream: String, log: String) {
+  import KubernetesClient.formatK8STimestamp
+
+  lazy val toJson: JsObject =
+    JsObject("time" -> formatK8STimestamp(time).getOrElse("").toJson, "stream" -> stream.toJson, "log" -> log.toJson)
+
+  lazy val jsonPrinted: String = toJson.compactPrint
+  lazy val jsonSize: Int = jsonPrinted.length
+
+  /**
+   * Returns a ByteString representation of the json for this Log Line
+   */
+  val toByteString = ByteString(jsonPrinted)
+
+  override def toString = s"${formatK8STimestamp(time).get} $stream: ${log.trim}"
+}
+
+protected[core] object TypedLogLine {
+
+  import KubernetesClient.{parseK8STimestamp, K8STimestampFormat}
+
+  def readInstant(json: JsValue): Instant = json match {
+    case JsString(str) =>
+      parseK8STimestamp(str) match {
+        case Success(time) =>
+          time
+        case Failure(e) =>
+          deserializationError(
+            s"Could not parse a java.time.Instant from $str (Expected in format: $K8STimestampFormat: $e")
+      }
+    case _ =>
+      deserializationError(s"Could not parse a java.time.Instant from $json (Expected in format: $K8STimestampFormat)")
+  }
+
+  implicit val typedLogLineFormat = new RootJsonFormat[TypedLogLine] {
+    override def write(obj: TypedLogLine): JsValue = obj.toJson
+
+    override def read(json: JsValue): TypedLogLine = {
+      val obj = json.asJsObject
+      val fields = obj.fields
+      TypedLogLine(readInstant(fields("time")), fields("stream").convertTo[String], fields("log").convertTo[String])
+    }
+  }
+
+}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
new file mode 100644
index 0000000..4c4ccdc
--- /dev/null
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes
+
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicReference
+
+import akka.stream.StreamLimitReachedException
+import akka.stream.scaladsl.Framing.FramingException
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.containerpool.WhiskContainerStartupError
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
+import whisk.core.containerpool.docker.{CompleteAfterOccurrences, DockerContainer, OccurrencesNotFoundException}
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size._
+import whisk.http.Messages
+
+object KubernetesContainer {
+
+  /**
+   * Creates a container running in kubernetes
+   *
+   * @param transid transaction creating the container
+   * @param image image to create the container from
+   * @param userProvidedImage whether the image is provided by the user
+   *     or is an OpenWhisk provided image
+   * @param labels labels to set on the container
+   * @param name optional name for the container
+   * @return a Future which either completes with a KubernetesContainer or one of two specific failures
+   */
+  def create(transid: TransactionId,
+             name: String,
+             image: String,
+             userProvidedImage: Boolean = false,
+             memory: ByteSize = 256.MB,
+             environment: Map[String, String] = Map(),
+             labels: Map[String, String] = Map())(implicit kubernetes: KubernetesApi,
+                                                  ec: ExecutionContext,
+                                                  log: Logging): Future[KubernetesContainer] = {
+    implicit val tid = transid
+
+    val podName = name.replace("_", "-").replaceAll("[()]", "").toLowerCase()
+
+    val environmentArgs = environment.flatMap {
+      case (key, value) => Seq("--env", s"$key=$value")
+    }.toSeq
+
+    val labelArgs = labels.map {
+      case (key, value) => s"$key=$value"
+    } match {
+      case Seq() => Seq()
+      case pairs => Seq("-l") ++ pairs
+    }
+
+    val args = Seq("--generator", "run-pod/v1", "--restart", "Always", "--limits", s"memory=${memory.toMB}Mi") ++ environmentArgs ++ labelArgs
+
+    for {
+      id <- kubernetes.run(podName, image, args).recoverWith {
+        case _ => Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
+      }
+      ip <- kubernetes.inspectIPAddress(id).recoverWith {
+        // remove the container immediately if inspect failed as
+        // we cannot recover that case automatically
+        case _ =>
+          kubernetes.rm(id)
+          Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
+      }
+    } yield new KubernetesContainer(id, ip)
+  }
+
+}
+
+/**
+ * Represents a container as run by kubernetes.
+ *
+ * This class contains OpenWhisk specific behavior and as such does not necessarily
+ * use kubernetes commands to achieve the effects needed.
+ *
+ * @constructor
+ * @param id the id of the container
+ * @param addr the ip & port of the container
+ */
+class KubernetesContainer(protected val id: ContainerId, protected val addr: ContainerAddress)(
+  implicit kubernetes: KubernetesApi,
+  protected val ec: ExecutionContext,
+  protected val logging: Logging)
+    extends Container {
+
+  /** The last read timestamp in the log file */
+  private val lastTimestamp = new AtomicReference[Option[Instant]](None)
+
+  protected val waitForLogs: FiniteDuration = 2.seconds
+
+  // no-op under Kubernetes
+  def suspend()(implicit transid: TransactionId): Future[Unit] = Future.successful({})
+
+  // no-op under Kubernetes
+  def resume()(implicit transid: TransactionId): Future[Unit] = Future.successful({})
+
+  override def destroy()(implicit transid: TransactionId): Future[Unit] = {
+    super.destroy()
+    kubernetes.rm(id)
+  }
+
+  private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
+
+  def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
+
+    kubernetes
+      .logs(id, lastTimestamp.get, waitForSentinel)
+      .limitWeighted(limit.toBytes) { obj =>
+        // Adding + 1 since we know there's a newline byte being read
+        obj.jsonSize.toLong + 1
+      }
+      .via(new CompleteAfterOccurrences(_.log == stringSentinel, 2, waitForSentinel))
+      .recover {
+        case _: StreamLimitReachedException =>
+          // While the stream has already ended by failing the limitWeighted stage above, we inject a truncation
+          // notice downstream, which will be processed as usual. This will be the last element of the stream.
+          TypedLogLine(Instant.now, "stderr", Messages.truncateLogs(limit))
+        case _: OccurrencesNotFoundException | _: FramingException =>
+          // Stream has already ended and we insert a notice that data might be missing from the logs. While a
+          // FramingException can also mean exceeding the limits, we cannot decide which case happened so we resort
+          // to the general error message. This will be the last element of the stream.
+          TypedLogLine(Instant.now, "stderr", Messages.logFailure)
+      }
+      .takeWithin(waitForLogs)
+      .map { line =>
+        lastTimestamp.set(Some(line.time))
+        line.toByteString
+      }
+  }
+
+}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
new file mode 100644
index 0000000..219c8a3
--- /dev/null
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes
+
+import akka.actor.ActorSystem
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.containerpool.ContainerFactory
+import whisk.core.containerpool.ContainerFactoryProvider
+import whisk.core.entity.ByteSize
+import whisk.core.entity.ExecManifest.ImageName
+import whisk.core.entity.InstanceId
+import whisk.core.WhiskConfig
+
+class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit actorSystem: ActorSystem,
+                                                                     ec: ExecutionContext,
+                                                                     logging: Logging)
+    extends ContainerFactory {
+
+  implicit val kubernetes = new KubernetesClient()(ec)
+
+  /** Perform cleanup on init */
+  override def init(): Unit = cleanup()
+
+  override def cleanup() = {
+    logging.info(this, "Cleaning up function runtimes")
+    val cleaning = kubernetes.rm("invoker", label)(TransactionId.invokerNanny)
+    Await.ready(cleaning, 30.seconds)
+  }
+
+  override def createContainer(tid: TransactionId,
+                               name: String,
+                               actionImage: ImageName,
+                               userProvidedImage: Boolean,
+                               memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+    val image = if (userProvidedImage) {
+      actionImage.publicImageName
+    } else {
+      actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
+    }
+
+    KubernetesContainer.create(
+      tid,
+      name,
+      image,
+      userProvidedImage,
+      memory,
+      environment = Map("__OW_API_HOST" -> config.wskApiHost),
+      labels = Map("invoker" -> label))
+  }
+}
+
+object KubernetesContainerFactoryProvider extends ContainerFactoryProvider {
+  override def getContainerFactory(actorSystem: ActorSystem,
+                                   logging: Logging,
+                                   config: WhiskConfig,
+                                   instance: InstanceId,
+                                   parameters: Map[String, Set[String]]): ContainerFactory =
+    new KubernetesContainerFactory(s"invoker${instance.toInt}", config)(actorSystem, actorSystem.dispatcher, logging)
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index ea77810..25bc0c9 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -48,7 +48,29 @@ import whisk.core.entity.ActivationResponse.ContainerResponse
 import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
-import whisk.core.entity.size._
+
+import DockerContainerTests._
+
+object DockerContainerTests {
+
+  /** Awaits the given future, throws the exception enclosed in Failure. */
+  def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result[A](f, timeout)
+
+  /** Creates an interval starting at EPOCH with the given duration. */
+  def intervalOf(duration: FiniteDuration) = Interval(Instant.EPOCH, Instant.ofEpochMilli(duration.toMillis))
+
+  def toRawLog(log: Seq[LogLine], appendSentinel: Boolean = true): ByteString = {
+    val appendedLog = if (appendSentinel) {
+      val lastTime = log.lastOption.map { case LogLine(time, _, _) => time }.getOrElse(Instant.EPOCH.toString)
+      log :+
+        LogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}\n") :+
+        LogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}\n")
+    } else {
+      log
+    }
+    ByteString(appendedLog.map(_.toJson.compactPrint).mkString("", "\n", "\n"))
+  }
+}
 
 /**
  * Unit tests for ContainerPool schedule
@@ -69,9 +91,6 @@ class DockerContainerTests
 
   implicit val materializer: ActorMaterializer = ActorMaterializer()
 
-  /** Awaits the given future, throws the exception enclosed in Failure. */
-  def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result[A](f, timeout)
-
   /** Reads logs into memory and awaits them */
   def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 500.milliseconds): Vector[String] =
     Await.result(source.via(DockerToActivationLogStore.toFormattedString).runWith(Sink.seq[String]), timeout).toVector
@@ -100,9 +119,6 @@ class DockerContainerTests
     }
   }
 
-  /** Creates an interval starting at EPOCH with the given duration. */
-  def intervalOf(duration: FiniteDuration) = Interval(Instant.EPOCH, Instant.ofEpochMilli(duration.toMillis))
-
   behavior of "DockerContainer"
 
   implicit val transid = TransactionId.testing
@@ -433,18 +449,6 @@ class DockerContainerTests
   /*
    * LOGS
    */
-  def toRawLog(log: Seq[LogLine], appendSentinel: Boolean = true): ByteString = {
-    val appendedLog = if (appendSentinel) {
-      val lastTime = log.lastOption.map { case LogLine(time, _, _) => time }.getOrElse(Instant.EPOCH.toString)
-      log :+
-        LogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}\n") :+
-        LogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}\n")
-    } else {
-      log
-    }
-    ByteString(appendedLog.map(_.toJson.compactPrint).mkString("", "\n", "\n"))
-  }
-
   it should "read a simple log with sentinel" in {
     val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
     val rawLog = toRawLog(Seq(expectedLogEntry), appendSentinel = true)
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
new file mode 100644
index 0000000..0b14042
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes.test
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Concat, Sink, Source}
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.Matchers
+import org.scalatest.time.{Seconds, Span}
+import common.{StreamLogging, WskActorSystem}
+import okio.Buffer
+import whisk.common.LogMarker
+import whisk.common.LoggingMarkers.INVOKER_KUBECTL_CMD
+import whisk.common.TransactionId
+import whisk.core.containerpool.{ContainerAddress, ContainerId}
+import whisk.core.containerpool.kubernetes.{KubernetesApi, KubernetesClient, KubernetesRestLogSourceStage, TypedLogLine}
+import whisk.core.containerpool.docker.ProcessRunningException
+
+import scala.collection.mutable
+import scala.collection.immutable
+
+@RunWith(classOf[JUnitRunner])
+class KubernetesClientTests
+    extends FlatSpec
+    with Matchers
+    with StreamLogging
+    with BeforeAndAfterEach
+    with Eventually
+    with WskActorSystem {
+
+  import KubernetesClientTests._
+
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  /** Reads logs into memory and awaits them */
+  def awaitLogs(source: Source[TypedLogLine, Any], timeout: FiniteDuration = 1000.milliseconds): Vector[TypedLogLine] =
+    Await.result(source.runWith(Sink.seq[TypedLogLine]), timeout).toVector
+
+  override def beforeEach = stream.reset()
+
+  implicit override val patienceConfig = PatienceConfig(timeout = scaled(Span(5, Seconds)))
+
+  implicit val transid = TransactionId.testing
+  val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
+
+  val commandTimeout = 500.milliseconds
+  def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = Await.result(f, timeout)
+
+  val kubectlCommand = "kubectl"
+
+  /** Returns a KubernetesClient with a mocked result for 'executeProcess' */
+  def kubernetesClient(fixture: => Future[String]) = new KubernetesClient()(global) {
+    override def findKubectlCmd() = kubectlCommand
+    override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext, as: ActorSystem) =
+      fixture
+  }
+
+  behavior of "KubernetesClient"
+
+  it should "write proper log markers on a successful command" in {
+    // a dummy string works here as we do not assert any output
+    // from the methods below
+    val stdout = "stdout"
+    val client = kubernetesClient { Future.successful(stdout) }
+
+    /** Awaits the command and checks for proper logging. */
+    def runAndVerify(f: Future[_], cmd: String, args: Seq[String]) = {
+      val result = await(f)
+
+      logLines.head should include((Seq(kubectlCommand, cmd) ++ args).mkString(" "))
+
+      val start = LogMarker.parse(logLines.head)
+      start.token shouldBe INVOKER_KUBECTL_CMD(cmd)
+
+      val end = LogMarker.parse(logLines.last)
+      end.token shouldBe INVOKER_KUBECTL_CMD(cmd).asFinish
+
+      stream.reset()
+      result
+    }
+
+    runAndVerify(client.rm(id), "delete", Seq("--now", "pod", id.asString))
+
+    val image = "image"
+    val name = "name"
+    val expected = Seq(name, s"--image=$image")
+    runAndVerify(client.run(name, image), "run", expected) shouldBe ContainerId(name)
+  }
+
+  it should "write proper log markers on a failing command" in {
+    val client = kubernetesClient { Future.failed(new RuntimeException()) }
+
+    /** Awaits the command, asserts the exception and checks for proper logging. */
+    def runAndVerify(f: Future[_], cmd: String) = {
+      a[RuntimeException] should be thrownBy await(f)
+
+      val start = LogMarker.parse(logLines.head)
+      start.token shouldBe INVOKER_KUBECTL_CMD(cmd)
+
+      val end = LogMarker.parse(logLines.last)
+      end.token shouldBe INVOKER_KUBECTL_CMD(cmd).asError
+
+      stream.reset()
+    }
+
+    runAndVerify(client.rm(id), "delete")
+    runAndVerify(client.run("name", "image"), "run")
+  }
+
+  it should "fail with ProcessRunningException when run returns with exit code !=125 or no container ID" in {
+    def runAndVerify(pre: ProcessRunningException, clue: String) = {
+      val client = kubernetesClient { Future.failed(pre) }
+      withClue(s"${clue} - exitCode = ${pre.exitCode}, stdout = '${pre.stdout}', stderr = '${pre.stderr}': ") {
+        the[ProcessRunningException] thrownBy await(client.run("name", "image")) shouldBe pre
+      }
+    }
+
+    Seq[(ProcessRunningException, String)](
+      (ProcessRunningException(126, id.asString, "Unknown command"), "Exit code not 125"),
+      (ProcessRunningException(125, "", "Unknown flag: --foo"), "No container ID"),
+      (ProcessRunningException(1, "", ""), "Exit code not 125 and no container ID")).foreach {
+      case (pre, clue) => runAndVerify(pre, clue)
+    }
+  }
+
+  val firstLog = """2018-02-06T00:00:18.419889342Z first activation
+                   |2018-02-06T00:00:18.419929471Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+                   |2018-02-06T00:00:18.419988733Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+                   |""".stripMargin
+  val secondLog = """2018-02-06T00:09:35.38267193Z second activation
+                    |2018-02-06T00:09:35.382990278Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+                    |2018-02-06T00:09:35.383116503Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+                    |""".stripMargin
+
+  def firstSource(lastTimestamp: Option[Instant] = None): Source[TypedLogLine, Any] =
+    Source(
+      KubernetesRestLogSourceStage
+        .readLines(new Buffer().writeUtf8(firstLog), lastTimestamp, List.empty)
+        .to[immutable.Seq])
+
+  def secondSource(lastTimestamp: Option[Instant] = None): Source[TypedLogLine, Any] =
+    Source(
+      KubernetesRestLogSourceStage
+        .readLines(new Buffer().writeUtf8(secondLog), lastTimestamp, List.empty)
+        .to[immutable.Seq])
+
+  it should "return all logs when no sinceTime passed" in {
+    val client = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        firstSource()
+      }
+    }
+    val logs = awaitLogs(client.logs(id, None))
+    logs should have size 3
+    logs(0) shouldBe TypedLogLine("2018-02-06T00:00:18.419889342Z", "stdout", "first activation")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+  }
+
+  it should "return all logs after the one matching sinceTime" in {
+
+    val testDate: Option[Instant] = "2018-02-06T00:00:18.419988733Z"
+    val client = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        Source.combine(firstSource(testDate), secondSource(testDate))(Concat(_))
+      }
+    }
+    val logs = awaitLogs(client.logs(id, testDate))
+    logs should have size 3
+    logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+  }
+
+  it should "return all logs if none match sinceTime" in {
+    val testDate: Option[Instant] = "2018-02-06T00:00:18.419988733Z"
+    val client = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        secondSource(testDate)
+      }
+    }
+    val logs = awaitLogs(client.logs(id, testDate))
+    logs should have size 3
+    logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+  }
+
+}
+
+object KubernetesClientTests {
+  import scala.language.implicitConversions
+
+  implicit def strToDate(str: String): Option[Instant] =
+    KubernetesClient.parseK8STimestamp(str).toOption
+
+  implicit def strToInstant(str: String): Instant =
+    strToDate(str).get
+
+  class TestKubernetesClient extends KubernetesApi {
+    var runs = mutable.Buffer.empty[(String, String, Seq[String])]
+    var inspects = mutable.Buffer.empty[ContainerId]
+    var rms = mutable.Buffer.empty[ContainerId]
+    var rmByLabels = mutable.Buffer.empty[(String, String)]
+    var logCalls = mutable.Buffer.empty[(ContainerId, Option[Instant])]
+
+    def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
+      implicit transid: TransactionId): Future[ContainerId] = {
+      runs += ((name, image, args))
+      Future.successful(ContainerId("testId"))
+    }
+
+    def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
+      inspects += id
+      Future.successful(ContainerAddress("testIp"))
+    }
+
+    def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
+      rms += id
+      Future.successful(())
+    }
+
+    def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit] = {
+      rmByLabels += ((key, value))
+      Future.successful(())
+    }
+    def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+      implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+      logCalls += ((id, sinceTime))
+      Source(List.empty[TypedLogLine])
+    }
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
new file mode 100644
index 0000000..80dc0a3
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes.test
+
+import java.io.IOException
+import java.time.{Instant, ZoneId}
+
+import akka.NotUsed
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Flow, Sink, Source}
+import akka.util.ByteString
+import common.TimingHelpers
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.concurrent.Future
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.Matchers
+import common.{StreamLogging, WskActorSystem}
+import spray.json._
+import whisk.common.LoggingMarkers._
+import whisk.common.LogMarker
+import whisk.common.TransactionId
+import whisk.core.containerpool._
+import whisk.core.containerpool.kubernetes._
+import whisk.core.containerpool.docker._
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.ActivationResponse.ContainerResponse
+import whisk.core.entity.ActivationResponse.Timeout
+import whisk.core.entity.size._
+import whisk.http.Messages
+import whisk.core.containerpool.docker.test.DockerContainerTests._
+
+import scala.collection.{immutable, mutable}
+
+/**
+ * Unit tests for ContainerPool schedule
+ */
+@RunWith(classOf[JUnitRunner])
+class KubernetesContainerTests
+    extends FlatSpec
+    with Matchers
+    with MockFactory
+    with StreamLogging
+    with BeforeAndAfterEach
+    with WskActorSystem
+    with TimingHelpers {
+
+  import KubernetesClientTests.TestKubernetesClient
+  import KubernetesContainerTests._
+
+  override def beforeEach() = {
+    stream.reset()
+  }
+
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  def instantDT(instant: Instant): Instant = Instant.from(instant.atZone(ZoneId.of("GMT+0")))
+
+  val Epoch = Instant.EPOCH
+  val EpochDateTime = instantDT(Epoch)
+
+  /** Transforms chunked JsObjects into formatted strings */
+  val toFormattedString: Flow[ByteString, String, NotUsed] =
+    Flow[ByteString].map(_.utf8String.parseJson.convertTo[TypedLogLine].toString)
+
+  /** Reads logs into memory and awaits them */
+  def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 500.milliseconds): Vector[String] =
+    Await.result(source.via(toFormattedString).runWith(Sink.seq[String]), timeout).toVector
+
+  val containerId = ContainerId("id")
+
+  /**
+   * Constructs a testcontainer with overridden IO methods. Results of the override can be provided
+   * as parameters.
+   */
+  def kubernetesContainer(id: ContainerId = containerId, addr: ContainerAddress = ContainerAddress("ip"))(
+    ccRes: Future[RunResult] =
+      Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))),
+    awaitLogs: FiniteDuration = 2.seconds)(implicit kubernetes: KubernetesApi): KubernetesContainer = {
+
+    new KubernetesContainer(id, addr) {
+      override protected def callContainer(
+        path: String,
+        body: JsObject,
+        timeout: FiniteDuration,
+        retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
+        ccRes
+      }
+      override protected val waitForLogs = awaitLogs
+    }
+  }
+
+  behavior of "KubernetesContainer"
+
+  implicit val transid = TransactionId.testing
+  val parameters = Map(
+    "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+    "--ulimit" -> Set("nofile=1024:1024"),
+    "--pids-limit" -> Set("1024"))
+
+  /*
+   * CONTAINER CREATION
+   */
+  it should "create a new instance" in {
+    implicit val kubernetes = new TestKubernetesClient
+
+    val image = "image"
+    val userProvidedImage = false
+    val environment = Map("test" -> "hi")
+    val labels = Map("invoker" -> "0")
+    val name = "my_Container(1)"
+    val container = KubernetesContainer.create(
+      transid = transid,
+      image = image,
+      userProvidedImage = userProvidedImage,
+      environment = environment,
+      labels = labels,
+      name = name)
+
+    await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 1
+    kubernetes.rms should have size 0
+
+    val (testName, testImage, testArgs) = kubernetes.runs.head
+    testName shouldBe "my-container1"
+    testImage shouldBe image
+    testArgs shouldBe Seq(
+      "--generator",
+      "run-pod/v1",
+      "--restart",
+      "Always",
+      "--limits",
+      "memory=256Mi",
+      "--env",
+      "test=hi",
+      "-l",
+      "invoker=0")
+  }
+
+  it should "pull a user provided image before creating the container" in {
+    implicit val kubernetes = new TestKubernetesClient
+
+    val container =
+      KubernetesContainer.create(transid = transid, name = "name", image = "image", userProvidedImage = true)
+    await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 1
+    kubernetes.rms should have size 0
+  }
+
+  it should "remove the container if inspect fails" in {
+    implicit val kubernetes = new TestKubernetesClient {
+      override def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
+        inspects += id
+        Future.failed(new RuntimeException())
+      }
+    }
+
+    val container = KubernetesContainer.create(transid = transid, name = "name", image = "image")
+    a[WhiskContainerStartupError] should be thrownBy await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 1
+    kubernetes.rms should have size 1
+  }
+
+  it should "provide a proper error if run fails for blackbox containers" in {
+    implicit val kubernetes = new TestKubernetesClient {
+      override def run(name: String, image: String, args: Seq[String])(
+        implicit transid: TransactionId): Future[ContainerId] = {
+        runs += ((name, image, args))
+        Future.failed(ProcessRunningException(1, "", ""))
+      }
+    }
+
+    val container =
+      KubernetesContainer.create(transid = transid, name = "name", image = "image", userProvidedImage = true)
+    a[WhiskContainerStartupError] should be thrownBy await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 0
+    kubernetes.rms should have size 0
+  }
+
+  it should "provide a proper error if inspect fails for blackbox containers" in {
+    implicit val kubernetes = new TestKubernetesClient {
+      override def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
+        inspects += id
+        Future.failed(new RuntimeException())
+      }
+    }
+
+    val container =
+      KubernetesContainer.create(transid = transid, name = "name", image = "image", userProvidedImage = true)
+    a[WhiskContainerStartupError] should be thrownBy await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 1
+    kubernetes.rms should have size 1
+  }
+
+  /*
+   * KUBERNETES COMMANDS
+   */
+  it should "destroy a container via Kubernetes" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val id = ContainerId("id")
+    val container = new KubernetesContainer(id, ContainerAddress("ip"))
+
+    container.destroy()
+
+    (kubernetes.rm(_: ContainerId)(_: TransactionId)).verify(id, transid)
+  }
+
+  /*
+   * INITIALIZE
+   *
+   * Only tests for quite simple cases. Disambiguation of errors is delegated to ActivationResponse
+   * and so are the tests for those.
+   */
+  it should "initialize a container" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val initTimeout = 1.second
+    val interval = intervalOf(1.millisecond)
+    val container = kubernetesContainer() {
+      Future.successful(RunResult(interval, Right(ContainerResponse(true, "", None))))
+    }
+
+    val initInterval = container.initialize(JsObject(), initTimeout)
+    await(initInterval, initTimeout) shouldBe interval
+
+    // assert the starting log is there
+    val start = LogMarker.parse(logLines.head)
+    start.token shouldBe INVOKER_ACTIVATION_INIT
+
+    // assert the end log is there
+    val end = LogMarker.parse(logLines.last)
+    end.token shouldBe INVOKER_ACTIVATION_INIT.asFinish
+    end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
+  }
+
+  it should "properly deal with a timeout during initialization" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val initTimeout = 1.second
+    val interval = intervalOf(initTimeout + 1.nanoseconds)
+
+    val container = kubernetesContainer() {
+      Future.successful(RunResult(interval, Left(Timeout())))
+    }
+
+    val init = container.initialize(JsObject(), initTimeout)
+
+    val error = the[InitializationError] thrownBy await(init, initTimeout)
+    error.interval shouldBe interval
+    error.response.statusCode shouldBe ActivationResponse.ApplicationError
+
+    // assert the finish log is there
+    val end = LogMarker.parse(logLines.last)
+    end.token shouldBe INVOKER_ACTIVATION_INIT.asFinish
+  }
+
+  /*
+   * RUN
+   *
+   * Only tests for quite simple cases. Disambiguation of errors is delegated to ActivationResponse
+   * and so are the tests for those.
+   */
+  it should "run a container" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val interval = intervalOf(1.millisecond)
+    val result = JsObject()
+    val container = kubernetesContainer() {
+      Future.successful(RunResult(interval, Right(ContainerResponse(true, result.compactPrint, None))))
+    }
+
+    val runResult = container.run(JsObject(), JsObject(), 1.second)
+    await(runResult) shouldBe (interval, ActivationResponse.success(Some(result)))
+
+    // assert the starting log is there
+    val start = LogMarker.parse(logLines.head)
+    start.token shouldBe INVOKER_ACTIVATION_RUN
+
+    // assert the end log is there
+    val end = LogMarker.parse(logLines.last)
+    end.token shouldBe INVOKER_ACTIVATION_RUN.asFinish
+    end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
+  }
+
+  it should "properly deal with a timeout during run" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val runTimeout = 1.second
+    val interval = intervalOf(runTimeout + 1.nanoseconds)
+
+    val container = kubernetesContainer() {
+      Future.successful(RunResult(interval, Left(Timeout())))
+    }
+
+    val runResult = container.run(JsObject(), JsObject(), runTimeout)
+    await(runResult) shouldBe (interval, ActivationResponse.applicationError(
+      Messages.timedoutActivation(runTimeout, false)))
+
+    // assert the finish log is there
+    val end = LogMarker.parse(logLines.last)
+    end.token shouldBe INVOKER_ACTIVATION_RUN.asFinish
+  }
+
+  /*
+   * LOGS
+   */
+  it should "read a simple log with sentinel" in {
+    val expectedLogEntry = TypedLogLine(currentTsp, "stdout", "This is a log entry.")
+    val logSrc = logSource(expectedLogEntry, appendSentinel = true)
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((id, sinceTime))
+        logSrc
+      }
+    }
+
+    val container = kubernetesContainer(id = containerId)()
+    // Read with tight limit to verify that no truncation occurs TODO: Need to figure out how to handle this with the Source-based kubernetes logs
+    val processedLogs = awaitLogs(container.logs(limit = 4096.B, waitForSentinel = true))
+
+    kubernetes.logCalls should have size 1
+    val (id, sinceTime) = kubernetes.logCalls(0)
+    id shouldBe containerId
+    sinceTime shouldBe None
+
+    processedLogs should have size 1
+    processedLogs shouldBe Vector(expectedLogEntry.rawString)
+  }
+
+  it should "read a simple log without sentinel" in {
+    val expectedLogEntry = TypedLogLine(currentTsp, "stdout", "This is a log entry.")
+    val logSrc = logSource(expectedLogEntry, appendSentinel = false)
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((id, sinceTime))
+        logSrc
+      }
+    }
+
+    val container = kubernetesContainer(id = containerId)()
+    // Read without tight limit so that the full read result is processed
+    val processedLogs = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = false))
+
+    kubernetes.logCalls should have size 1
+    val (id, sinceTime) = kubernetes.logCalls(0)
+    id shouldBe containerId
+    sinceTime shouldBe None
+
+    processedLogs should have size 1
+    processedLogs shouldBe Vector(expectedLogEntry.rawString)
+  }
+
+  it should "fail log reading if error occurs during file reading" in {
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((containerId, sinceTime))
+        Source.failed(new IOException)
+      }
+    }
+
+    val container = kubernetesContainer()()
+    an[IOException] should be thrownBy awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
+
+    kubernetes.logCalls should have size 1
+    val (id, sinceTime) = kubernetes.logCalls(0)
+    id shouldBe containerId
+    sinceTime shouldBe None
+  }
+
+  it should "read two consecutive logs with sentinel" in {
+    val firstLog = TypedLogLine(Instant.EPOCH, "stdout", "This is the first log.")
+    val secondLog = TypedLogLine(Instant.EPOCH.plusSeconds(1l), "stderr", "This is the second log.")
+    val logSources = mutable.Queue(logSource(firstLog, true), logSource(secondLog, true))
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((id, sinceTime))
+        logSources.dequeue()
+      }
+    }
+
+    val container = kubernetesContainer()()
+    // Read without tight limit so that the full read result is processed
+    val processedFirstLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
+    val processedSecondLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
+
+    kubernetes.logCalls should have size 2
+    val (_, sinceTime1) = kubernetes.logCalls(0)
+    sinceTime1 shouldBe None
+    val (_, sinceTime2) = kubernetes.logCalls(1)
+    sinceTime2 shouldBe Some(EpochDateTime) // second read should start behind the first line
+
+    processedFirstLog should have size 1
+    processedFirstLog shouldBe Vector(firstLog.rawString)
+    processedSecondLog should have size 1
+    processedSecondLog shouldBe Vector(secondLog.rawString)
+
+  }
+
+  it should "eventually terminate even if no sentinels can be found" in {
+    val expectedLog = TypedLogLine(currentTsp, "stdout", s"This is log entry.")
+    val rawLog = toLogs(expectedLog, appendSentinel = false)
+
+    rawLog should have size 1
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((containerId, sinceTime))
+        // "Fakes" an infinite source with only 1 entry
+        Source.tick(0.milliseconds, 10.seconds, rawLog.head)
+      }
+    }
+
+    val waitForLogs = 100.milliseconds
+    val container = kubernetesContainer()(awaitLogs = waitForLogs)
+    // Read without tight limit so that the full read result is processed
+
+    val (interval, processedLog) = durationOf(awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true)))
+
+    interval.toMillis should (be >= waitForLogs.toMillis and be < (waitForLogs * 2).toMillis)
+
+    kubernetes.logCalls should have size 1
+
+    /*    processedLog should have size expectedLog.length
+    processedLog shouldBe expectedLog.map(_.toFormattedString)*/
+  }
+
+  it should "include an incomplete warning if sentinels have not been found only if we wait for sentinels" in {
+    val expectedLogEntry =
+      TypedLogLine(currentTsp, "stdout", "This is a log entry.")
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((containerId, sinceTime))
+        logSource(Seq(expectedLogEntry, expectedLogEntry), appendSentinel = false)
+      }
+    }
+
+    val waitForLogs = 100.milliseconds
+    val container = kubernetesContainer()(awaitLogs = waitForLogs)
+    // Read with tight limit to verify that no truncation occurs
+    val processedLogs = awaitLogs(container.logs(limit = 4096.B, waitForSentinel = true))
+
+    kubernetes.logCalls should have size 1
+    val (id, sinceTime) = kubernetes.logCalls(0)
+    id shouldBe containerId
+    sinceTime shouldBe None
+
+    processedLogs should have size 3
+    processedLogs(0) shouldBe expectedLogEntry.rawString
+    processedLogs(1) shouldBe expectedLogEntry.rawString
+    processedLogs(2) should include(Messages.logFailure)
+
+    val processedLogsFalse = awaitLogs(container.logs(limit = 4096.B, waitForSentinel = false))
+    processedLogsFalse should have size 2
+    processedLogsFalse(0) shouldBe expectedLogEntry.rawString
+    processedLogsFalse(1) shouldBe expectedLogEntry.rawString
+  }
+
+  it should "strip sentinel lines if it waits or doesn't wait for them" in {
+    val expectedLogEntry =
+      TypedLogLine(currentTsp, "stdout", "This is a log entry.")
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(containerId: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((containerId, sinceTime))
+        logSource(expectedLogEntry, appendSentinel = true)
+      }
+    }
+
+    val container = kubernetesContainer(id = containerId)()
+    val processedLogs = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
+    processedLogs should have size 1
+    processedLogs(0) shouldBe expectedLogEntry.rawString
+
+    val processedLogsFalse = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = false))
+    processedLogsFalse should have size 1
+    processedLogsFalse(0) shouldBe expectedLogEntry.rawString
+  }
+
+  def currentTsp: Instant = Instant.now
+
+}
+
+object KubernetesContainerTests {
+
+  def logSource(logLine: TypedLogLine, appendSentinel: Boolean): Source[TypedLogLine, Any] =
+    logSource(Seq(logLine), appendSentinel)
+
+  def logSource(logs: Seq[TypedLogLine], appendSentinel: Boolean): Source[TypedLogLine, Any] =
+    Source(toLogs(logs, appendSentinel).to[immutable.Seq])
+
+  def toLogs(logLine: TypedLogLine, appendSentinel: Boolean): Seq[TypedLogLine] =
+    toLogs(Seq(logLine), appendSentinel)
+
+  def toLogs(log: Seq[TypedLogLine], appendSentinel: Boolean): Seq[TypedLogLine] =
+    if (appendSentinel) {
+      val lastTime = log.lastOption.map { case TypedLogLine(time, _, _) => time }.getOrElse(Instant.EPOCH)
+      log :+
+        TypedLogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}") :+
+        TypedLogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}")
+    } else {
+      log
+    }
+
+  implicit class TypedLogHelper(log: TypedLogLine) {
+    import KubernetesClient.formatK8STimestamp
+
+    def rawString: String = "%s %s: %s".format(formatK8STimestamp(log.time).get.trim, log.stream, log.log)
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.