You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/03/20 02:51:19 UTC

[incubator-openwhisk] branch master updated: Adds a container factor MesosContainerFactory for Mesos. (#2833)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new f68df3e  Adds a container factor MesosContainerFactory for Mesos. (#2833)
f68df3e is described below

commit f68df3eedd6507a666465983d803443432528d5b
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Mon Mar 19 19:51:17 2018 -0700

    Adds a container factor MesosContainerFactory for Mesos. (#2833)
---
 common/scala/build.gradle                          |   2 +
 common/scala/src/main/resources/application.conf   |   8 +
 common/scala/src/main/resources/reference.conf     |   2 +-
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   2 +
 .../whisk/core/mesos/MesosContainerFactory.scala   | 170 +++++++++++++
 .../main/scala/whisk/core/mesos/MesosTask.scala    | 180 +++++++++++++
 docs/mesos.md                                      |  47 ++++
 .../mesos/test/MesosContainerFactoryTest.scala     | 279 +++++++++++++++++++++
 8 files changed, 689 insertions(+), 1 deletion(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 8e0aeed..c907bd0 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -44,6 +44,8 @@ dependencies {
     compile 'io.fabric8:kubernetes-client:2.5.7'
     compile 'io.kamon:kamon-core_2.11:0.6.7'
     compile 'io.kamon:kamon-statsd_2.11:0.6.7'
+    //for mesos
+    compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.4'
 }
 
 tasks.withType(ScalaCompile) {
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 402ae5d..4c36319 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -155,4 +155,12 @@ whisk {
         max = 512 m
         std = 256 m
     }
+
+    mesos {
+        master-url = "http://localhost:5050" //your mesos master
+        master-public-url = "http://localhost:5050" // if mesos-link-log-message == true, this link will be included with the static log message (may or may not be different from master-url)
+        role = "*" //see http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles
+        failover-timeout = 0 seconds  //Timeout allowed for framework to reconnect after disconnection.
+        mesos-link-log-message = true //If true, display a link to mesos in the static log message, otherwise do not include a link to mesos.
+    }
 }
diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 9821b8f..8048d43 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -1,4 +1,4 @@
-whisk.spi{
+whisk.spi {
   ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
   MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
   ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 4c4b4e8..7e61480 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -247,4 +247,6 @@ object ConfigKeys {
 
   val logStore = "whisk.logstore"
   val splunk = s"$logStore.splunk"
+
+  val mesos = "whisk.mesos"
 }
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
new file mode 100644
index 0000000..9d6204b
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.mesos
+
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import com.adobe.api.platform.runtime.mesos.MesosClient
+import com.adobe.api.platform.runtime.mesos.Subscribe
+import com.adobe.api.platform.runtime.mesos.SubscribeComplete
+import com.adobe.api.platform.runtime.mesos.Teardown
+import java.time.Instant
+import pureconfig.loadConfigOrThrow
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration._
+import scala.util.Try
+import whisk.common.Counter
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.ConfigKeys
+import whisk.core.WhiskConfig
+import whisk.core.containerpool.Container
+import whisk.core.containerpool.ContainerArgsConfig
+import whisk.core.containerpool.ContainerFactory
+import whisk.core.containerpool.ContainerFactoryProvider
+import whisk.core.entity.ByteSize
+import whisk.core.entity.ExecManifest
+import whisk.core.entity.InstanceId
+import whisk.core.entity.UUID
+
+/**
+ * Configuration for MesosClient; MesosContainerFactory loads it from ConfigKeys.mesos unless one is passed in.
+ * @param masterUrl The mesos url e.g. http://leader.mesos:5050.
+ * @param masterPublicUrl A public facing mesos url (which may be different than the internal facing url) e.g. http://mymesos:5050; when absent, masterUrl is used in the log message.
+ * @param role The role used by this framework (see http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles).
+ * @param failoverTimeout Timeout allowed for framework to reconnect after disconnection.
+ * @param mesosLinkLogMessage If true, display a link to mesos in the static log message, otherwise do not include a link to mesos.
+ */
+case class MesosConfig(masterUrl: String,
+                       masterPublicUrl: Option[String],
+                       role: String,
+                       failoverTimeout: FiniteDuration,
+                       mesosLinkLogMessage: Boolean)
+
+class MesosContainerFactory(config: WhiskConfig,
+                            actorSystem: ActorSystem,
+                            logging: Logging,
+                            parameters: Map[String, Set[String]],
+                            containerArgs: ContainerArgsConfig =
+                              loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs),
+                            mesosConfig: MesosConfig = loadConfigOrThrow[MesosConfig](ConfigKeys.mesos),
+                            clientFactory: (ActorSystem, MesosConfig) => ActorRef = MesosContainerFactory.createClient,
+                            taskIdGenerator: () => String = MesosContainerFactory.taskIdGenerator)
+    extends ContainerFactory {
+
+  val subscribeTimeout = 10.seconds
+  val teardownTimeout = 30.seconds
+
+  implicit val as: ActorSystem = actorSystem
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+
+  /** Inits Mesos framework. */
+  val mesosClientActor = clientFactory(as, mesosConfig)
+
+  subscribe()
+
+  /** Subscribes Mesos actor to mesos event stream; retries on any failure, including an ask timeout (which should be unusual). */
+  private def subscribe(): Future[Unit] = {
+    logging.info(this, s"subscribing to Mesos master at ${mesosConfig.masterUrl}")
+    mesosClientActor
+      .ask(Subscribe)(subscribeTimeout)
+      .mapTo[SubscribeComplete] // a reply of any other type fails the future and triggers a retry
+      .map(complete => logging.info(this, s"subscribe completed successfully... $complete"))
+      .recoverWith {
+        case e =>
+          logging.error(this, s"subscribe failed... $e")
+          subscribe() // retried immediately; there is no back-off and no retry limit
+      }
+  }
+
+  override def createContainer(tid: TransactionId,
+                               name: String,
+                               actionImage: ExecManifest.ImageName,
+                               userProvidedImage: Boolean,
+                               memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+    implicit val transid = tid // config and logging below are the implicit arguments, which shadow the constructor parameters
+    val image = if (userProvidedImage) { // user-provided images are referenced by their public name
+      actionImage.publicImageName
+    } else { // all other images are resolved against the configured registry, prefix and tag
+      actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
+    }
+
+    logging.info(this, s"using Mesos to create a container with image $image...")
+    MesosTask.create(
+      mesosClientActor,
+      mesosConfig,
+      taskIdGenerator,
+      tid,
+      image = image,
+      userProvidedImage = userProvidedImage,
+      memory = memory,
+      cpuShares = config.invokerCoreShare.toInt,
+      environment = Map("__OW_API_HOST" -> config.wskApiHost),
+      network = containerArgs.network,
+      dnsServers = containerArgs.dnsServers,
+      name = Some(name),
+      // strip a leading "--" from parameter names (should make this consistent everywhere else)
+      parameters
+        .map({ case (k, v) => if (k.startsWith("--")) (k.replaceFirst("--", ""), v) else (k, v) })
+        ++ containerArgs.extraArgs) // on a key clash the extraArgs entry replaces the parameter's value set
+  }
+
+  override def init(): Unit = () // nothing to do here; the subscription is started by the constructor
+
+  /** Cleans up any remaining Containers by tearing down the framework; blocks until complete; should ONLY be run at shutdown. */
+  override def cleanup(): Unit = {
+    val complete: Future[Any] = mesosClientActor.ask(Teardown)(teardownTimeout)
+    Try(Await.result(complete, teardownTimeout)) // blocks the caller for at most teardownTimeout
+      .map(_ => logging.info(this, "Mesos framework teardown completed."))
+      .recover { // failures are only logged, never rethrown
+        case _: TimeoutException => logging.error(this, "Mesos framework teardown took too long.")
+        case t: Throwable =>
+          logging.error(this, s"Mesos framework teardown failed : $t")
+      }
+  }
+}
+object MesosContainerFactory { // default client factory and task id generator for the MesosContainerFactory constructor
+  private def createClient(actorSystem: ActorSystem, mesosConfig: MesosConfig): ActorRef =
+    actorSystem.actorOf(
+      MesosClient
+        .props(
+          "whisk-containerfactory-" + UUID(), // presumably the framework id — confirm against MesosClient.props
+          "whisk-containerfactory-framework", // presumably the framework name — confirm against MesosClient.props
+          mesosConfig.masterUrl,
+          mesosConfig.role,
+          mesosConfig.failoverTimeout))
+
+  val counter = new Counter()
+  val startTime = Instant.now.getEpochSecond // epoch second at which this object was initialized
+  private def taskIdGenerator(): String = { // id has the form whisk-<next counter value>-<startTime>
+    s"whisk-${counter.next()}-${startTime}"
+  }
+}
+
+object MesosContainerFactoryProvider extends ContainerFactoryProvider { // provider that builds a MesosContainerFactory
+  override def getContainerFactory(actorSystem: ActorSystem,
+                                   logging: Logging,
+                                   config: WhiskConfig,
+                                   instance: InstanceId, // not used by this provider
+                                   parameters: Map[String, Set[String]]): ContainerFactory =
+    new MesosContainerFactory(config, actorSystem, logging, parameters) // remaining constructor arguments take their defaults
+}
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
new file mode 100644
index 0000000..1d6ad86
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.mesos
+
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+import akka.util.Timeout
+import com.adobe.api.platform.runtime.mesos.Bridge
+import com.adobe.api.platform.runtime.mesos.DeleteTask
+import com.adobe.api.platform.runtime.mesos.Host
+import com.adobe.api.platform.runtime.mesos.Running
+import com.adobe.api.platform.runtime.mesos.SubmitTask
+import com.adobe.api.platform.runtime.mesos.TaskDef
+import com.adobe.api.platform.runtime.mesos.User
+import java.time.Instant
+import org.apache.mesos.v1.Protos.TaskState
+import org.apache.mesos.v1.Protos.TaskStatus
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import spray.json._
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.containerpool.ContainerAddress
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.logging.LogLine
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size._
+
+/**
+ * MesosTask implementation of Container.
+ * Differences from DockerContainer include:
+ * - does not launch container using docker cli, but rather a Mesos framework
+ * - does not support pause/resume
+ * - does not support log collection (currently), but does provide a message indicating logs can be viewed via Mesos UI
+ * (external log collection and retrieval must be enabled via LogStore SPI to expose logs to wsk cli)
+ */
+case object Environment
+case class CreateContainer(image: String, memory: String, cpuShare: String)
+
+object MesosTask {
+  val taskLaunchTimeout = Timeout(45 seconds) // maximum wait for the Running reply to SubmitTask
+  val taskDeleteTimeout = Timeout(30 seconds) // maximum wait for the TaskStatus reply to DeleteTask
+
+  def create(mesosClientActor: ActorRef, // submits a TaskDef to this actor and wraps the Running reply in a MesosTask
+             mesosConfig: MesosConfig,
+             taskIdGenerator: () => String,
+             transid: TransactionId,
+             image: String,
+             userProvidedImage: Boolean = false, // not read in this method
+             memory: ByteSize = 256.MB,
+             cpuShares: Int = 0,
+             environment: Map[String, String] = Map(),
+             network: String = "bridge",
+             dnsServers: Seq[String] = Seq(),
+             name: Option[String] = None,
+             parameters: Map[String, Set[String]] = Map())(implicit ec: ExecutionContext,
+                                                           log: Logging,
+                                                           as: ActorSystem): Future[Container] = {
+    implicit val tid = transid
+
+    log.info(this, s"creating task for image $image...")
+
+    val mesosCpuShares = cpuShares / 1024.0 // convert openwhisk (docker based) shares to mesos (cpu percentage)
+    val mesosRam = memory.toMB.toInt
+
+    val taskId = taskIdGenerator()
+    val lowerNetwork = network.toLowerCase // match bridge+host without case, but retain case for user specified network
+    val taskNetwork = lowerNetwork match {
+      case "bridge" => Bridge
+      case "host"   => Host
+      case _        => User(network)
+    }
+    val dnsOrEmpty = if (dnsServers.nonEmpty) Map("dns" -> dnsServers.toSet) else Map.empty[String, Set[String]]
+
+    val task = new TaskDef(
+      taskId,
+      name.getOrElse(image), // task name either the indicated name, or else the image name
+      image,
+      mesosCpuShares,
+      mesosRam,
+      List(8080), // all action containers listen on 8080
+      Some(0), // port at index 0 used for health
+      false,
+      taskNetwork,
+      dnsOrEmpty ++ parameters, // on a key clash the entry from parameters wins
+      environment)
+
+    val launched: Future[Running] = // fails if no Running reply arrives within taskLaunchTimeout
+      mesosClientActor.ask(SubmitTask(task))(taskLaunchTimeout).mapTo[Running]
+
+    launched.map(taskDetails => {
+      val taskHost = taskDetails.hostname
+      val taskPort = taskDetails.hostports(0) // first reported host port; presumably the one mapped to 8080 — confirm
+      log.info(this, s"launched task with state ${taskDetails.taskStatus.getState} at ${taskHost}:${taskPort}")
+      val containerIp = new ContainerAddress(taskHost, taskPort)
+      val containerId = new ContainerId(taskId)
+      new MesosTask(containerId, containerIp, ec, log, taskId, mesosClientActor, mesosConfig)
+    })
+
+  }
+
+}
+
+object JsonFormatters extends DefaultJsonProtocol { // spray-json format for the CreateContainer message
+  implicit val createContainerJson: RootJsonFormat[CreateContainer] = jsonFormat3(CreateContainer)
+}
+
+class MesosTask(override protected val id: ContainerId,
+                override protected val addr: ContainerAddress,
+                override protected val ec: ExecutionContext,
+                override protected val logging: Logging,
+                taskId: String,
+                mesosClientActor: ActorRef,
+                mesosConfig: MesosConfig)
+    extends Container {
+
+  /** Stops the container from consuming CPU cycles; not supported for Mesos tasks, so this completes immediately without doing anything. */
+  override def suspend()(implicit transid: TransactionId): Future[Unit] = {
+    // suspend not supported
+    Future.successful(())
+  }
+
+  /** Dual of suspend; not supported for Mesos tasks, so this completes immediately without doing anything. */
+  override def resume()(implicit transid: TransactionId): Future[Unit] = {
+    // resume not supported
+    Future.successful(())
+  }
+
+  /** Completely destroys this instance of the container by asking the Mesos client actor to delete the task. */
+  override def destroy()(implicit transid: TransactionId): Future[Unit] = {
+    mesosClientActor
+      .ask(DeleteTask(taskId))(MesosTask.taskDeleteTimeout) // the future fails if no reply arrives within the timeout
+      .mapTo[TaskStatus] // the future fails if the reply is not a TaskStatus
+      .map(taskStatus => {
+        // verify that task ended in TASK_KILLED state (but don't fail if it didn't...)
+        if (taskStatus.getState != TaskState.TASK_KILLED) {
+          logging.error(this, s"task kill resulted in unexpected state ${taskStatus.getState}")
+        } else {
+          logging.info(this, s"task killed ended with state ${taskStatus.getState}")
+        }
+      })(ec) // the callback runs on the ExecutionContext this container was constructed with
+  }
+
+  /**
+   * Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear.
+   * For Mesos, both arguments are ignored: the log message is static per container, just indicating that Mesos logs can be found via the Mesos UI.
+   * To disable this message, and just store a static log message per activation, set
+   *     whisk.mesos.mesos-link-log-message = false
+   */
+  private val linkedLogMsg =
+    s"Logs are not collected from Mesos containers currently. " +
+      s"You can browse the logs for Mesos Task ID $taskId using the mesos UI at ${mesosConfig.masterPublicUrl
+        .getOrElse(mesosConfig.masterUrl)}"
+  private val noLinkLogMsg = "Log collection is not configured correctly, check with your service administrator."
+  private val logMsg = if (mesosConfig.mesosLinkLogMessage) linkedLogMsg else noLinkLogMsg
+  override def logs(limit: ByteSize, waitForSentinel: Boolean)(
+    implicit transid: TransactionId): Source[ByteString, Any] = // emits one JSON-encoded LogLine on "stdout", stamped with the current time
+    Source.single(ByteString(LogLine(logMsg, "stdout", Instant.now.toString).toJson.compactPrint))
+}
diff --git a/docs/mesos.md b/docs/mesos.md
new file mode 100644
index 0000000..bf69508
--- /dev/null
+++ b/docs/mesos.md
@@ -0,0 +1,47 @@
+# Mesos Support
+
+The `MesosContainerFactory` enables launching action containers within a Mesos cluster. It does not affect the deployment of OpenWhisk components (invoker, controller).
+
+## Enable
+
+To enable MesosContainerFactory, use the following TypeSafe Config properties
+
+| property | required | details | example |
+| --- | --- | --- | --- |
+| `whisk.spi.ContainerFactoryProvider` | required | enable the MesosContainerFactory | whisk.core.mesos.MesosContainerFactoryProvider |
+| `whisk.mesos.master-url` | required | Mesos master HTTP endpoint to be accessed from the invoker for framework subscription | http://192.168.99.100:5050 |
+| `whisk.mesos.master-public-url` | optional (default to whisk.mesos.master-url) | public facing Mesos master HTTP endpoint for exposing logs to cli users | http://192.168.99.100:5050 |
+| `whisk.mesos.role` | optional (default *) | Mesos framework role| any string e.g. `openwhisk` |
+| `whisk.mesos.failover-timeout` | optional (default 0 seconds) | how long to wait for the framework to reconnect with the same id before tasks are terminated, given as a duration e.g. `30 seconds` | see http://mesos.apache.org/documentation/latest/high-availability-framework-guide/ |
+| `whisk.mesos.mesos-link-log-message` | optional (default true) | display a log message with a link to Mesos when using the default LogStore (otherwise a generic message without a link is shown) | Since logs are not available for invoker to collect from Mesos in general, you can either use an alternate LogStore or direct users to the Mesos ui |
+
+To set these properties for your invoker, set the corresponding environment variables e.g.,
+```properties
+CONFIG_whisk_spi_ContainerFactoryProvider=whisk.core.mesos.MesosContainerFactoryProvider
+CONFIG_whisk_mesos_masterUrl=http://192.168.99.100:5050
+```
+
+## Known Issues
+
+* Logs are not collected from action containers.
+
+  For now, the Mesos public URL will be included in the logs retrieved via the wsk CLI. Once log retrieval from external sources is enabled, logs from mesos containers would have to be routed to the external source, and then retrieved from that source.
+ 
+* No HA or failover support (single invoker per cluster).
+  
+  Currently the version of `mesos-actor` in use does not support HA or failover. Failover support is planned to be provided by:
+  
+  * multiple invokers running in an Akka cluster
+  * the Mesos framework actor is a singleton within the cluster
+  * the Mesos framework actor is available from the other invoker nodes
+  * if the node that contains the Mesos framework actor fails:
+     * the actor will be recreated on a separate invoker node
+     * the actor will resubscribe to mesos scheduler API with the same ID
+     * the tasks that were previously launched by the actor will be reconciled
+     * normal operation resumes
+     
+     
+  
+
+
+
diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
new file mode 100644
index 0000000..deb23ff
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.mesos.test
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.Sink
+import akka.testkit.TestKit
+import akka.testkit.TestProbe
+import com.adobe.api.platform.runtime.mesos.Bridge
+import com.adobe.api.platform.runtime.mesos.DeleteTask
+import com.adobe.api.platform.runtime.mesos.Running
+import com.adobe.api.platform.runtime.mesos.SubmitTask
+import com.adobe.api.platform.runtime.mesos.Subscribe
+import com.adobe.api.platform.runtime.mesos.SubscribeComplete
+import com.adobe.api.platform.runtime.mesos.TaskDef
+import com.adobe.api.platform.runtime.mesos.User
+import common.StreamLogging
+import org.apache.mesos.v1.Protos.AgentID
+import org.apache.mesos.v1.Protos.TaskID
+import org.apache.mesos.v1.Protos.TaskInfo
+import org.apache.mesos.v1.Protos.TaskState
+import org.apache.mesos.v1.Protos.TaskStatus
+import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.FlatSpecLike
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+import scala.collection.immutable.Map
+import scala.concurrent.Await
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import whisk.common.TransactionId
+import whisk.core.WhiskConfig
+import whisk.core.WhiskConfig._
+import whisk.core.containerpool.ContainerArgsConfig
+import whisk.core.containerpool.logging.DockerToActivationLogStore
+import whisk.core.entity.ExecManifest.ImageName
+import whisk.core.entity.size._
+import whisk.core.mesos.MesosConfig
+import whisk.core.mesos.MesosContainerFactory
+@RunWith(classOf[JUnitRunner])
+class MesosContainerFactoryTest
+    extends TestKit(ActorSystem("MesosActorSystem"))
+    with FlatSpecLike
+    with Matchers
+    with StreamLogging
+    with BeforeAndAfterEach {
+
+  /** Awaits the given future, throws the exception enclosed in Failure. */
+  def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result[A](f, timeout)
+
+  implicit val wskConfig =
+    new WhiskConfig(Map(invokerCoreShare -> "2", dockerImageTag -> "latest", wskApiHostname -> "apihost") ++ wskApiHost)
+  var count = 0
+  var lastTaskId: String = null
+  def testTaskId() = {
+    count += 1
+    lastTaskId = "testTaskId" + count
+    lastTaskId
+  }
+
+  //TODO: adjust this once the invokerCoreShare issue is fixed see #3110
+  def cpus() = wskConfig.invokerCoreShare.toInt / 1024.0 //
+
+  val containerArgsConfig =
+    new ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4")))
+
+  override def beforeEach() = {
+    stream.reset()
+  }
+  behavior of "MesosContainerFactory"
+
+  it should "send Subscribe on init" in {
+    val wskConfig = new WhiskConfig(Map())
+    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true)
+    new MesosContainerFactory(
+      wskConfig,
+      system,
+      logging,
+      Map("--arg1" -> Set("v1", "v2")),
+      containerArgsConfig,
+      mesosConfig,
+      (system, mesosConfig) => testActor)
+
+    expectMsg(Subscribe)
+  }
+
+  it should "send SubmitTask on create" in {
+    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true)
+
+    val factory =
+      new MesosContainerFactory(
+        wskConfig,
+        system,
+        logging,
+        Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
+        containerArgsConfig,
+        mesosConfig,
+        (_, _) => testActor,
+        testTaskId)
+
+    expectMsg(Subscribe)
+    factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB)
+
+    expectMsg(
+      SubmitTask(TaskDef(
+        lastTaskId,
+        "mesosContainer",
+        "fakeImage:" + wskConfig.dockerImageTag,
+        cpus,
+        1,
+        List(8080),
+        Some(0),
+        false,
+        User("net1"),
+        Map(
+          "arg1" -> Set("v1", "v2"),
+          "arg2" -> Set("v3", "v4"),
+          "other" -> Set("v5", "v6"),
+          "dns" -> Set("dns1", "dns2"),
+          "extra1" -> Set("e1", "e2"),
+          "extra2" -> Set("e3", "e4")),
+        Map("__OW_API_HOST" -> wskConfig.wskApiHost))))
+  }
+
+  it should "send DeleteTask on destroy" in {
+    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true)
+
+    val probe = TestProbe()
+    val factory =
+      new MesosContainerFactory(
+        wskConfig,
+        system,
+        logging,
+        Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
+        containerArgsConfig,
+        mesosConfig,
+        (system, mesosConfig) => probe.testActor,
+        testTaskId)
+
+    probe.expectMsg(Subscribe)
+    //emulate successful subscribe
+    probe.reply(new SubscribeComplete)
+
+    //create the container
+    val c = factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB)
+    probe.expectMsg(
+      SubmitTask(TaskDef(
+        lastTaskId,
+        "mesosContainer",
+        "fakeImage:" + wskConfig.dockerImageTag,
+        cpus,
+        1,
+        List(8080),
+        Some(0),
+        false,
+        User("net1"),
+        Map(
+          "arg1" -> Set("v1", "v2"),
+          "arg2" -> Set("v3", "v4"),
+          "other" -> Set("v5", "v6"),
+          "dns" -> Set("dns1", "dns2"),
+          "extra1" -> Set("e1", "e2"),
+          "extra2" -> Set("e3", "e4")),
+        Map("__OW_API_HOST" -> wskConfig.wskApiHost))))
+
+    //emulate successful task launch
+    val taskId = TaskID.newBuilder().setValue(lastTaskId)
+
+    probe.reply(
+      Running(
+        TaskInfo
+          .newBuilder()
+          .setName("testTask")
+          .setTaskId(taskId)
+          .setAgentId(AgentID.newBuilder().setValue("testAgentID"))
+          .build(),
+        TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).build(),
+        "agenthost",
+        Seq(30000)))
+    //wait for container
+    val container = await(c)
+
+    //destroy the container
+    implicit val tid = TransactionId.testing
+    val deleted = container.destroy()
+    probe.expectMsg(DeleteTask(lastTaskId))
+
+    probe.reply(TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_KILLED).build())
+
+    await(deleted)
+
+  }
+
+  it should "return static message for logs" in {
+    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true)
+
+    val probe = TestProbe()
+    val factory =
+      new MesosContainerFactory(
+        wskConfig,
+        system,
+        logging,
+        Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
+        new ContainerArgsConfig("bridge", Seq(), Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))),
+        mesosConfig,
+        (system, mesosConfig) => probe.testActor,
+        testTaskId)
+
+    probe.expectMsg(Subscribe)
+    //emulate successful subscribe
+    probe.reply(new SubscribeComplete)
+
+    //create the container
+    val c = factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB)
+
+    probe.expectMsg(
+      SubmitTask(TaskDef(
+        lastTaskId,
+        "mesosContainer",
+        "fakeImage:" + wskConfig.dockerImageTag,
+        cpus,
+        1,
+        List(8080),
+        Some(0),
+        false,
+        Bridge,
+        Map(
+          "arg1" -> Set("v1", "v2"),
+          "arg2" -> Set("v3", "v4"),
+          "other" -> Set("v5", "v6"),
+          "extra1" -> Set("e1", "e2"),
+          "extra2" -> Set("e3", "e4")),
+        Map("__OW_API_HOST" -> wskConfig.wskApiHost))))
+
+    //emulate successful task launch
+    val taskId = TaskID.newBuilder().setValue(lastTaskId)
+
+    probe.reply(
+      Running(
+        TaskInfo
+          .newBuilder()
+          .setName("testTask")
+          .setTaskId(taskId)
+          .setAgentId(AgentID.newBuilder().setValue("testAgentID"))
+          .build(),
+        TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).build(),
+        "agenthost",
+        Seq(30000)))
+    //wait for container
+    val container = await(c)
+
+    implicit val tid = TransactionId.testing
+    implicit val m = ActorMaterializer()
+    val logs = container
+      .logs(1.MB, false)
+      .via(DockerToActivationLogStore.toFormattedString)
+      .runWith(Sink.seq)
+    await(logs)(0) should endWith( // parenthesised so the expected text is the matcher argument, not a discarded statement
+      s" stdout: Logs are not collected from Mesos containers currently. You can browse the logs for Mesos Task ID $lastTaskId using the mesos UI at http://master:5050")
+
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
rabbah@apache.org.