Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/04/19 03:54:51 UTC

[GitHub] tysonnorris closed pull request #3497: allow singleton (active/passive) and replicated pools in invoker

URL: https://github.com/apache/incubator-openwhisk/pull/3497

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index c907bd05bb..e265530adf 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -45,7 +45,7 @@ dependencies {
     compile 'io.kamon:kamon-core_2.11:0.6.7'
     compile 'io.kamon:kamon-statsd_2.11:0.6.7'
     //for mesos
-    compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.4'
+    compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.7'
 }
 
 tasks.withType(ScalaCompile) {
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 4c36319523..0d727663ec 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -160,7 +160,11 @@ whisk {
         master-url = "http://localhost:5050" //your mesos master
         master-public-url = "http://localhost:5050" // if mesos-link-log-message == true, this link will be included with the static log message (may or may not be different from master-url)
         role = "*" //see http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles
-        failover-timeout = 0 seconds  //Timeout allowed for framework to reconnect after disconnection.
+        failover-timeout = 30 seconds  //Timeout allowed for framework to reconnect after disconnection.
         mesos-link-log-message = true //If true, display a link to mesos in the static log message, otherwise do not include a link to mesos.
+        constraints = [] //placement constraint strings to use for managed containers e.g. ["att1 LIKE v1", "att2 UNLIKE v2"]
+        blackbox-constraints = [] //placement constraints to use for blackbox containers
+        constraint-delimiter = " "//used to parse constraint strings
+        teardown-on-exit = false //set to true to tear down the mesos framework on system exit; set to false for HA deployments
     }
 }
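
For illustration only (a sketch, not taken from the PR): with the default single-space constraint-delimiter shown above, a constraint string such as "att1 LIKE v1" is expected to split into three parts; the actual parsing is added to MesosContainerFactory later in this diff.

    // illustrative sketch, mirroring the config comment above
    val delimiter = " "                          // value of constraint-delimiter
    val parts = "att1 LIKE v1".split(delimiter)  // -> Array("att1", "LIKE", "v1")
    // parts(0): agent attribute name, parts(1): operator (LIKE or UNLIKE), parts(2): attribute value regex
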
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 20d56356bf..7687100627 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -39,12 +39,13 @@ case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int
 class KafkaConsumerConnector(
   kafkahost: String,
   groupid: String,
-  topic: String,
+  val topic: String,
   override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging, actorSystem: ActorSystem)
     extends MessageConsumer {
 
   implicit val ec: ExecutionContext = actorSystem.dispatcher
   private val gracefulWaitTime = 100.milliseconds
+  @volatile private var closing = false //to close, set closing=true, then close will happen after peek
 
   // The consumer is generally configured via getProps. This configuration only loads values necessary for "outer"
   // logic, like the wakeup timer.
@@ -62,7 +63,6 @@ class KafkaConsumerConnector(
    */
   override def peek(duration: FiniteDuration = 500.milliseconds,
                     retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])] = {
-
     // poll can be infinitely blocked in edge-cases, so we need to wakeup explicitly.
     val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
 
@@ -74,42 +74,60 @@ class KafkaConsumerConnector(
       response
     } catch {
       // Happens if the peek hangs.
-      case _: WakeupException if retry > 0 =>
+      case _: WakeupException if !closing && retry > 0 =>
         logging.error(this, s"poll timeout occurred. Retrying $retry more times.")
         Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway
         peek(duration, retry - 1)
-      case e: RetriableException if retry > 0 =>
+      case e: RetriableException if !closing && retry > 0 =>
         logging.error(this, s"$e: Retrying $retry more times")
         wakeUpTask.cancel()
         Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway
         peek(duration, retry - 1)
+      case _ if (closing) =>
+        wakeUpTask.cancel()
+        logging.info(this, s"closing ${topic}")
+        Seq.empty
       // Every other error results in a restart of the consumer
       case e: Throwable =>
         recreateConsumer()
         throw e
-    } finally wakeUpTask.cancel()
+    } finally {
+      wakeUpTask.cancel()
+      if (closing) consumer.close()
+
+    }
   }
 
   /**
    * Commits offsets from last poll.
    */
   def commit(retry: Int = 3): Unit =
-    try {
-      consumer.commitSync()
-    } catch {
-      case e: RetriableException =>
-        if (retry > 0) {
-          logging.error(this, s"$e: retrying $retry more times")
-          Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `commitSync` is blocking anyway
-          commit(retry - 1)
-        } else {
-          throw e
-        }
+    if (closing) {
+      logging.info(this, "skipping commit because close in progress...")
+    } else {
+      try {
+        consumer.commitSync()
+      } catch {
+        case e: RetriableException =>
+          if (retry > 0) {
+            logging.error(this, s"$e: retrying $retry more times")
+            Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `commitSync` is blocking anyway
+            commit(retry - 1)
+          } else {
+            throw e
+          }
+        case _ if (closing) =>
+          logging.info(this, s"closing ${topic}")
+      } finally {
+        if (closing) consumer.close()
+      }
     }
 
   override def close(): Unit = {
-    consumer.close()
-    logging.info(this, s"closing '$topic' consumer")
+    //calling close immediately will fail because the consumer doesn't allow multithreaded access
+    closing = true
+    logging.info(this, s"closing '$topic' consumer after wakeup")
+    consumer.wakeup()
   }
 
   private def getProps: Properties = {
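
The rewritten close() above only sets the closing flag and calls wakeup(); the consumer itself is closed from the polling thread (in peek or commit), because the Kafka consumer does not allow multi-threaded access. A minimal sketch of that handshake, with illustrative names and a single polling thread assumed:

    import org.apache.kafka.clients.consumer.KafkaConsumer

    class CloseAfterWakeup[K, V](consumer: KafkaConsumer[K, V]) {
      @volatile private var closing = false

      // may be called from any thread: request the close and wake up a blocked poll()
      def requestClose(): Unit = { closing = true; consumer.wakeup() }

      // called only from the polling thread, after poll() returns or throws WakeupException
      def closeIfRequested(): Unit = if (closing) consumer.close()
    }
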
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala b/common/scala/src/main/scala/whisk/core/SeedNodesProvider.scala
similarity index 97%
rename from core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
rename to common/scala/src/main/scala/whisk/core/SeedNodesProvider.scala
index be630a4234..3e96e51838 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/SeedNodesProvider.scala
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package whisk.core.loadBalancer
+package whisk.core
 
 import akka.actor.Address
-
 import scala.collection.immutable.Seq
 
 trait SeedNodesProvider {
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7e6148038d..4a3ce023e5 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -87,6 +87,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val actionInvokeSystemOverloadLimit = this(WhiskConfig.actionInvokeSystemOverloadLimit)
   val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
   val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
+  val seedNodes = this(WhiskConfig.seedNodes)
   val controllerLocalBookkeeping = getAsBoolean(WhiskConfig.controllerLocalBookkeeping, false)
 }
 
@@ -209,6 +210,7 @@ object WhiskConfig {
   val actionInvokeSystemOverloadLimit = "limits.actions.invokes.concurrentInSystem"
   val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
   val controllerSeedNodes = "akka.cluster.seed.nodes"
+  val seedNodes = "akka.cluster.seed.nodes"
   val controllerLocalBookkeeping = "controller.localBookkeeping"
 }
 
@@ -237,6 +239,7 @@ object ConfigKeys {
   val runcTimeouts = s"$runc.timeouts"
   val containerFactory = "whisk.container-factory"
   val containerArgs = s"$containerFactory.container-args"
+  val containerPool = "whisk.container-pool"
   val blacklist = "whisk.blacklist"
 
   val kubernetes = "whisk.kubernetes"
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 14af69e4a9..c2eae7d726 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -171,7 +171,10 @@ class MessageFeed(description: String,
   initialize()
 
   private implicit val ec = context.system.dispatcher
-
+  override def postStop(): Unit = {
+    //close consumer as soon as this actor stops, to allow quick failover to another consumer
+    consumer.close()
+  }
   private def fillPipeline(): Unit = {
     if (outstandingMessages.size <= pipelineFillThreshold) {
       Future {
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 49c692b086..50b1c93bd6 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -56,8 +56,8 @@ case class ContainerAddress(val host: String, val port: Int = 8080) {
 
 trait Container {
 
-  protected val id: ContainerId
-  protected val addr: ContainerAddress
+  val id: ContainerId
+  val addr: ContainerAddress
   protected implicit val logging: Logging
   protected implicit val ec: ExecutionContext
 
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
index 17860c015c..cd65f1fd60 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -46,6 +46,13 @@ trait ContainerFactory {
   /** perform any initialization */
   def init(): Unit
 
+  def attach(id: ContainerId,
+             ip: ContainerAddress,
+             tid: TransactionId,
+             actionImage: ExecManifest.ImageName,
+             userProvidedImage: Boolean,
+             memory: ByteSize): Future[Container]
+
   /** cleanup any remaining Containers; should block until complete; should ONLY be run at startup/shutdown */
   def cleanup(): Unit
 }
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
index 9d6204b00c..c51679721c 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
@@ -19,11 +19,21 @@ package whisk.core.mesos
 
 import akka.actor.ActorRef
 import akka.actor.ActorSystem
+import akka.actor.PoisonPill
+import akka.cluster.Cluster
+import akka.cluster.singleton.ClusterSingletonManager
+import akka.cluster.singleton.ClusterSingletonManagerSettings
+import akka.cluster.singleton.ClusterSingletonProxy
+import akka.cluster.singleton.ClusterSingletonProxySettings
 import akka.pattern.ask
+import com.adobe.api.platform.runtime.mesos.Constraint
+import com.adobe.api.platform.runtime.mesos.DistributedDataTaskStore
+import com.adobe.api.platform.runtime.mesos.LIKE
 import com.adobe.api.platform.runtime.mesos.MesosClient
 import com.adobe.api.platform.runtime.mesos.Subscribe
 import com.adobe.api.platform.runtime.mesos.SubscribeComplete
 import com.adobe.api.platform.runtime.mesos.Teardown
+import com.adobe.api.platform.runtime.mesos.UNLIKE
 import java.time.Instant
 import pureconfig.loadConfigOrThrow
 import scala.concurrent.Await
@@ -32,15 +42,19 @@ import scala.concurrent.Future
 import scala.concurrent.TimeoutException
 import scala.concurrent.duration._
 import scala.util.Try
+import whisk.common.AkkaLogging
 import whisk.common.Counter
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.ConfigKeys
+import whisk.core.StaticSeedNodesProvider
 import whisk.core.WhiskConfig
 import whisk.core.containerpool.Container
+import whisk.core.containerpool.ContainerAddress
 import whisk.core.containerpool.ContainerArgsConfig
 import whisk.core.containerpool.ContainerFactory
 import whisk.core.containerpool.ContainerFactoryProvider
+import whisk.core.containerpool.ContainerId
 import whisk.core.entity.ByteSize
 import whisk.core.entity.ExecManifest
 import whisk.core.entity.InstanceId
@@ -53,12 +67,21 @@ import whisk.core.entity.UUID
  * @param role The role used by this framework (see http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles).
  * @param failoverTimeout Timeout allowed for framework to reconnect after disconnection.
  * @param mesosLinkLogMessage If true, display a link to mesos in the static log message, otherwise do not include a link to mesos.
+ * @param constraints Cluster placement constraints for non-blackbox containers; supports Strings in the form "<agent attribute name><delimiter>LIKE|UNLIKE<delimiter><attribute value regex>"
+ * @param constraintDelimiter Delimiter for constraint strings.
+ * @param blackboxConstraints Cluster placement constraints for blackbox containers; supports Strings in the form "<agent attribute name><delimiter>LIKE|UNLIKE<delimiter><attribute value regex>"
+ * @param teardownOnExit On system exit should the framework be removed? (if so, failover will not be possible; so typically should be false in HA deployment)
+ *
  */
 case class MesosConfig(masterUrl: String,
                        masterPublicUrl: Option[String],
                        role: String,
                        failoverTimeout: FiniteDuration,
-                       mesosLinkLogMessage: Boolean)
+                       mesosLinkLogMessage: Boolean,
+                       constraints: Seq[String],
+                       constraintDelimiter: String,
+                       blackboxConstraints: Seq[String],
+                       teardownOnExit: Boolean) {}
 
 class MesosContainerFactory(config: WhiskConfig,
                             actorSystem: ActorSystem,
@@ -74,10 +97,16 @@ class MesosContainerFactory(config: WhiskConfig,
   val subscribeTimeout = 10.seconds
   val teardownTimeout = 30.seconds
 
+  /** Inits Mesos framework. */
   implicit val as: ActorSystem = actorSystem
   implicit val ec: ExecutionContext = actorSystem.dispatcher
 
-  /** Inits Mesos framework. */
+  implicit val cluster = Cluster(as)
+
+  val seedNodesProvider = new StaticSeedNodesProvider(config.seedNodes, as.name)
+  logging.info(this, s"joining cluster seed nodes ${seedNodesProvider.getSeedNodes()}")
+  cluster.joinSeedNodes(seedNodesProvider.getSeedNodes())
+
   val mesosClientActor = clientFactory(as, mesosConfig)
 
   subscribe()
@@ -88,7 +117,11 @@ class MesosContainerFactory(config: WhiskConfig,
     mesosClientActor
       .ask(Subscribe)(subscribeTimeout)
       .mapTo[SubscribeComplete]
-      .map(complete => logging.info(this, s"subscribe completed successfully... $complete"))
+      .map(complete => {
+        //capture the framework id, so that reconcile will work later if the singleton dies
+        MesosContainerFactory.frameworkId = Some(complete.id)
+        logging.info(this, s"subscribe completed successfully... $complete")
+      })
       .recoverWith {
         case e =>
           logging.error(this, s"subscribe failed... $e}")
@@ -107,8 +140,13 @@ class MesosContainerFactory(config: WhiskConfig,
     } else {
       actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag))
     }
+    val constraintStrings = if (userProvidedImage) {
+      mesosConfig.blackboxConstraints
+    } else {
+      mesosConfig.constraints
+    }
 
-    logging.info(this, s"using Mesos to create a container with image $image...")
+    logging.info(this, s"using Mesos to create a container with image ${image}...")
     MesosTask.create(
       mesosClientActor,
       mesosConfig,
@@ -125,36 +163,87 @@ class MesosContainerFactory(config: WhiskConfig,
       //strip any "--" prefixes on parameters (should make this consistent everywhere else)
       parameters
         .map({ case (k, v) => if (k.startsWith("--")) (k.replaceFirst("--", ""), v) else (k, v) })
-        ++ containerArgs.extraArgs)
+        ++ containerArgs.extraArgs,
+      parseConstraints(constraintStrings))
+  }
+
+  /**
+   * Validate that constraint strings are well formed, and ignore constraints with unknown operators
+   * @param constraintStrings
+   * @param logging
+   * @return
+   */
+  def parseConstraints(constraintStrings: Seq[String])(implicit logging: Logging): Seq[Constraint] =
+    constraintStrings.flatMap(cs => {
+      val parts = cs.split(mesosConfig.constraintDelimiter)
+      require(parts.length == 3, "constraint must be in the form <attribute><delimiter><operator><delimiter><value>")
+      Seq(LIKE, UNLIKE).find(_.toString == parts(1)) match {
+        case Some(o) => Some(Constraint(parts(0), o, parts(2)))
+        case _ =>
+          logging.warn(this, s"ignoring unsupported constraint operator ${parts(1)}")
+          None
+      }
+    })
+
+  override def attach(id: ContainerId,
+                      ip: ContainerAddress,
+                      tid: TransactionId,
+                      actionImage: ExecManifest.ImageName,
+                      userProvidedImage: Boolean,
+                      memory: ByteSize): Future[Container] = {
+    logging.info(this, s"attaching to existing mesos task ${id}")
+    Future.successful(new MesosTask(id, ip, ec, logging, id.asString, mesosClientActor, mesosConfig))
   }
 
   override def init(): Unit = Unit
 
-  /** Cleanups any remaining Containers; should block until complete; should ONLY be run at shutdown. */
+  /** cleanup any remaining Containers; should block until complete; should ONLY be run at shutdown */
   override def cleanup(): Unit = {
-    val complete: Future[Any] = mesosClientActor.ask(Teardown)(teardownTimeout)
-    Try(Await.result(complete, teardownTimeout))
-      .map(_ => logging.info(this, "Mesos framework teardown completed."))
-      .recover {
-        case _: TimeoutException => logging.error(this, "Mesos framework teardown took too long.")
-        case t: Throwable =>
-          logging.error(this, s"Mesos framework teardown failed : $t}")
-      }
+    if (mesosConfig.teardownOnExit) {
+      val complete: Future[Any] = mesosClientActor.ask(Teardown)(teardownTimeout)
+      Try(Await.result(complete, teardownTimeout))
+        .map(_ => logging.info(this, "Mesos framework teardown completed."))
+        .recover {
+          case _: TimeoutException => logging.error(this, "Mesos framework teardown took too long.")
+          case t: Throwable =>
+            logging.error(this, s"Mesos framework teardown failed : ${t}")
+        }
+    }
   }
 }
 object MesosContainerFactory {
-  private def createClient(actorSystem: ActorSystem, mesosConfig: MesosConfig): ActorRef =
+  private var frameworkId: Option[String] = None
+  private def createClient(actorSystem: ActorSystem, mesosConfig: MesosConfig): ActorRef = {
+    implicit val cluster = Cluster(actorSystem)
+    implicit val logging = new AkkaLogging(actorSystem.log)
+    //create task store
+    val tasks = new DistributedDataTaskStore(actorSystem)
+
     actorSystem.actorOf(
-      MesosClient
-        .props(
-          "whisk-containerfactory-" + UUID(),
+      ClusterSingletonManager.props(
+        MesosClient.props(
+          () => {
+            logging.info(this, "reseting startTime...")
+            MesosContainerFactory.startTime = Instant.now.getEpochSecond
+            frameworkId.getOrElse("whisk-containerfactory-" + UUID())
+          },
           "whisk-containerfactory-framework",
           mesosConfig.masterUrl,
           mesosConfig.role,
-          mesosConfig.failoverTimeout))
+          mesosConfig.failoverTimeout,
+          autoSubscribe = true,
+          taskStore = tasks),
+        terminationMessage = PoisonPill,
+        settings = ClusterSingletonManagerSettings(actorSystem)),
+      name = "mesosClientMaster")
+    actorSystem.actorOf(
+      ClusterSingletonProxy
+        .props(singletonManagerPath = "/user/mesosClientMaster", settings = ClusterSingletonProxySettings(actorSystem)),
+      name = "mesosClientProxy")
+  }
 
   val counter = new Counter()
-  val startTime = Instant.now.getEpochSecond
+  private var startTime = Instant.now.getEpochSecond
   private def taskIdGenerator(): String = {
     s"whisk-${counter.next()}-${startTime}"
   }
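
A hedged usage example of the parseConstraints helper added above, assuming a MesosContainerFactory instance named factory, the default " " delimiter, and an implicit Logging in scope (all illustrative):

    val parsed = factory.parseConstraints(Seq("rack LIKE r1.*", "zone UNLIKE test", "cpu NEAR 2"))
    // -> Seq(Constraint("rack", LIKE, "r1.*"), Constraint("zone", UNLIKE, "test"))
    //    "cpu NEAR 2" is dropped with a warning because NEAR is not a supported operator;
    //    a string that does not split into exactly three parts fails the require check instead
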
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
index 1d6ad86827..ccf4517b5b 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
@@ -24,6 +24,8 @@ import akka.stream.scaladsl.Source
 import akka.util.ByteString
 import akka.util.Timeout
 import com.adobe.api.platform.runtime.mesos.Bridge
+import com.adobe.api.platform.runtime.mesos.CommandDef
+import com.adobe.api.platform.runtime.mesos.Constraint
 import com.adobe.api.platform.runtime.mesos.DeleteTask
 import com.adobe.api.platform.runtime.mesos.Host
 import com.adobe.api.platform.runtime.mesos.Running
@@ -70,22 +72,23 @@ object MesosTask {
              userProvidedImage: Boolean = false,
              memory: ByteSize = 256.MB,
              cpuShares: Int = 0,
-             environment: Map[String, String] = Map(),
+             environment: Map[String, String] = Map.empty,
              network: String = "bridge",
-             dnsServers: Seq[String] = Seq(),
+             dnsServers: Seq[String] = Seq.empty,
              name: Option[String] = None,
-             parameters: Map[String, Set[String]] = Map())(implicit ec: ExecutionContext,
-                                                           log: Logging,
-                                                           as: ActorSystem): Future[Container] = {
+             parameters: Map[String, Set[String]] = Map.empty,
+             constraints: Seq[Constraint] = Seq.empty)(implicit ec: ExecutionContext,
+                                                       log: Logging,
+                                                       as: ActorSystem): Future[Container] = {
     implicit val tid = transid
 
-    log.info(this, s"creating task for image $image...")
+    log.info(this, s"creating task for image ${image}...")
 
-    val mesosCpuShares = cpuShares / 1024.0 // convert openwhisk (docker based) shares to mesos (cpu percentage)
+    val mesosCpuShares = cpuShares / 1024.0 //convert openwhisk (docker based) shares to mesos (cpu percentage)
     val mesosRam = memory.toMB.toInt
 
     val taskId = taskIdGenerator()
-    val lowerNetwork = network.toLowerCase // match bridge+host without case, but retain case for user specified network
+    val lowerNetwork = network.toLowerCase //match bridge+host without case, but retain case for user specified network
     val taskNetwork = lowerNetwork match {
       case "bridge" => Bridge
       case "host"   => Host
@@ -95,16 +98,17 @@ object MesosTask {
 
     val task = new TaskDef(
       taskId,
-      name.getOrElse(image), // task name either the indicated name, or else the image name
+      name.getOrElse(image), //task name either the indicated name, or else the image name
       image,
       mesosCpuShares,
       mesosRam,
-      List(8080), // all action containers listen on 8080
-      Some(0), // port at index 0 used for health
+      List(8080), //all action containers listen on 8080
+      Some(0), //port at index 0 used for health
       false,
       taskNetwork,
       dnsOrEmpty ++ parameters,
-      environment)
+      Some(CommandDef(environment)),
+      constraints.toSet)
 
     val launched: Future[Running] =
       mesosClientActor.ask(SubmitTask(task))(taskLaunchTimeout).mapTo[Running]
@@ -125,25 +129,27 @@ object MesosTask {
 object JsonFormatters extends DefaultJsonProtocol {
   implicit val createContainerJson = jsonFormat3(CreateContainer)
 }
-
-class MesosTask(override protected val id: ContainerId,
-                override protected val addr: ContainerAddress,
+class MesosTask(override val id: ContainerId,
+                override val addr: ContainerAddress,
                 override protected val ec: ExecutionContext,
                 override protected val logging: Logging,
                 taskId: String,
                 mesosClientActor: ActorRef,
                 mesosConfig: MesosConfig)
-    extends Container {
+    extends Container
+    with Serializable {
+
+  implicit val e = ec
 
   /** Stops the container from consuming CPU cycles. */
   override def suspend()(implicit transid: TransactionId): Future[Unit] = {
-    // suspend not supported
+    //suspend not supported
     Future.successful(Unit)
   }
 
   /** Dual of halt. */
   override def resume()(implicit transid: TransactionId): Future[Unit] = {
-    // resume not supported
+    //resume not supported
     Future.successful(Unit)
   }
 
@@ -153,7 +159,7 @@ class MesosTask(override protected val id: ContainerId,
       .ask(DeleteTask(taskId))(MesosTask.taskDeleteTimeout)
       .mapTo[TaskStatus]
       .map(taskStatus => {
-        // verify that task ended in TASK_KILLED state (but don't fail if it didn't...)
+        //verify that task ended in TASK_KILLED state (but don't fail if it didn't...)
         if (taskStatus.getState != TaskState.TASK_KILLED) {
           logging.error(this, s"task kill resulted in unexpected state ${taskStatus.getState}")
         } else {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index 570281d658..2509b8dec4 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -19,7 +19,6 @@ package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.ThreadLocalRandom
-
 import akka.actor.{ActorSystem, Props}
 import akka.cluster.Cluster
 import akka.pattern.ask
@@ -35,11 +34,11 @@ import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.spi.SpiLoader
 import akka.event.Logging.InfoLevel
 import pureconfig._
-
 import scala.annotation.tailrec
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Failure, Success}
+import whisk.core.StaticSeedNodesProvider
 
 case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 1f5f398a87..09aecad5ed 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -20,7 +20,6 @@ package whisk.core.loadBalancer
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.ThreadLocalRandom
 import java.util.concurrent.atomic.LongAdder
-
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.cluster.ClusterEvent._
 import akka.cluster.{Cluster, Member, MemberStatus}
@@ -35,12 +34,12 @@ import whisk.core.connector._
 import whisk.core.entity._
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.spi.SpiLoader
-
 import scala.annotation.tailrec
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Failure, Success}
+import whisk.core.StaticSeedNodesProvider
 
 /**
  * A loadbalancer that uses "horizontal" sharding to not collide with fellow loadbalancers.
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index bc651fcdfe..89b22f8dc3 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -44,4 +44,9 @@ whisk {
     dns-servers: []
     extra-args: {}
   }
+
+  container-pool {
+    singleton: false //indicates invokers will operate in a single-active/multi-passive cluster
+    passive-replicated-pools: false //indicates that invoker pools will be replicated to other passive cluster nodes
+  }
 }
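
The two flags map onto the ContainerPoolConfig case class introduced in ContainerPool.scala below. A minimal sketch of loading them via the whisk.container-pool key added to ConfigKeys, assuming pureconfig's usual kebab-case field mapping (the invoker-side wiring itself is not shown in this diff):

    import pureconfig.loadConfigOrThrow
    import whisk.core.ConfigKeys
    import whisk.core.containerpool.ContainerPoolConfig

    // singleton              <- whisk.container-pool.singleton
    // passiveReplicatedPools <- whisk.container-pool.passive-replicated-pools
    val poolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
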
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 801fc09273..01c966bd2e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -17,20 +17,21 @@
 
 package whisk.core.containerpool
 
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.ActorRefFactory
+import akka.actor.Props
 import scala.collection.immutable
-
-import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
-
-import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
-
+import scala.concurrent.duration._
+import whisk.common.AkkaLogging
+import whisk.common.LoggingMarkers
+import whisk.common.TransactionId
+import whisk.core.connector.MessageFeed
 import whisk.core.entity.ByteSize
 import whisk.core.entity.CodeExec
 import whisk.core.entity.EntityName
 import whisk.core.entity.ExecutableWhiskAction
 import whisk.core.entity.size._
-import whisk.core.connector.MessageFeed
-
-import scala.concurrent.duration._
 
 sealed trait WorkerState
 case object Busy extends WorkerState
@@ -38,6 +39,10 @@ case object Free extends WorkerState
 
 case class WorkerData(data: ContainerData, state: WorkerState)
 
+case object Prewarm //sent to initiate prewarming of containers
+
+case class ContainerPoolConfig(singleton: Boolean, passiveReplicatedPools: Boolean)
+
 /**
  * A pool managing containers to run actions on.
  *
@@ -63,21 +68,26 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
                     maxActiveContainers: Int,
                     maxPoolSize: Int,
                     feed: ActorRef,
-                    prewarmConfig: Option[PrewarmingConfig] = None)
+                    prewarmConfig: Option[PrewarmingConfig] = None,
+                    poolInitializer: ContainerPoolInitializer)
     extends Actor {
   implicit val logging = new AkkaLogging(context.system.log)
-
-  var freePool = immutable.Map.empty[ActorRef, ContainerData]
+  implicit val ec = context.dispatcher
+  var freePool: ContainerPoolMap = poolInitializer.createFreePool
   var busyPool = immutable.Map.empty[ActorRef, ContainerData]
-  var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
+  var prewarmedPool: ContainerPoolMap = poolInitializer.createPrewarmPool
+
+  //initialize the pool
+  poolInitializer.initPool(self)
   val logMessageInterval = 10.seconds
 
-  prewarmConfig.foreach { config =>
-    logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} containers")(TransactionId.invokerWarmup)
-    (1 to config.count).foreach { _ =>
-      prewarmContainer(config.exec, config.memoryLimit)
+  def prewarm() =
+    prewarmConfig.foreach { config =>
+      logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} containers")(TransactionId.invokerWarmup)
+      (1 to config.count).foreach { _ =>
+        prewarmContainer(config.exec, config.memoryLimit)
+      }
     }
-  }
 
   def logContainerStart(r: Run, containerState: String): Unit = {
     val namespaceName = r.msg.user.namespace.name
@@ -92,6 +102,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   }
 
   def receive: Receive = {
+    case Prewarm       => prewarm()
+    case a: Attach     => childFactory(context) ! a
+    case a: AttachFree => childFactory(context) ! a
     // A job to run on a container
     //
     // Run messages are received either via the feed or from child containers which cannot process
@@ -102,12 +115,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
         // Schedule a job to a warm container
         ContainerPool
-          .schedule(r.action, r.msg.user.namespace, freePool)
+          .schedule(r.action, r.msg.user.namespace, freePool.toMap)
           .map(container => {
             (container, "warm")
           })
           .orElse {
-            if (busyPool.size + freePool.size < maxPoolSize) {
+            if (busyPool.size + freePool.toMap.size < maxPoolSize) {
               takePrewarmContainer(r.action)
                 .map(container => {
                   (container, "prewarmed")
@@ -119,7 +132,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
           }
           .orElse {
             // Remove a container and create a new one for the given job
-            ContainerPool.remove(freePool).map { toDelete =>
+            ContainerPool.remove(freePool.toMap).map { toDelete =>
               removeContainer(toDelete)
               takePrewarmContainer(r.action)
                 .map(container => {
@@ -146,7 +159,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
           val retryLogDeadline = if (isErrorLogged) {
             logging.error(
               this,
-              s"Rescheduling Run message, too many message in the pool, freePoolSize: ${freePool.size}, " +
+              s"Rescheduling Run message, too many message in the pool, freePoolSize: ${freePool.toMap.size}, " +
                 s"busyPoolSize: ${busyPool.size}, maxActiveContainers $maxActiveContainers, " +
                 s"userNamespace: ${r.msg.user.namespace}, action: ${r.action}")(r.msg.transid)
             Some(logMessageInterval.fromNow)
@@ -158,6 +171,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
     // Container is free to take more work
     case NeedWork(data: WarmedData) =>
+      logging.info(this, s"NeedWork1 ${data}")
       freePool = freePool + (sender() -> data)
       busyPool.get(sender()).foreach { _ =>
         busyPool = busyPool - sender()
@@ -166,7 +180,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
     // Container is prewarmed and ready to take work
     case NeedWork(data: PreWarmedData) =>
+      logging.info(this, s"NeedWork2 ${data}")
       prewarmedPool = prewarmedPool + (sender() -> data)
+      logging.info(this, s"added to prewarm; new size of prewarm pool:${prewarmedPool.toMap.size}")
 
     // Container got removed
     case ContainerRemoved =>
@@ -210,10 +226,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     prewarmConfig.flatMap { config =>
       val kind = action.exec.kind
       val memory = action.limits.memory.megabytes.MB
-      prewarmedPool
+      prewarmedPool.toMap
         .find {
-          case (_, PreWarmedData(_, `kind`, `memory`)) => true
-          case _                                       => false
+          case (_, PreWarmedData(_, `kind`, `memory`)) =>
+            logging.info(this, s"found prewarm for kind ${kind}")
+            true
+          case _ => false
         }
         .map {
           case (ref, data) =>
@@ -285,9 +303,21 @@ object ContainerPool {
             maxActive: Int,
             size: Int,
             feed: ActorRef,
-            prewarmConfig: Option[PrewarmingConfig] = None) =
-    Props(new ContainerPool(factory, maxActive, size, feed, prewarmConfig))
+            prewarmConfig: Option[PrewarmingConfig] = None,
+            poolInitializer: ContainerPoolInitializer = new DefaultContainerPoolInitializer) =
+    Props(new ContainerPool(factory, maxActive, size, feed, prewarmConfig, poolInitializer))
 }
 
 /** Contains settings needed to perform container prewarming */
 case class PrewarmingConfig(count: Int, exec: CodeExec[_], memoryLimit: ByteSize)
+
+trait ContainerPoolMap {
+  def +(kv: (ActorRef, ContainerData)): ContainerPoolMap
+  def -(k: ActorRef): ContainerPoolMap
+  def toMap: Map[ActorRef, ContainerData]
+}
+class DefaultContainerPoolMap(backing: Map[ActorRef, ContainerData] = immutable.Map.empty) extends ContainerPoolMap {
+  def +(kv: (ActorRef, ContainerData)) = new DefaultContainerPoolMap(backing + kv)
+  def -(k: ActorRef): ContainerPoolMap = new DefaultContainerPoolMap(backing - k)
+  def toMap = backing
+}
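
The ContainerPoolMap trait above lets the pool keep treating freePool and prewarmedPool as immutable maps while allowing a replicating implementation to be substituted. A small sketch of the contract, where containerRef and warmedData are placeholder values:

    var freePool: ContainerPoolMap = new DefaultContainerPoolMap()
    freePool = freePool + (containerRef -> warmedData) // add returns a new map (and may replicate)
    val poolSize = freePool.toMap.size                 // reads go through the plain Map view
    freePool = freePool - containerRef                 // remove returns a new map (and may replicate)
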
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPoolInitializer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPoolInitializer.scala
new file mode 100644
index 0000000000..4e5b0abe35
--- /dev/null
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPoolInitializer.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import akka.actor.ActorRef
+
+trait ContainerPoolInitializer {
+  def createFreePool: ContainerPoolMap
+  def createPrewarmPool: ContainerPoolMap
+  def initPool(pool: ActorRef): Unit
+}
+
+class DefaultContainerPoolInitializer extends ContainerPoolInitializer {
+  def createFreePool = new DefaultContainerPoolMap()
+  def createPrewarmPool = new DefaultContainerPoolMap()
+  def initPool(pool: ActorRef) = {
+    pool ! Prewarm
+  }
+
+}
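
How an invoker chooses between the two initializers is not part of this diff; one plausible wiring, sketched here with actorSystem, entityStore, poolConfig and an implicit Logging assumed to be in scope, would key off the new whisk.container-pool flags:

    val poolInitializer: ContainerPoolInitializer =
      if (poolConfig.singleton && poolConfig.passiveReplicatedPools)
        new ReplicatedPoolInitializer(actorSystem, entityStore) // resurrect pools from replicated data
      else
        new DefaultContainerPoolInitializer()                   // plain in-memory maps, prewarm immediately
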
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index d50e07c2af..0ecb4f1d0d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -37,6 +37,9 @@ import whisk.core.entity.size._
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.http.Messages
 import akka.event.Logging.InfoLevel
+import whisk.common.TransactionId
+import whisk.core.entity.ByteSize
+import whisk.core.entity.ExecManifest.ImageName
 
 // States
 sealed trait ContainerState
@@ -61,6 +64,11 @@ case class WarmedData(container: Container,
 
 // Events received by the actor
 case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+case class Attach(id: ContainerId, address: ContainerAddress, exec: CodeExec[_], memoryLimit: ByteSize)
+case class AttachFree(id: ContainerId,
+                      address: ContainerAddress,
+                      invocationNamespace: EntityName,
+                      action: ExecutableWhiskAction)
 case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, retryLogDeadline: Option[Deadline] = None)
 case object Remove
 
@@ -92,6 +100,7 @@ case object RescheduleJob // job is sent back to parent and could not be process
  */
 class ContainerProxy(
   factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
+  attachFactory: (ContainerId, ContainerAddress, TransactionId, ImageName, Boolean, ByteSize) => Future[Container],
   sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
   storeActivation: (TransactionId, WhiskActivation) => Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
@@ -119,7 +128,32 @@ class ContainerProxy(
         .pipeTo(self)
 
       goto(Starting)
+    case Event(job: Attach, _) =>
+      logging.info(this, "adding existing container to pool...")
+      attachFactory(job.id, job.address, TransactionId.invokerWarmup, job.exec.image, job.exec.pull, job.memoryLimit)
+        .map(container => {
+          logging.info(this, s"got container ${container} from attachFactory...")
+          PreWarmedData(container, job.exec.kind, job.memoryLimit)
+        })
+        .pipeTo(self)
 
+      goto(Starting)
+    case Event(job: AttachFree, _) =>
+      logging.info(this, "adding existing container to pool...")
+      attachFactory(
+        job.id,
+        job.address,
+        TransactionId.invokerWarmup,
+        job.action.exec.image,
+        job.action.exec.pull,
+        job.action.limits.memory.megabytes.MB)
+        .map(container => {
+          logging.info(this, s"got container ${container} from attachFactory...")
+          WarmedData(container, job.invocationNamespace, job.action, Instant.now)
+        })
+        .pipeTo(self)
+
+      goto(Running)
     // cold start (no container to reuse or available stem cell container)
     case Event(job: Run, _) =>
       implicit val transid = job.msg.transid
@@ -171,6 +205,7 @@ class ContainerProxy(
   when(Starting) {
     // container was successfully obtained
     case Event(data: PreWarmedData, _) =>
+      logging.info(this, s"sending NeedWork to parent ${context.parent} for ${data}")
       context.parent ! NeedWork(data)
       goto(Started) using data
 
@@ -407,6 +442,12 @@ class ContainerProxy(
 }
 
 object ContainerProxy {
+  def emptyAttach(id: ContainerId,
+                  addr: ContainerAddress,
+                  tid: TransactionId,
+                  image: ImageName,
+                  collectLogs: Boolean,
+                  memorySize: ByteSize): Future[Container] = Future.failed(new Exception("attach is not supported"))
   def props(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
     ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
@@ -414,8 +455,10 @@ object ContainerProxy {
     collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
     instance: InstanceId,
     unusedTimeout: FiniteDuration = 10.minutes,
-    pauseGrace: FiniteDuration = 50.milliseconds) =
-    Props(new ContainerProxy(factory, ack, store, collectLogs, instance, unusedTimeout, pauseGrace))
+    pauseGrace: FiniteDuration = 50.milliseconds,
+    attachFactory: (ContainerId, ContainerAddress, TransactionId, ImageName, Boolean, ByteSize) => Future[Container] =
+      emptyAttach) =
+    Props(new ContainerProxy(factory, attachFactory, ack, store, collectLogs, instance, unusedTimeout, pauseGrace))
 
   // Needs to be thread-safe as it's used by multiple proxies concurrently.
   private val containerCount = new Counter
@@ -440,7 +483,8 @@ object ContainerProxy {
    * Creates a WhiskActivation ready to be sent via active ack.
    *
    * @param job the job that was executed
-   * @param interval the time it took to execute the job
+   * @param initInterval the time it took to init the job
+   * @param totalInterval the time it took to execute the job
    * @param response the response to return to the user
    * @return a WhiskActivation to be sent to the user
    */
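
A hedged sketch of passing an attach-capable factory to the proxy through the new attachFactory parameter of ContainerProxy.props; apart from the parameter names added in this PR, every identifier below is a placeholder:

    import akka.actor.ActorRefFactory

    // containerFactory is a ContainerFactory (e.g. MesosContainerFactory); the other arguments are placeholders
    val childFactory = (f: ActorRefFactory) =>
      f.actorOf(
        ContainerProxy.props(
          factory = createContainer,
          ack = sendActiveAck,
          store = storeActivation,
          collectLogs = collectLogs,
          instance = invokerInstance,
          attachFactory = containerFactory.attach _)) // enables the Attach / AttachFree events handled above
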
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ReplicatedPoolInitializer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ReplicatedPoolInitializer.scala
new file mode 100644
index 0000000000..bd1e9009a2
--- /dev/null
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ReplicatedPoolInitializer.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.ClusterEvent.InitialStateAsEvents
+import akka.cluster.ClusterEvent.MemberUp
+import akka.cluster.ClusterEvent.UnreachableMember
+import akka.cluster.ddata.DistributedData
+import akka.cluster.ddata.ORSet
+import akka.cluster.ddata.ORSetKey
+import akka.cluster.ddata.Replicator
+import akka.cluster.ddata.Replicator.Get
+import akka.cluster.ddata.Replicator.ReadLocal
+import akka.cluster.ddata.Replicator.Update
+import akka.cluster.ddata.Replicator.WriteLocal
+import akka.cluster.singleton.ClusterSingletonManager
+import akka.cluster.singleton.ClusterSingletonManagerSettings
+import java.time.Instant
+import scala.collection.immutable
+import spray.json.JsValue
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.entity.ByteSize
+import whisk.core.entity.CodeExecAsString
+import whisk.core.entity.DocRevision
+import whisk.core.entity.EntityName
+import whisk.core.entity.ExecManifest
+import whisk.core.entity.FullyQualifiedEntityName
+import whisk.core.entity.WhiskAction
+import whisk.core.entity.types.EntityStore
+
+case object Active //sent to initiate making this ContainerPool active (when operating in active/passive mode)
+
+sealed abstract class ReplicatedType
+case class ReplicatedPrewarm(id: ContainerId, ip: ContainerAddress, kind: String, memoryLimit: ByteSize)
+    extends ReplicatedType
+case class ReplicatedWarm(id: ContainerId,
+                          ip: ContainerAddress,
+                          lastUsed: Instant,
+                          invocationNamespace: String,
+                          action: JsValue,
+                          actionRev: String)
+    extends ReplicatedType
+
+class ReplicatedPoolInitializer(as: ActorSystem, entityStore: EntityStore)(implicit logging: Logging)
+    extends ContainerPoolInitializer {
+
+  implicit val cluster = Cluster(as)
+
+  val replicator = DistributedData(as).replicator
+  val prewarmPoolKey = ORSetKey[ReplicatedType]("containerPrewarmPoolData")
+  val freePoolKey = ORSetKey[ReplicatedType]("containerFreePoolData")
+
+  override def initPool(pool: ActorRef): Unit = {
+    //do not init prewarms here
+    as.actorOf(Props(new ReplicatedContainerPoolActor(replicator, prewarmPoolKey, freePoolKey, pool, entityStore)))
+  }
+
+  override def createFreePool: ContainerPoolMap =
+    new ReplicatedMap[ReplicatedType](
+      replicator,
+      freePoolKey, {
+        case _: PreWarmedData =>
+          logging.debug(this, "skipping replication of PreWarmedData on freePool")
+          None
+        case _: NoData =>
+          logging.debug(this, "skipping replication of NoData on freePool")
+          None
+        case w: WarmedData =>
+          Some(
+            ReplicatedWarm(
+              w.container.id,
+              w.container.addr,
+              w.lastUsed,
+              w.invocationNamespace.toString,
+              FullyQualifiedEntityName.serdes.write(w.action.fullyQualifiedName(true)),
+              w.action.rev.asString))
+        case t =>
+          logging.warn(this, s"could not convert ${t} to replicated freePool ")
+          None
+      })
+
+  override def createPrewarmPool: ContainerPoolMap =
+    new ReplicatedMap[ReplicatedType](
+      replicator,
+      prewarmPoolKey, {
+        case p: PreWarmedData =>
+          Some(ReplicatedPrewarm(p.container.id, p.container.addr, p.kind, p.memoryLimit))
+        case _: NoData =>
+          logging.debug(this, "skipping replication of NoData on prewarmPool")
+          None
+        case t =>
+          logging.warn(this, s"could not convert ${t} to replicated prewarmPool ")
+          None
+      })
+}
+
+private class ReplicatedContainerPoolActor(replicator: ActorRef,
+                                           prewarmPoolKey: ORSetKey[ReplicatedType],
+                                           freePoolKey: ORSetKey[ReplicatedType],
+                                           pool: ActorRef,
+                                           entityStore: EntityStore)(implicit logging: Logging, cluster: Cluster)
+    extends Actor { //} ContainerPool(childFactory, maxActiveContainers, maxPoolSize, feed, prewarmConfig, entityStore) {
+
+  implicit val ec = context.dispatcher
+  var active = false
+
+  //use a ClusterSingleton to signal that this node's ContainerPool should now become active (and resurrect pool maps)
+  val replicatedPool = self
+  val poolActivatorMaster = context.system.actorOf(
+    ClusterSingletonManager.props(Props(new Actor {
+      def receive = Actor.emptyBehavior
+      override def preStart(): Unit = {
+        logging.info(this, "activating pool...")
+        replicatedPool ! Active
+
+      }
+    }), terminationMessage = PoisonPill, settings = ClusterSingletonManagerSettings(context.system)),
+    name = "poolActivatorMaster")
+
+  override def preStart(): Unit = {
+    //do not prewarm, just subscribe to cluster
+    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberUp], classOf[UnreachableMember])
+
+  }
+  override def postStop(): Unit = cluster.unsubscribe(self)
+  override def receive = {
+
+    case Active =>
+      replicator ! Get(prewarmPoolKey, ReadLocal)
+      replicator ! Get(freePoolKey, ReadLocal)
+    case Replicator.GetFailure(`prewarmPoolKey`, _) =>
+      logging.warn(this, "failed to locate replicated prewarms, creating new")
+      pool ! Prewarm
+    case Replicator.NotFound(`prewarmPoolKey`, _) =>
+      logging.info(this, "no prewarms found, creating new")
+      pool ! Prewarm
+    case g @ Replicator.GetSuccess(`prewarmPoolKey`, _) =>
+      val value = g.get(prewarmPoolKey).elements
+      if (value.size == 0) {
+        logging.info(this, "no existing  prewarms, creating new")
+        pool ! Prewarm
+      } else {
+        logging
+          .info(this, s"resurrecting ${value.size} prewarm containers from replicated data")
+
+        value.foreach(r => {
+          r match {
+            case p: ReplicatedPrewarm =>
+              val prewarmExec = ExecManifest.runtimesManifest
+                .resolveDefaultRuntime(p.kind)
+                .map { manifest =>
+                  new CodeExecAsString(manifest, "", None)
+                }
+                .get
+              //attachPrewarmContainer(p.id, p.ip, prewarmExec, p.memoryLimit)
+              pool ! Attach(p.id, p.ip, prewarmExec, p.memoryLimit)
+            case t => logging.error(this, s"cannot attach ${t} to prewarmPool ")
+          }
+
+        })
+      }
+    case Replicator.GetFailure(`freePoolKey`, _) =>
+      logging.warn(this, "failed to locate replicated freepool, leaving empty")
+    case Replicator.NotFound(`freePoolKey`, _) =>
+      logging.info(this, "no freepool found, leaving empty")
+    case g @ Replicator.GetSuccess(`freePoolKey`, _) =>
+      val value = g.get(freePoolKey).elements
+      if (value.size == 0) {
+        logging.info(this, "no existing freepool, creating new")
+      } else {
+        logging
+          .info(this, s"resurrecting ${value.size} freepool containers from replicated data")
+        value.foreach(r => {
+          r match {
+            case f: ReplicatedWarm =>
+              val invocationNamespace = EntityName(f.invocationNamespace)
+              val actionid =
+                FullyQualifiedEntityName.serdes.read(f.action).toDocId.asDocInfo(DocRevision(f.actionRev))
+              implicit val transid = TransactionId.invokerWarmup
+              WhiskAction
+                .get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty)
+                .map { action =>
+                  action.toExecutableWhiskAction match {
+                    case Some(executable) =>
+                      //attachFreeContainer(f.id, f.ip, invocationNamespace, executable)
+                      pool ! AttachFree(f.id, f.ip, invocationNamespace, executable)
+                    case None =>
+                      logging.error(
+                        this,
+                        s"non-executable action reached the pool via replication ${action.fullyQualifiedName(false)}")
+                  }
+                }
+            case t => logging.error(this, s"cannot attach ${t} to freePool ")
+          }
+
+        })
+      }
+
+    case UnreachableMember(member) =>
+      logging.info(this, s"Member detected as unreachable: ${member}")
+      //TODO: verify down at marathon, then remove
+      Cluster.get(context.system).down(member.address)
+    case c: CurrentClusterState =>
+      logging.info(this, s"current cluster state ${c}")
+    case MemberUp(member) =>
+      logging.info(this, s"Member is up ${member}")
+  }
+}
+
+/** A ContainerPoolMap that replicates adds/removes to other cluster nodes */
+class ReplicatedMap[R <: ReplicatedType](
+  replicator: ActorRef,
+  key: ORSetKey[R],
+  adapter: (ContainerData) => Option[R],
+  backing: immutable.Map[ActorRef, ContainerData] = immutable.Map.empty)(implicit logging: Logging, cluster: Cluster)
+    extends DefaultContainerPoolMap(backing) {
+  override def +(kv: (ActorRef, ContainerData)) = {
+    //first replicate
+    adapter(kv._2).foreach(r => replicator ! Update(key, ORSet.empty[R], WriteLocal)(_ + r))
+    //then return a new Map
+    new ReplicatedMap(replicator, key, adapter, backing + kv)
+  }
+  override def -(k: ActorRef): ContainerPoolMap = {
+    //first replicate
+    backing.get(k).foreach(adapter(_).foreach(r => replicator ! Update(key, ORSet.empty[R], WriteLocal)(_ - r)))
+    //then return a new Map
+    new ReplicatedMap[R](replicator, key, adapter, backing - k)
+  }
+}
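
For orientation, a minimal sketch of the Akka Distributed Data pattern ReplicatedMap and ReplicatedContainerPoolActor rely on: pool adds/removes are mirrored into an ORSet under a well-known key with a local write, and a newly activated node reads the set back with a local read. The element type is simplified to String and system is an assumed ActorSystem:

    import akka.actor.ActorSystem
    import akka.cluster.Cluster
    import akka.cluster.ddata.{DistributedData, ORSet, ORSetKey}
    import akka.cluster.ddata.Replicator.{Get, ReadLocal, Update, WriteLocal}

    val system = ActorSystem("sketch")
    implicit val cluster = Cluster(system)              // needed to add elements to an ORSet
    val replicator = DistributedData(system).replicator
    val key = ORSetKey[String]("containerFreePoolData")

    replicator ! Update(key, ORSet.empty[String], WriteLocal)(_ + "container-1") // replicate an add locally
    replicator ! Get(key, ReadLocal) // the GetSuccess/NotFound reply goes to the sending actor
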
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 5c959de4f2..7e8b6a361b 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -142,13 +142,12 @@ object DockerContainer {
  * @param id the id of the container
  * @param addr the ip of the container
  */
-class DockerContainer(protected val id: ContainerId,
-                      protected val addr: ContainerAddress,
-                      protected val useRunc: Boolean)(implicit docker: DockerApiWithFileAccess,
-                                                      runc: RuncApi,
-                                                      as: ActorSystem,
-                                                      protected val ec: ExecutionContext,
-                                                      protected val logging: Logging)
+class DockerContainer(val id: ContainerId, val addr: ContainerAddress, protected val useRunc: Boolean)(
+  implicit docker: DockerApiWithFileAccess,
+  runc: RuncApi,
+  as: ActorSystem,
+  protected val ec: ExecutionContext,
+  protected val logging: Logging)
     extends Container {
 
   /** The last read-position in the log file */
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
index ff0dfb5f78..abbb4c8bb4 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -34,7 +34,9 @@ import scala.concurrent.duration._
 import java.util.concurrent.TimeoutException
 import pureconfig._
 import whisk.core.ConfigKeys
+import whisk.core.containerpool.ContainerAddress
 import whisk.core.containerpool.ContainerArgsConfig
+import whisk.core.containerpool.ContainerId
 
 class DockerContainerFactory(config: WhiskConfig,
                              instance: InstanceId,
@@ -78,6 +80,13 @@ class DockerContainerFactory(config: WhiskConfig,
 
   /** Perform cleanup on init */
   override def init(): Unit = removeAllActionContainers()
+  override def attach(id: ContainerId,
+                      ip: ContainerAddress,
+                      tid: TransactionId,
+                      actionImage: ExecManifest.ImageName,
+                      userProvidedImage: Boolean,
+                      memory: ByteSize): Future[Container] =
+    Future.failed(new Exception("DockerContainerFactory cannot attach to existing containers"))
 
   /** Perform cleanup on exit - to be registered as shutdown hook */
   override def cleanup(): Unit = {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 9f0049c2a2..7fcb786d85 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -87,8 +87,8 @@ object KubernetesContainer {
  * @param workerIP the ip of the workernode on which the container is executing
  * @param nativeContainerId the docker/containerd lowlevel id for the container
  */
-class KubernetesContainer(protected[core] val id: ContainerId,
-                          protected[core] val addr: ContainerAddress,
+class KubernetesContainer(val id: ContainerId,
+                          val addr: ContainerAddress,
                           protected[core] val workerIP: String,
                           protected[core] val nativeContainerId: String)(implicit kubernetes: KubernetesApi,
                                                                          protected val ec: ExecutionContext,
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index 8b2d9189a3..7d1debee6e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -24,7 +24,6 @@ import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
-
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
@@ -34,6 +33,8 @@ import whisk.core.entity.ByteSize
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.InstanceId
 import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.core.containerpool.ContainerAddress
+import whisk.core.containerpool.ContainerId
 
 class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit actorSystem: ActorSystem,
                                                                      ec: ExecutionContext,
@@ -80,6 +81,14 @@ class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit ac
       environment = Map("__OW_API_HOST" -> config.wskApiHost),
       labels = Map("invoker" -> label))
   }
+
+  override def attach(id: ContainerId,
+                      ip: ContainerAddress,
+                      tid: TransactionId,
+                      actionImage: ImageName,
+                      userProvidedImage: Boolean,
+                      memory: ByteSize): Future[Container] =
+    Future.failed(new Exception("KubernetesContainerFactory cannot attach to existing containers"))
 }
 
 object KubernetesContainerFactoryProvider extends ContainerFactoryProvider {
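
Note: both DockerContainerFactory and KubernetesContainerFactory reject attach with a failed Future, so only factories that can rediscover already-running containers (such as the Mesos-backed one in this PR) participate in the replicated warm-container flow. A minimal sketch of a fallback a caller could use, assuming the ContainerFactory signatures shown in this diff; AttachOrCreate is a hypothetical helper, not code from the PR.

import scala.concurrent.{ExecutionContext, Future}
import whisk.common.TransactionId
import whisk.core.containerpool.{Container, ContainerAddress, ContainerFactory, ContainerId}
import whisk.core.entity.ByteSize
import whisk.core.entity.ExecManifest.ImageName

object AttachOrCreate {
  // Try to re-attach to a known container; fall back to starting a fresh one.
  def apply(factory: ContainerFactory,
            id: ContainerId,
            ip: ContainerAddress,
            tid: TransactionId,
            image: ImageName,
            userProvidedImage: Boolean,
            memory: ByteSize)(implicit ec: ExecutionContext): Future[Container] =
    factory
      .attach(id, ip, tid, image, userProvidedImage, memory)
      .recoverWith {
        case _: Exception =>
          // attach unsupported (Docker/Kubernetes) or the container is gone: create a new one
          factory.createContainer(tid, s"wsk_${id.asString}", image, userProvidedImage, memory)
      }
}
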
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index e8e1daef14..4a3e2601ce 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -60,7 +60,8 @@ object Invoker {
       dockerImageTag -> "latest",
       invokerNumCore -> "4",
       invokerCoreShare -> "2",
-      invokerUseRunc -> "true") ++
+      invokerUseRunc -> "true",
+      seedNodes -> "") ++
       Map(invokerName -> "")
 
   def main(args: Array[String]): Unit = {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 0729103a6f..1e42358b0f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -19,8 +19,14 @@ package whisk.core.invoker
 
 import java.nio.charset.StandardCharsets
 import java.time.Instant
-
-import akka.actor.{ActorRefFactory, ActorSystem, Props}
+import akka.actor.ActorRefFactory
+import akka.actor.ActorSystem
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.cluster.singleton.ClusterSingletonManager
+import akka.cluster.singleton.ClusterSingletonManagerSettings
+import akka.cluster.singleton.ClusterSingletonProxy
+import akka.cluster.singleton.ClusterSingletonProxySettings
 import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
@@ -36,7 +42,6 @@ import whisk.core.entity._
 import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.spi.SpiLoader
-
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
@@ -100,9 +105,41 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
     maximumContainers,
     maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
 
-  private val activationFeed = actorSystem.actorOf(Props {
-    new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage)
-  })
+  val poolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
+
+  //for replicated pools, use the ReplicatedPoolInitializer
+  val poolInitializer = if (poolConfig.passiveReplicatedPools) {
+    new ReplicatedPoolInitializer(actorSystem, entityStore)
+  } else {
+    new DefaultContainerPoolInitializer
+  }
+
+  //for singleton ContainerPool, only activate a single MessageFeed for the "activation" topic
+  val activationFeed = if (poolConfig.singleton) {
+    actorSystem.actorOf(
+      ClusterSingletonManager.props(
+        Props(
+          new MessageFeed(
+            "activation",
+            logging,
+            consumer,
+            maximumContainers,
+            500.milliseconds,
+            processActivationMessage)),
+        terminationMessage = PoisonPill,
+        settings = ClusterSingletonManagerSettings(actorSystem)),
+      name = "activationFeedMaster")
+    actorSystem.actorOf(
+      ClusterSingletonProxy
+        .props(
+          singletonManagerPath = "/user/activationFeedMaster",
+          settings = ClusterSingletonProxySettings(actorSystem)),
+      name = "activationFeedProxy")
+  } else {
+    actorSystem.actorOf(Props {
+      new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage)
+    })
+  }
 
   /** Sends an active-ack. */
   private val ack = (tid: TransactionId,
@@ -140,7 +177,15 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
 
   /** Creates a ContainerProxy Actor when being called. */
   private val childFactory = (f: ActorRefFactory) =>
-    f.actorOf(ContainerProxy.props(containerFactory.createContainer, ack, store, logsProvider.collectLogs, instance))
+    f.actorOf(
+      ContainerProxy
+        .props(
+          containerFactory.createContainer,
+          ack,
+          store,
+          logsProvider.collectLogs,
+          instance,
+          attachFactory = containerFactory.attach))
 
   private val prewarmKind = "nodejs:6"
   private val prewarmExec = ExecManifest.runtimesManifest
@@ -154,7 +199,8 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
       maximumContainers,
       maximumContainers,
       activationFeed,
-      Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
+      Some(PrewarmingConfig(2, prewarmExec, 256.MB)),
+      poolInitializer))
 
   /** Is called when an ActivationMessage is read from Kafka */
   def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
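
Note: InvokerReactive now branches on poolConfig.singleton and poolConfig.passiveReplicatedPools, loaded via pureconfig from ConfigKeys.containerPool. A minimal sketch of how such flags could be declared and loaded; the field list and HOCON key names here are assumptions for illustration, not the PR's actual ContainerPoolConfig.

import pureconfig._

object PoolConfigSketch {
  // Hypothetical shape: the real ContainerPoolConfig may carry additional fields.
  final case class ContainerPoolConfig(singleton: Boolean = false, // one active MessageFeed per cluster
                                       passiveReplicatedPools: Boolean = false) // advertise warm containers to peers

  // With pureconfig's default mapping these correspond to HOCON keys such as
  // whisk.container-pool.singleton and whisk.container-pool.passive-replicated-pools
  // (the key names are an assumption, not taken from this diff).
  def load(): ContainerPoolConfig =
    loadConfigOrThrow[ContainerPoolConfig]("whisk.container-pool")
}
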
diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index deb23ffc26..005aed57eb 100644
--- a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -23,17 +23,19 @@ import akka.stream.scaladsl.Sink
 import akka.testkit.TestKit
 import akka.testkit.TestProbe
 import com.adobe.api.platform.runtime.mesos.Bridge
+import com.adobe.api.platform.runtime.mesos.CommandDef
+import com.adobe.api.platform.runtime.mesos.Constraint
 import com.adobe.api.platform.runtime.mesos.DeleteTask
+import com.adobe.api.platform.runtime.mesos.LIKE
 import com.adobe.api.platform.runtime.mesos.Running
 import com.adobe.api.platform.runtime.mesos.SubmitTask
 import com.adobe.api.platform.runtime.mesos.Subscribe
 import com.adobe.api.platform.runtime.mesos.SubscribeComplete
 import com.adobe.api.platform.runtime.mesos.TaskDef
+import com.adobe.api.platform.runtime.mesos.UNLIKE
 import com.adobe.api.platform.runtime.mesos.User
 import common.StreamLogging
-import org.apache.mesos.v1.Protos.AgentID
 import org.apache.mesos.v1.Protos.TaskID
-import org.apache.mesos.v1.Protos.TaskInfo
 import org.apache.mesos.v1.Protos.TaskState
 import org.apache.mesos.v1.Protos.TaskStatus
 import org.junit.runner.RunWith
@@ -88,7 +90,7 @@ class MesosContainerFactoryTest
 
   it should "send Subscribe on init" in {
     val wskConfig = new WhiskConfig(Map())
-    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true)
+    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true, Seq.empty, " ", Seq.empty, true)
     new MesosContainerFactory(
       wskConfig,
       system,
@@ -101,8 +103,17 @@ class MesosContainerFactoryTest
     expectMsg(Subscribe)
   }
 
-  it should "send SubmitTask on create" in {
-    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true)
+  it should "send SubmitTask (with constraints) on create" in {
+    val mesosConfig = MesosConfig(
+      "http://master:5050",
+      None,
+      "*",
+      0.seconds,
+      true,
+      Seq("att1 LIKE v1", "att2 UNLIKE v2"),
+      " ",
+      Seq("bbatt1 LIKE v1", "bbatt2 UNLIKE v2"),
+      true)
 
     val factory =
       new MesosContainerFactory(
@@ -136,11 +147,12 @@ class MesosContainerFactoryTest
           "dns" -> Set("dns1", "dns2"),
           "extra1" -> Set("e1", "e2"),
           "extra2" -> Set("e3", "e4")),
-        Map("__OW_API_HOST" -> wskConfig.wskApiHost))))
+        Some(CommandDef(Map("__OW_API_HOST" -> wskConfig.wskApiHost))),
+        Seq(Constraint("att1", LIKE, "v1"), Constraint("att2", UNLIKE, "v2")).toSet)))
   }
 
   it should "send DeleteTask on destroy" in {
-    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true)
+    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true, Seq.empty, " ", Seq.empty, true)
 
     val probe = TestProbe()
     val factory =
@@ -156,7 +168,7 @@ class MesosContainerFactoryTest
 
     probe.expectMsg(Subscribe)
     //emulate successful subscribe
-    probe.reply(new SubscribeComplete)
+    probe.reply(new SubscribeComplete("testid"))
 
     //create the container
     val c = factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB)
@@ -178,19 +190,15 @@ class MesosContainerFactoryTest
           "dns" -> Set("dns1", "dns2"),
           "extra1" -> Set("e1", "e2"),
           "extra2" -> Set("e3", "e4")),
-        Map("__OW_API_HOST" -> wskConfig.wskApiHost))))
+        Some(CommandDef(Map("__OW_API_HOST" -> wskConfig.wskApiHost))))))
 
     //emulate successful task launch
     val taskId = TaskID.newBuilder().setValue(lastTaskId)
 
     probe.reply(
       Running(
-        TaskInfo
-          .newBuilder()
-          .setName("testTask")
-          .setTaskId(taskId)
-          .setAgentId(AgentID.newBuilder().setValue("testAgentID"))
-          .build(),
+        taskId.getValue,
+        "testAgentID",
         TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).build(),
         "agenthost",
         Seq(30000)))
@@ -209,7 +217,7 @@ class MesosContainerFactoryTest
   }
 
   it should "return static message for logs" in {
-    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true)
+    val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true, Seq.empty, " ", Seq.empty, true)
 
     val probe = TestProbe()
     val factory =
@@ -225,7 +233,7 @@ class MesosContainerFactoryTest
 
     probe.expectMsg(Subscribe)
     //emulate successful subscribe
-    probe.reply(new SubscribeComplete)
+    probe.reply(new SubscribeComplete("testid"))
 
     //create the container
     val c = factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB)
@@ -247,19 +255,15 @@ class MesosContainerFactoryTest
           "other" -> Set("v5", "v6"),
           "extra1" -> Set("e1", "e2"),
           "extra2" -> Set("e3", "e4")),
-        Map("__OW_API_HOST" -> wskConfig.wskApiHost))))
+        Some(CommandDef(Map("__OW_API_HOST" -> wskConfig.wskApiHost))))))
 
     //emulate successful task launch
     val taskId = TaskID.newBuilder().setValue(lastTaskId)
 
     probe.reply(
       Running(
-        TaskInfo
-          .newBuilder()
-          .setName("testTask")
-          .setTaskId(taskId)
-          .setAgentId(AgentID.newBuilder().setValue("testAgentID"))
-          .build(),
+        taskId.getValue,
+        "testAgentID",
         TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).build(),
         "agenthost",
         Seq(30000)))
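
Note: the assertions above expect constraint strings such as "att1 LIKE v1" to be split with the configured constraint-delimiter and turned into Constraint("att1", LIKE, "v1"). A minimal sketch of that parsing, using only the mesos-actor types the test already imports; the real MesosContainerFactory may parse these differently.

import com.adobe.api.platform.runtime.mesos.{Constraint, LIKE, UNLIKE}

object ConstraintParsing {
  /** Parses "att1 LIKE v1" style strings; anything malformed yields None. */
  def parse(raw: String, delimiter: String = " "): Option[Constraint] =
    raw.split(delimiter).filter(_.nonEmpty) match {
      case Array(attribute, "LIKE", value)   => Some(Constraint(attribute, LIKE, value))
      case Array(attribute, "UNLIKE", value) => Some(Constraint(attribute, UNLIKE, value))
      case _                                 => None
    }
}

// e.g. Seq("att1 LIKE v1", "att2 UNLIKE v2").flatMap(ConstraintParsing.parse(_)).toSet
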
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 5a130848c4..37cf282aeb 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -672,8 +672,8 @@ class ContainerProxyTests
    * Implements all the good cases of a perfect run to facilitate error case overriding.
    */
   class TestContainer extends Container {
-    protected val id = ContainerId("testcontainer")
-    protected val addr = ContainerAddress("0.0.0.0")
+    val id = ContainerId("testcontainer")
+    val addr = ContainerAddress("0.0.0.0")
     protected implicit val logging: Logging = log
     protected implicit val ec: ExecutionContext = system.dispatcher
     var suspendCount = 0
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala
index ef4ae850c0..67a9d21b09 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala
@@ -21,7 +21,7 @@ import akka.actor.Address
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpec, Matchers}
-import whisk.core.loadBalancer.StaticSeedNodesProvider
+import whisk.core.StaticSeedNodesProvider
 
 @RunWith(classOf[JUnitRunner])
 class SeedNodesProviderTests extends FlatSpec with Matchers {
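
Note: with seedNodes added to the Invoker's required properties and StaticSeedNodesProvider moved to whisk.core, each invoker can join the Akka cluster that backs the singleton activation feed and the replicated pool. A minimal sketch of turning a seed-node string into Akka addresses, assuming a whitespace-separated host:port list and the actor-system name "whisk"; the provider's real input format and API may differ.

import akka.actor.Address

object SeedNodesSketch {
  // Assumptions for this sketch: whitespace-separated "host:port" entries and
  // the actor-system name "whisk".
  def seedNodes(raw: String, systemName: String = "whisk"): Seq[Address] =
    raw.trim
      .split("\\s+")
      .filter(_.nonEmpty)
      .toSeq
      .flatMap { hostPort =>
        hostPort.split(":") match {
          case Array(host, port) if port.forall(_.isDigit) =>
            Some(Address("akka.tcp", systemName, host, port.toInt))
          case _ => None
        }
      }
}
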


 
