You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/06/22 15:39:02 UTC

[incubator-openwhisk] branch master updated: Add ability to deploy a "hot-standby" controller (#2205)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new fdbf073  Add ability to deploy a "hot-standby" controller (#2205)
fdbf073 is described below

commit fdbf073a386c33aed1b06ed93eaea39ee4382c7b
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Thu Jun 22 17:38:59 2017 +0200

    Add ability to deploy a "hot-standby" controller (#2205)
    
    It is now possible to deploy a hot-standby controller. Each controller needs its own instance. This instance is a consecutive numbering, starting with 0. The state and cache of each controller is not shared to the other controllers. If the base controller crashes, the hot-standby controller will be used. After the base controller is up again, it will be used again. Because of the empty cache after restart, there are no problems with inconsistency. The only problem that could occur is, [...]
---
 ansible/group_vars/all                             |   4 +-
 ansible/roles/controller/tasks/clean.yml           |  16 +++-
 ansible/roles/controller/tasks/deploy.yml          |  16 ++--
 ansible/roles/kafka/tasks/deploy.yml               |  10 +-
 ansible/roles/nginx/templates/nginx.conf.j2        |  20 +++-
 ansible/templates/whisk.properties.j2              |   9 +-
 .../main/scala/whisk/common/TransactionId.scala    |  10 +-
 .../connector/kafka/KafkaConsumerConnector.scala   |   4 +-
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  12 +--
 .../main/scala/whisk/core/connector/Message.scala  |   4 +-
 .../main/scala/whisk/core/entity/InstanceId.scala  |  27 ++++++
 .../main/scala/whisk/http/BasicRasService.scala    |  28 ------
 core/controller/Dockerfile                         |   3 +-
 .../scala/whisk/core/controller/Controller.scala   |  33 +++++--
 .../scala/whisk/core/controller/RestAPIs.scala     |   7 +-
 .../core/controller/actions/PrimitiveActions.scala |   9 +-
 .../core/entitlement/ActivationThrottler.scala     |  32 +++----
 .../scala/whisk/core/entitlement/Entitlement.scala |   3 -
 .../core/loadBalancer/InvokerSupervision.scala     |   6 +-
 .../core/loadBalancer/LoadBalancerService.scala    |  11 ++-
 .../scala/whisk/core/container/ContainerPool.scala |   4 +-
 .../whisk/core/container/WhiskContainer.scala      |   1 +
 .../whisk/core/containerpool/ContainerProxy.scala  |   8 +-
 .../main/scala/whisk/core/invoker/Invoker.scala    |  35 ++++---
 .../scala/whisk/core/invoker/InvokerReactive.scala |  14 +--
 .../scala/whisk/core/invoker/InvokerServer.scala   |   7 +-
 tests/src/test/scala/common/LoggedFunction.scala   |  10 ++
 tests/src/test/scala/common/WhiskProperties.java   |  16 ++--
 tests/src/test/scala/services/HeadersTests.scala   |   2 +-
 tests/src/test/scala/services/PingTests.scala      | 102 +++++++++++++--------
 .../whisk/core/cli/test/WskBasicUsageTests.scala   |   2 +-
 .../core/container/test/ContainerPoolTests.scala   |   3 +-
 .../containerpool/test/ContainerPoolTests.scala    |   1 +
 .../containerpool/test/ContainerProxyTests.scala   |   3 +-
 .../controller/test/ControllerTestCommon.scala     |   6 +-
 .../scala/whisk/core/database/test/DbUtils.scala   |   5 +-
 .../core/dispatcher/test/DispatcherTests.scala     |   2 +-
 .../test/InvokerSupervisionTests.scala             |  10 +-
 tools/admin/wskadmin                               |   2 +-
 tools/build/checkLogs.py                           |   2 +-
 40 files changed, 297 insertions(+), 202 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 4e42368..c4f438b 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -89,10 +89,11 @@ defaultLimits:
 
 # port means outer port
 controller:
-  port: 10001
+  basePort: 10001
   heap: "{{ controller_heap | default('2g') }}"
   arguments: "{{ controller_arguments | default('') }}"
   blackboxFraction: 0.10
+  instances: "{{ groups['controllers'] | length }}"
 
 consul:
   confdir: "{{ config_root_dir }}/consul"
@@ -129,6 +130,7 @@ invoker:
   serializeDockerPull: true
   useRunc: false
   useReactivePool: "{{ invoker_use_reactive_pool | default(false) }}"
+  instances: "{{ groups['invokers'] | length }}"
 
 nginx:
   confdir: "{{ config_root_dir }}/nginx"
diff --git a/ansible/roles/controller/tasks/clean.yml b/ansible/roles/controller/tasks/clean.yml
index ba173b1..24910f6 100644
--- a/ansible/roles/controller/tasks/clean.yml
+++ b/ansible/roles/controller/tasks/clean.yml
@@ -3,6 +3,20 @@
 
 - name: remove controller
   docker_container:
+    name: "controller{{ groups['controllers'].index(inventory_hostname) }}"
+    image: "{{ docker_registry }}{{ docker_image_prefix }}/controller:{{ docker_image_tag }}"
+    state: absent
+  ignore_errors: True
+
+- name: remove controller log directory
+  file:
+    path: "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}"
+    state: absent
+  become: true
+
+# Remove controller without prefix
+- name: remove controller
+  docker_container:
     name: controller
     image: "{{ docker_registry }}{{ docker_image_prefix }}/controller:{{ docker_image_tag }}"
     state: absent
@@ -12,4 +26,4 @@
   file:
     path: "{{ whisk_logs_dir }}/controller"
     state: absent
-  become: true
\ No newline at end of file
+  become: true
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 9dfbae4..e9b91ca 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -10,41 +10,41 @@
 
 - name: ensure controller log directory is created with permissions
   file:
-    path: "{{ whisk_logs_dir }}/controller"
+    path: "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}"
     state: directory
     mode: 0777
   become: true
 
 - name: (re)start controller
   docker_container:
-    name: controller
+    name: controller{{ groups['controllers'].index(inventory_hostname) }}
     image: "{{ docker_registry }}{{ docker_image_prefix }}/controller:{{ docker_image_tag }}"
     state: started
     recreate: true
     restart_policy: "{{ docker.restart.policy }}"
-    hostname: controller
+    hostname: "controller{{ groups['controllers'].index(inventory_hostname) }}"
     env:
-      "COMPONENT_NAME": "controller"
+      "COMPONENT_NAME": "controller{{ groups['controllers'].index(inventory_hostname) }}"
       "CONSULSERVER_HOST": "{{ groups['consul_servers'] | first }}"
       "CONSUL_HOST_PORT4": "{{ consul.port.http }}"
       "PORT": 8080
       "WHISK_VERSION_NAME": "{{ whisk_version_name }}"
       "WHISK_VERSION_DATE": "{{ whisk.version.date }}"
       "WHISK_VERSION_BUILDNO": "{{ docker_image_tag }}"
-      "KAFKA_NUMPARTITIONS": 2
       "SERVICE_CHECK_HTTP": "/ping"
       "SERVICE_CHECK_TIMEOUT": "2s"
       "SERVICE_CHECK_INTERVAL": "15s"
       "JAVA_OPTS": "-Xmx{{ controller.heap }}"
       "CONTROLLER_OPTS": "{{ controller.arguments }}"
     volumes:
-      - "{{ whisk_logs_dir }}/controller:/logs"
+      - "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs"
     ports:
-      - "{{ controller.port }}:8080"
+      - "{{ controller.basePort + groups['controllers'].index(inventory_hostname) }}:8080"
+    command: /bin/sh -c "controller/bin/controller {{ groups['controllers'].index(inventory_hostname) }} >> /logs/controller{{ groups['controllers'].index(inventory_hostname) }}_logs.log 2>&1"
 
 - name: wait until the Controller in this host is up and running
   uri:
-    url: "http://{{ inventory_hostname }}:{{ controller.port }}/ping"
+    url: "http://{{ inventory_hostname }}:{{ controller.basePort + groups['controllers'].index(inventory_hostname) }}/ping"
   register: result
   until: result.status == 200
   retries: 12
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index ecccd59..d438d33 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -47,15 +47,21 @@
   retries: 10
   delay: 5
 
-- name: create the active-ack and health topic
+- name: create the health topic
   shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item }} --replication-factor 1 --partitions 1 --zookeeper {{ inventory_hostname }}:{{ zookeeper.port }}'"
   with_items:
-    - completed
     - health
   register: command_result
   failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
   changed_when: "'Created topic' in command_result.stdout"
 
+- name: create the active-ack topics
+  shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic health{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper {{ inventory_hostname }}:{{ zookeeper.port }}'"
+  with_indexed_items: "{{ groups['controllers'] }}"
+  register: command_result
+  failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
+  changed_when: "'Created topic' in command_result.stdout"
+
 - name: create the invoker topics
   shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic invoker{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper {{ inventory_hostname }}:{{ zookeeper.port }}'"
   with_indexed_items: "{{ groups['invokers'] }}"
diff --git a/ansible/roles/nginx/templates/nginx.conf.j2 b/ansible/roles/nginx/templates/nginx.conf.j2
index d1c05dc..4795746 100644
--- a/ansible/roles/nginx/templates/nginx.conf.j2
+++ b/ansible/roles/nginx/templates/nginx.conf.j2
@@ -2,7 +2,7 @@
 
 events {
 {# default: 1024 #}
-    worker_connections  4096;  
+    worker_connections  4096;
 }
 
 http {
@@ -16,6 +16,20 @@ http {
         '$http_referer $http_user_agent $upstream_addr';
     access_log /logs/nginx_access.log combined-upstream;
 
+    upstream controllers {
+        # fail_timeout: period of time the server will be considered unavailable
+        # Mark the controller as unavailable for at least 60 seconds, to not get any requests during restart.
+        # Otherwise, nginx would dispatch requests when the container is up, but the backend in the container not.
+        # From the docs:
+        # "normally, requests with a non-idempotent method (POST, LOCK, PATCH) are not passed to the next server if a request has been sent to an upstream server"
+        server {{ groups['controllers'] | first }}:{{ controller.basePort }} fail_timeout=60s;
+{% for ip in groups['controllers'] %}
+    {% if groups['controllers'].index(ip) > 0 %}
+        server {{ ip }}:{{ controller.basePort + groups['controllers'].index(ip) }} backup;
+    {% endif %}
+{% endfor %}
+    }
+
     server {
         listen 443 default ssl;
 
@@ -42,7 +56,7 @@ http {
             if ($namespace) {
               rewrite    /(.*) /api/v1/web/${namespace}/$1 break;
             }
-            proxy_pass http://{{ groups['controllers']|first }}:{{ controller.port }};
+            proxy_pass http://controllers;
             proxy_read_timeout 70s; # 60+10 additional seconds to allow controller to terminate request
         }
 
@@ -51,7 +65,7 @@ http {
             if ($namespace) {
               rewrite    ^ /api/v1/web/${namespace}/public/index.html break;
             }
-            proxy_pass http://{{ groups['controllers']|first }}:{{ controller.port }};
+            proxy_pass http://controllers;
             proxy_read_timeout 70s; # 60+10 additional seconds to allow controller to terminate request
         }
 
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index 5ee84f7..af444f3 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -47,10 +47,8 @@ limits.triggers.fires.perMinute={{ limits.triggers.fires.perMinute }}
 {% endif %}
 
 consulserver.host={{ groups["consul_servers"]|first }}
-controller.host={{ groups["controllers"]|first }}
 edge.host={{ groups["edge"]|first }}
 kafka.host={{ groups["kafka"]|first }}
-loadbalancer.host={{ groups["controllers"]|first }}
 router.host={{ groups["edge"]|first }}
 zookeeper.host={{ groups["kafka"]|first }}
 invoker.hosts={{ groups["invokers"] | join(",") }}
@@ -59,12 +57,14 @@ edge.host.apiport=443
 zookeeper.host.port={{ zookeeper.port }}
 kafka.host.port={{ kafka.port }}
 kafkaras.host.port={{ kafka.ras.port }}
-controller.host.port={{ controller.port }}
-loadbalancer.host.port={{ controller.port }}
 consul.host.port4={{ consul.port.http }}
 consul.host.port5={{ consul.port.server }}
 invoker.hosts.baseport={{ invoker.port }}
 
+controller.hosts={{ groups["controllers"] | join(",") }}
+controller.host.basePort={{ controller.basePort }}
+controller.instances={{ controller.instances }}
+
 invoker.container.network=bridge
 invoker.container.policy={{ invoker_container_policy_name | default()}}
 invoker.container.dns={{ invoker_container_network_dns_servers | default()}}
@@ -74,6 +74,7 @@ invoker.serializeDockerOp={{ invoker.serializeDockerOp }}
 invoker.serializeDockerPull={{ invoker.serializeDockerPull }}
 invoker.useRunc={{ invoker_use_runc | default(invoker.useRunc) }}
 invoker.useReactivePool={{ invoker.useReactivePool }}
+invoker.instances={{ invoker.instances }}
 
 consulserver.docker.endpoint={{ groups["consul_servers"]|first }}:{{ docker.port }}
 edge.docker.endpoint={{ groups["edge"]|first }}:{{ docker.port }}
diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala b/common/scala/src/main/scala/whisk/common/TransactionId.scala
index 0c4fc8e..c05034b 100644
--- a/common/scala/src/main/scala/whisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala
@@ -31,6 +31,7 @@ import spray.json.JsArray
 import spray.json.JsNumber
 import spray.json.JsValue
 import spray.json.RootJsonFormat
+import whisk.core.entity.InstanceId
 
 /**
  * A transaction id for tracking operations in the system that are specific to a request.
@@ -175,9 +176,12 @@ object TransactionId {
  * A thread-safe transaction counter.
  */
 trait TransactionCounter {
+    val numberOfInstances: Int
+    val instance: InstanceId
+
+    private lazy val cnt = new AtomicInteger(numberOfInstances + instance.toInt)
+
     def transid(): TransactionId = {
-        TransactionId(cnt.incrementAndGet())
+        TransactionId(cnt.addAndGet(numberOfInstances))
     }
-
-    private val cnt = new AtomicInteger(1)
 }
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 9ec2cd8..ea1ff11 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -33,7 +33,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
 import whisk.common.Logging
-import whisk.common.TransactionCounter
 import whisk.core.connector.MessageConsumer
 
 class KafkaConsumerConnector(
@@ -45,8 +44,7 @@ class KafkaConsumerConnector(
     sessionTimeout: FiniteDuration = 30 seconds,
     autoCommitInterval: FiniteDuration = 10 seconds)(
         implicit logging: Logging)
-    extends MessageConsumer
-    with TransactionCounter {
+    extends MessageConsumer {
 
     /**
      * Long poll for messages. Method returns once message are available but no later than given
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 267ad38..6af2fcb 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -79,15 +79,15 @@ class WhiskConfig(
     val invokerSerializeDockerPull = this(WhiskConfig.invokerSerializeDockerPull)
     val invokerUseRunc = this(WhiskConfig.invokerUseRunc)
     val invokerUseReactivePool = this(WhiskConfig.invokerUseReactivePool)
+    val invokerInstances = this(WhiskConfig.invokerInstances)
 
     val wskApiHost = this(WhiskConfig.wskApiProtocol) + "://" + this(WhiskConfig.wskApiHostname) + ":" + this(WhiskConfig.wskApiPort)
-    val controllerHost = this(WhiskConfig.controllerHostName) + ":" + this(WhiskConfig.controllerHostPort)
     val controllerBlackboxFraction = this.getAsDouble(WhiskConfig.controllerBlackboxFraction, 0.10)
     val loadbalancerInvokerBusyThreshold = this.getAsInt(WhiskConfig.loadbalancerInvokerBusyThreshold, 16)
+    val controllerInstances = this(WhiskConfig.controllerInstances)
 
     val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort)
     val kafkaHost = this(WhiskConfig.kafkaHostName) + ":" + this(WhiskConfig.kafkaHostPort)
-    val loadbalancerHost = this(WhiskConfig.loadbalancerHostName) + ":" + this(WhiskConfig.loadbalancerHostPort)
 
     val edgeHostName = this(WhiskConfig.edgeHostName)
 
@@ -237,6 +237,7 @@ object WhiskConfig {
     val invokerSerializeDockerPull = "invoker.serializeDockerPull"
     val invokerUseRunc = "invoker.useRunc"
     val invokerUseReactivePool = "invoker.useReactivePool"
+    val invokerInstances = "invoker.instances"
 
     val wskApiProtocol = "whisk.api.host.proto"
     val wskApiPort = "whisk.api.host.port"
@@ -247,19 +248,16 @@ object WhiskConfig {
     val kafkaDockerEndpoint = "kafka.docker.endpoint"
     val mainDockerEndpoint = "main.docker.endpoint"
 
-    private val controllerHostName = "controller.host"
-    private val controllerHostPort = "controller.host.port"
     private val controllerBlackboxFraction = "controller.blackboxFraction"
+    val controllerInstances = "controller.instances"
 
     val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
 
     val kafkaHostName = "kafka.host"
-    val loadbalancerHostName = "loadbalancer.host"
     private val zookeeperHostName = "zookeeper.host"
 
     private val edgeHostApiPort = "edge.host.apiport"
     val kafkaHostPort = "kafka.host.port"
-    private val loadbalancerHostPort = "loadbalancer.host.port"
     private val zookeeperHostPort = "zookeeper.host.port"
 
     val consulServerHost = "consulserver.host"
@@ -270,8 +268,6 @@ object WhiskConfig {
     val consulServer = Map(consulServerHost -> null, consulPort -> null)
     val invokerHosts = Map(invokerHostsList -> null)
     val kafkaHost = Map(kafkaHostName -> null, kafkaHostPort -> null)
-    val controllerHost = Map(controllerHostName -> null, controllerHostPort -> null)
-    val loadbalancerHost = Map(loadbalancerHostName -> null, loadbalancerHostPort -> null)
 
     val runtimesManifest = "runtimes.manifest"
 
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 5ed4ed6..c895690 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -26,6 +26,7 @@ import whisk.core.entity.DocRevision
 import whisk.core.entity.EntityPath
 import whisk.core.entity.FullyQualifiedEntityName
 import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
 import whisk.core.entity.WhiskActivation
 
 /** Basic trait for messages that are sent on a message bus connector. */
@@ -53,6 +54,7 @@ case class ActivationMessage(
     user: Identity,
     activationId: ActivationId,
     activationNamespace: EntityPath,
+    rootControllerIndex: InstanceId,
     content: Option[JsObject],
     cause: Option[ActivationId] = None)
     extends Message {
@@ -80,7 +82,7 @@ object ActivationMessage extends DefaultJsonProtocol {
     def parse(msg: String) = Try(serdes.read(msg.parseJson))
 
     private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-    implicit val serdes = jsonFormat8(ActivationMessage.apply)
+    implicit val serdes = jsonFormat9(ActivationMessage.apply)
 }
 
 /**
diff --git a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
new file mode 100644
index 0000000..f573b04
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.entity
+
+import spray.json.DefaultJsonProtocol
+
+case class InstanceId(val instance: Int) {
+    def toInt: Int = instance
+}
+
+object InstanceId extends DefaultJsonProtocol {
+    implicit val serdes = jsonFormat1(InstanceId.apply)
+}
diff --git a/common/scala/src/main/scala/whisk/http/BasicRasService.scala b/common/scala/src/main/scala/whisk/http/BasicRasService.scala
index 4926e4c..f7d0034 100644
--- a/common/scala/src/main/scala/whisk/http/BasicRasService.scala
+++ b/common/scala/src/main/scala/whisk/http/BasicRasService.scala
@@ -17,10 +17,7 @@
 
 package whisk.http
 
-import akka.actor.Actor
-import akka.actor.ActorSystem
 import akka.event.Logging
-import akka.japi.Creator
 import spray.httpx.SprayJsonSupport._
 import whisk.common.Logging
 import whisk.common.TransactionId
@@ -45,28 +42,3 @@ trait BasicRasService extends BasicHttpService {
         get { complete("pong") }
     }
 }
-
-/**
- * Singleton which provides a factory for instances of the BasicRasService.
- */
-object BasicRasService {
-
-    def startService(system: ActorSystem, name: String, interface: String, port: Integer)(implicit logging: Logging) = {
-        BasicHttpService.startService(system, name, interface, port, new ServiceBuilder)
-    }
-
-    /**
-     * In spray, we send messages to an Akka Actor. A RasService represents an Actor
-     * which extends the BasicRasService trait.
-     */
-    private class RasService(implicit val logging: Logging) extends BasicRasService with Actor {
-        override def actorRefFactory = context
-    }
-
-    /**
-     * Akka-style factory for RasService.
-     */
-    private class ServiceBuilder(implicit logging: Logging) extends Creator[RasService] {
-        def create = new RasService
-    }
-}
diff --git a/core/controller/Dockerfile b/core/controller/Dockerfile
index 1b24625..ecfc0ec 100644
--- a/core/controller/Dockerfile
+++ b/core/controller/Dockerfile
@@ -5,7 +5,7 @@ ENV DEBIAN_FRONTEND noninteractive
 # Install swagger-ui
 RUN wget --no-verbose https://github.com/swagger-api/swagger-ui/archive/v2.1.4.tar.gz && \
     mkdir swagger-ui && \
-    tar zxf v2.1.4.tar.gz -C /swagger-ui --strip-components=2 swagger-ui-2.1.4/dist && \ 
+    tar zxf v2.1.4.tar.gz -C /swagger-ui --strip-components=2 swagger-ui-2.1.4/dist && \
     rm v2.1.4.tar.gz && \
     perl -pi -w -e  's{http://petstore.swagger.io/v2/swagger.json}{/api/v1/api-docs}g;' /swagger-ui/index.html
 
@@ -13,6 +13,5 @@ RUN wget --no-verbose https://github.com/swagger-api/swagger-ui/archive/v2.1.4.t
 # Copy app jars
 COPY build/distributions/controller.tar ./
 RUN tar xf controller.tar
-CMD controller/bin/controller >> /logs/${COMPONENT_NAME}_logs.log 2>&1
 
 EXPOSE 8080
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index f383cd0..2740190 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -32,19 +32,19 @@ import spray.routing.Directive.pimpApply
 import spray.routing.Route
 import whisk.common.AkkaLogging
 import whisk.common.Logging
+import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.entitlement._
 import whisk.core.entitlement.EntitlementProvider
 import whisk.core.entity._
-import whisk.core.entity.ExecManifest.Runtimes
 import whisk.core.entity.ActivationId.ActivationIdGenerator
+import whisk.core.entity.ExecManifest.Runtimes
 import whisk.core.loadBalancer.LoadBalancerService
 import whisk.http.BasicHttpService
 import whisk.http.BasicRasService
-import whisk.common.LoggingMarkers
 
-import scala.util.{Failure, Success}
+import scala.util.{ Failure, Success }
 
 /**
  * The Controller is the service that provides the REST API for OpenWhisk.
@@ -53,6 +53,15 @@ import scala.util.{Failure, Success}
  *
  * Spray sends messages to akka Actors -- the Controller is an Actor, ready to receive messages.
  *
+ * It is possible to deploy a hot-standby controller. Each controller needs its own instance. This instance is a
+ * consecutive numbering, starting with 0.
+ * The state and cache of each controller is not shared to the other controllers.
+ * If the base controller crashes, the hot-standby controller will be used. After the base controller is up again,
+ * it will be used again. Because of the empty cache after restart, there are no problems with inconsistency.
+ * The only problem that could occur is, that the base controller is not reachable, but does not restart. After switching
+ * back to the base controller, there could be an inconsistency in the cache (e.g. if a user has updated an action). This
+ * inconsistency will be resolved by its own after removing the cached item, 5 minutes after it has been generated.
+ *
  * @Idioglossia uses the spray-routing DSL
  * http://spray.io/documentation/1.1.3/spray-routing/advanced-topics/understanding-dsl-structure/
  *
@@ -62,7 +71,7 @@ import scala.util.{Failure, Success}
  * @param executionContext Scala runtime support for concurrent operations
  */
 class Controller(
-    instance: Int,
+    override val instance: InstanceId,
     runtimes: Runtimes,
     implicit val whiskConfig: WhiskConfig,
     implicit val logging: Logging)
@@ -72,6 +81,8 @@ class Controller(
     // each akka Actor has an implicit context
     override def actorRefFactory: ActorContext = context
 
+    override val numberOfInstances = whiskConfig.controllerInstances.toInt
+
     /**
      * A Route in spray is technically a function taking a RequestContext as a parameter.
      *
@@ -96,7 +107,7 @@ class Controller(
         }
     }
 
-    TransactionId.controller.mark(this, LoggingMarkers.CONTROLLER_STARTUP(instance), s"starting controller instance ${instance}")
+    TransactionId.controller.mark(this, LoggingMarkers.CONTROLLER_STARTUP(instance.toInt), s"starting controller instance ${instance.toInt}")
 
     // initialize datastores
     private implicit val actorSystem = context.system
@@ -106,7 +117,7 @@ class Controller(
     private implicit val activationStore = WhiskActivationStore.datastore(whiskConfig)
 
     // initialize backend services
-    private implicit val loadBalancer = new LoadBalancerService(whiskConfig, entityStore)
+    private implicit val loadBalancer = new LoadBalancerService(whiskConfig, instance, entityStore)
     private implicit val consulServer = whiskConfig.consulServer
     private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
     private implicit val activationIdFactory = new ActivationIdGenerator {}
@@ -115,6 +126,7 @@ class Controller(
     Collection.initialize(entityStore)
 
     /** The REST APIs. */
+    implicit val controllerInstance = instance
     private val apiv1 = new RestAPIVersion("api", "v1")
     private val swagger = new SwaggerDocs(Uri.Path.Empty, "infoswagger.json")
 
@@ -144,6 +156,7 @@ object Controller {
     // a value, and whose values are default values.   A null value in the Map means there is
     // no default value specified, so it must appear in the properties file
     def requiredProperties = Map(WhiskConfig.servicePort -> 8080.toString) ++
+        Map(WhiskConfig.controllerInstances -> 1.toString) ++
         ExecManifest.requiredProperties ++
         RestApiCommons.requiredProperties ++
         LoadBalancerService.requiredProperties ++
@@ -164,7 +177,7 @@ object Controller {
         "runtimes" -> runtimes.toJson)
 
     // akka-style factory to create a Controller object
-    private class ServiceBuilder(config: WhiskConfig, instance: Int, logging: Logging) extends Creator[Controller] {
+    private class ServiceBuilder(config: WhiskConfig, instance: InstanceId, logging: Logging) extends Creator[Controller] {
         // this method is not reached unless ExecManifest was initialized successfully
         def create = new Controller(instance, ExecManifest.runtimesManifest, config, logging)
     }
@@ -177,8 +190,8 @@ object Controller {
         val config = new WhiskConfig(requiredProperties, optionalProperties)
 
         // if deploying multiple instances (scale out), must pass the instance number as the
-        // second argument.  (TODO .. seems fragile)
-        val instance = if (args.length > 0) args(1).toInt else 0
+        require(args.length >= 1, "controller instance required")
+        val instance = args(0).toInt
 
         def abort() = {
             logger.error(this, "Bad configuration, cannot start.")
@@ -194,7 +207,7 @@ object Controller {
         ExecManifest.initialize(config) match {
             case Success(_) =>
                 val port = config.servicePort.toInt
-                BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", port, new ServiceBuilder(config, instance, logger))
+                BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", port, new ServiceBuilder(config, InstanceId(instance), logger))
 
             case Failure(t) =>
                 logger.error(this, s"Invalid runtimes manifest: $t")
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index 6202868..08cf874 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -19,8 +19,8 @@ package whisk.core.controller
 
 import scala.concurrent.ExecutionContext
 
+import RestApiCommons._
 import akka.actor.ActorSystem
-
 import spray.http.AllOrigins
 import spray.http.HttpHeaders._
 import spray.http.StatusCodes._
@@ -31,7 +31,6 @@ import spray.json.DefaultJsonProtocol._
 import spray.routing.Directive.pimpApply
 import spray.routing.Directives
 import spray.routing.Route
-
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
@@ -42,7 +41,6 @@ import whisk.core.entity._
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity.types._
 import whisk.core.loadBalancer.LoadBalancerService
-import RestApiCommons._
 
 /**
  * Abstract class which provides basic Directives which are used to construct route structures
@@ -106,6 +104,7 @@ protected[controller] object RestApiCommons {
         override val webApiDirectives: WebApiDirectives)(
             implicit override val authStore: AuthStore,
             implicit val entityStore: EntityStore,
+            override val activeAckTopicIndex: InstanceId,
             override val activationStore: ActivationStore,
             override val entitlementProvider: EntitlementProvider,
             override val activationIdFactory: ActivationIdGenerator,
@@ -133,6 +132,7 @@ protected[controller] trait RespondWithHeaders extends Directives {
  * An object which creates the Routes that define v1 of the whisk REST API.
  */
 protected[controller] class RestAPIVersion(apipath: String, apiversion: String)(
+    implicit val activeAckTopicIndex: InstanceId,
     implicit val authStore: AuthStore,
     implicit val entityStore: EntityStore,
     implicit val activationStore: ActivationStore,
@@ -224,6 +224,7 @@ protected[controller] class RestAPIVersion(apipath: String, apiversion: String)(
         val apipath: String,
         val apiversion: String)(
             implicit override val actorSystem: ActorSystem,
+            override val activeAckTopicIndex: InstanceId,
             override val entityStore: EntityStore,
             override val activationStore: ActivationStore,
             override val entitlementProvider: EntitlementProvider,
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 9f98a71..0911a75 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -29,8 +29,8 @@ import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
 import whisk.core.connector.ActivationMessage
-import whisk.core.controller.WhiskServices
 import whisk.core.controller.WhiskActionsApi
+import whisk.core.controller.WhiskServices
 import whisk.core.database.NoDocumentException
 import whisk.core.entity._
 import whisk.core.entity.types.ActivationStore
@@ -48,6 +48,12 @@ protected[actions] trait PrimitiveActions {
 
     protected implicit val logging: Logging
 
+    /**
+     *  The index of the active ack topic, this controller is listening for.
+     *  Typically this is also the instance number of the controller
+     */
+    protected val activeAckTopicIndex: InstanceId
+
     /** Database service to CRUD actions. */
     protected val entityStore: EntityStore
 
@@ -99,6 +105,7 @@ protected[actions] trait PrimitiveActions {
             user,
             activationIdFactory.make(), // activation id created here
             activationNamespace = user.namespace.toPath,
+            activeAckTopicIndex,
             args,
             cause = cause)
 
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 4e71260..4ba44e9 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -21,12 +21,9 @@ import scala.concurrent.Future
 import scala.concurrent.duration.DurationInt
 
 import akka.actor.ActorSystem
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import whisk.common.ConsulClient
-import whisk.common.ConsulKV.ControllerKeys
 import whisk.common.Logging
 import whisk.common.Scheduler
+import whisk.common.TransactionId
 import whisk.core.entity.Subject
 import whisk.core.loadBalancer.LoadBalancer
 
@@ -54,34 +51,27 @@ class ActivationThrottler(consulServer: String, loadBalancer: LoadBalancer, conc
     private var userActivationCounter = Map.empty[String, Int]
 
     private val healthCheckInterval = 5.seconds
-    private val consul = new ConsulClient(consulServer)
 
     /**
      * Checks whether the operation should be allowed to proceed.
      */
-    def check(subject: Subject): Boolean = userActivationCounter.getOrElse(subject.asString, 0) < concurrencyLimit
+    def check(subject: Subject)(implicit tid: TransactionId): Boolean = {
+        val concurrentActivations = userActivationCounter.getOrElse(subject.asString, 0)
+        logging.info(this, s"subject = ${subject.toString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit")
+        concurrentActivations < concurrencyLimit
+    }
 
     /**
      * Checks whether the system is in a generally overloaded state.
      */
-    def isOverloaded = userActivationCounter.values.sum > systemOverloadLimit
-
-    /**
-     * Publish into Consul KV values showing the controller's view
-     * of concurrent activations on a per-user basis.
-     */
-    private def publishUserConcurrentActivation() = {
-        // Any sort of partitioning will be ok for monitoring
-        Future.sequence(userActivationCounter.groupBy(_._1.take(1)).map {
-            case (prefix, items) =>
-                val key = ControllerKeys.userActivationCountKey + "/" + prefix
-                consul.kv.put(key, items.toJson.compactPrint)
-        })
+    def isOverloaded()(implicit tid: TransactionId): Boolean = {
+        val concurrentActivations = userActivationCounter.values.sum
+        logging.info(this, s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
+        concurrentActivations > systemOverloadLimit
     }
 
     Scheduler.scheduleWaitAtLeast(healthCheckInterval) { () =>
         userActivationCounter = loadBalancer.getActiveUserActivationCounts
-        publishUserConcurrentActivation()
+        Future.successful(Unit)
     }
-
 }
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index caf33ae..6c205b5 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -28,7 +28,6 @@ import Privilege.REJECT
 import akka.actor.ActorSystem
 import spray.http.StatusCodes.Forbidden
 import spray.http.StatusCodes.TooManyRequests
-import whisk.common.ConsulClient
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
@@ -87,8 +86,6 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
     private val triggerRateThrottler = new RateThrottler("triggers per minute", config.triggerFirePerMinuteLimit.toInt)
     private val concurrentInvokeThrottler = new ActivationThrottler(config.consulServer, loadBalancer, config.actionInvokeConcurrentLimit.toInt, config.actionInvokeSystemOverloadLimit.toInt)
 
-    private val consul = new ConsulClient(config.consulServer)
-
     /**
      * Grants a subject the right to access a resources.
      *
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 436c247..d7e9025 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -56,6 +56,7 @@ import whisk.core.entity.DocRevision
 import whisk.core.entity.EntityName
 import whisk.core.entity.ExecManifest
 import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
 import whisk.core.entity.Secret
 import whisk.core.entity.Subject
 import whisk.core.entity.UUID
@@ -190,7 +191,7 @@ object InvokerPool {
  * This finite state-machine represents an Invoker in its possible
  * states "Healthy" and "Offline".
  */
-class InvokerActor extends FSM[InvokerState, InvokerInfo] {
+class InvokerActor(controllerInstance: InstanceId) extends FSM[InvokerState, InvokerInfo] {
     implicit val transid = TransactionId.invokerHealth
     implicit val logging = new AkkaLogging(context.system.log)
     def name = self.path.name
@@ -311,6 +312,7 @@ class InvokerActor extends FSM[InvokerState, InvokerInfo] {
                 // Create a new Activation ID for this activation
                 activationId = new ActivationIdGenerator {}.make(),
                 activationNamespace = action.namespace,
+                rootControllerIndex = controllerInstance,
                 content = None)
 
             context.parent ! ActivationRequest(activationMessage, name)
@@ -328,7 +330,7 @@ class InvokerActor extends FSM[InvokerState, InvokerInfo] {
 }
 
 object InvokerActor {
-    def props() = Props[InvokerActor]
+    def props(controllerInstance: InstanceId) = Props(new InvokerActor(controllerInstance))
 
     val bufferSize = 10
     val bufferErrorTolerance = 3
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 7e633c5..636f157 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -18,7 +18,6 @@
 package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
-
 import java.time.{ Clock, Instant }
 
 import scala.collection.concurrent.TrieMap
@@ -50,6 +49,7 @@ import whisk.core.connector.{ ActivationMessage, CompletionMessage }
 import whisk.core.connector.MessageProducer
 import whisk.core.database.NoDocumentException
 import whisk.core.entity.{ ActivationId, CodeExec, WhiskAction, WhiskActivation }
+import whisk.core.entity.InstanceId
 import whisk.core.entity.WhiskAction
 import whisk.core.entity.types.EntityStore
 import scala.annotation.tailrec
@@ -77,7 +77,7 @@ trait LoadBalancer {
 
 }
 
-class LoadBalancerService(config: WhiskConfig, entityStore: EntityStore)(implicit val actorSystem: ActorSystem, logging: Logging) extends LoadBalancer {
+class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore: EntityStore)(implicit val actorSystem: ActorSystem, logging: Logging) extends LoadBalancer {
 
     /** The execution context for futures */
     implicit val executionContext: ExecutionContext = actorSystem.dispatcher
@@ -220,8 +220,9 @@ class LoadBalancerService(config: WhiskConfig, entityStore: EntityStore)(implici
         }
 
         val consul = new ConsulClient(config.consulServer)
-        val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, "health", "health")
-        val invokerFactory = (f: ActorRefFactory, name: String) => f.actorOf(InvokerActor.props, name)
+        // Each controller gets its own Group Id, to receive all messages
+        val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health")
+        val invokerFactory = (f: ActorRefFactory, name: String) => f.actorOf(InvokerActor.props(instance), name)
 
         actorSystem.actorOf(InvokerPool.props(invokerFactory, consul.kv, invoker => {
             clearInvokerState(invoker)
@@ -230,7 +231,7 @@ class LoadBalancerService(config: WhiskConfig, entityStore: EntityStore)(implici
     }
 
     /** Subscribes to active acks (completion messages from the invokers). */
-    private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", "completed")
+    private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}")
 
     /** Registers a handler for received active acks from invokers. */
     activeAckConsumer.onMessage((topic, _, _, bytes) => {
diff --git a/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala
index 0b28f2e..af3d9e4 100644
--- a/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala
@@ -52,7 +52,7 @@ import whisk.core.entity._
  */
 class ContainerPool(
     config: WhiskConfig,
-    invokerInstance: Integer = 0,
+    invokerInstance: InstanceId = InstanceId(0),
     standalone: Boolean = false,
     saveContainerLog: Boolean = false)(implicit actorSystem: ActorSystem, val logging: Logging)
     extends ContainerUtils {
@@ -429,7 +429,7 @@ class ContainerPool(
 
     // Sample container name: wsk1_1_joeibmcomhelloWorldDemo_20150901T202701852Z
     private def makeContainerName(localName: String): ContainerName =
-        ContainerCounter.containerName(invokerInstance.toString(), localName)
+        ContainerCounter.containerName(invokerInstance.toInt.toString, localName)
 
     private def makeContainerName(action: WhiskAction): ContainerName =
         makeContainerName(action.fullyQualifiedName(true).toString)
diff --git a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
index a68be9d..2e0e378 100644
--- a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
@@ -120,6 +120,7 @@ class WhiskContainer(
             WhiskAuth(Subject(), AuthKey()).toIdentity,
             ActivationId(),
             EntityPath("no_namespace"),
+            InstanceId(0),
             None)
         run(msg, params, 30000.milliseconds)(system, TransactionId.testing)
     }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 55c7999..1b8c7fe 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -96,7 +96,7 @@ case object ActivationCompleted
  */
 class ContainerProxy(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
-    sendActiveAck: (TransactionId, WhiskActivation) => Future[Any],
+    sendActiveAck: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
     storeActivation: (TransactionId, WhiskActivation) => Future[Any],
     unusedTimeout: FiniteDuration,
     pauseGrace: FiniteDuration) extends FSM[ContainerState, ContainerData] with Stash {
@@ -154,7 +154,7 @@ class ContainerProxy(
                     // transitions to Running
                     val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response)
                     self ! ActivationCompleted
-                    sendActiveAck(transid, activation)
+                    sendActiveAck(transid, activation, job.msg.rootControllerIndex)
                     storeActivation(transid, activation)
             }.flatMap {
                 container =>
@@ -361,7 +361,7 @@ class ContainerProxy(
         // asynchronous.
         activation.andThen {
             // the activation future will always complete with Success
-            case Success(ack) => sendActiveAck(tid, ack)
+            case Success(ack) => sendActiveAck(tid, ack, job.msg.rootControllerIndex)
         }.flatMap { activation =>
             container.logs(job.action.limits.logs.asMegaBytes, job.action.exec.sentinelledLogs).map { logs =>
                 activation.withLogs(ActivationLogs(logs.toVector))
@@ -380,7 +380,7 @@ class ContainerProxy(
 
 object ContainerProxy {
     def props(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
-              ack: (TransactionId, WhiskActivation) => Future[Any],
+              ack: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
               store: (TransactionId, WhiskActivation) => Future[Any],
               unusedTimeout: FiniteDuration = 10.minutes,
               pauseGrace: FiniteDuration = 50.milliseconds) = Props(new ContainerProxy(factory, ack, store, unusedTimeout, pauseGrace))
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4ccc129..813cc6b 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -18,7 +18,6 @@
 package whisk.core.invoker
 
 import java.nio.charset.StandardCharsets
-
 import java.time.{ Clock, Instant }
 
 import scala.concurrent.{ Await, ExecutionContext, Future }
@@ -26,6 +25,7 @@ import scala.concurrent.Promise
 import scala.concurrent.duration.{ Duration, DurationInt }
 import scala.language.postfixOps
 import scala.util.{ Failure, Success }
+import scala.util.Try
 
 import akka.actor.{ ActorRef, ActorSystem, actorRef2Scala }
 import akka.japi.Creator
@@ -33,10 +33,13 @@ import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.{ Counter, Logging, LoggingMarkers, TransactionId }
 import whisk.common.AkkaLogging
+import whisk.common.Scheduler
 import whisk.connector.kafka.{ KafkaConsumerConnector, KafkaProducerConnector }
 import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig.{ consulServer, dockerImagePrefix, dockerRegistry, kafkaHost, logsDir, servicePort, whiskVersion, invokerUseReactivePool }
 import whisk.core.connector.{ ActivationMessage, CompletionMessage }
+import whisk.core.connector.MessageProducer
+import whisk.core.connector.PingMessage
 import whisk.core.container._
 import whisk.core.dispatcher.{ Dispatcher, MessageHandler }
 import whisk.core.dispatcher.ActivationFeed.{ ActivationNotification, ContainerReleased, FailedActivation }
@@ -44,10 +47,6 @@ import whisk.core.entity._
 import whisk.http.BasicHttpService
 import whisk.http.Messages
 import whisk.utils.ExecutionContextFactory
-import whisk.common.Scheduler
-import whisk.core.connector.PingMessage
-import scala.util.Try
-import whisk.core.connector.MessageProducer
 
 /**
  * A kafka message handler that invokes actions as directed by message on topic "/actions/invoke".
@@ -59,16 +58,16 @@ import whisk.core.connector.MessageProducer
  */
 class Invoker(
     config: WhiskConfig,
-    instance: Int,
+    instance: InstanceId,
     activationFeed: ActorRef,
     producer: MessageProducer,
     runningInContainer: Boolean = true)(implicit actorSystem: ActorSystem, logging: Logging)
-    extends MessageHandler(s"invoker$instance")
+    extends MessageHandler(s"invoker${instance.toInt}")
     with ActionLogDriver {
 
     private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
-    TransactionId.invoker.mark(this, LoggingMarkers.INVOKER_STARTUP(instance), s"starting invoker instance ${instance}")
+    TransactionId.invoker.mark(this, LoggingMarkers.INVOKER_STARTUP(instance.toInt), s"starting invoker instance ${instance.toInt}")
 
     /**
      * This is the handler for the kafka message
@@ -257,7 +256,7 @@ class Invoker(
         val activationResult = makeWhiskActivation(msg, EntityPath(action.fullyQualifiedName(false).toString), action.version, activationResponse, activationInterval, Some(action.limits))
         val completeMsg = CompletionMessage(transid, activationResult, this.name)
 
-        producer.send("completed", completeMsg) map { status =>
+        producer.send(s"completed${msg.rootControllerIndex.toInt}", completeMsg) map { status =>
             logging.info(this, s"posted completion of activation ${msg.activationId}")
         }
 
@@ -409,7 +408,8 @@ object Invoker {
         logsDir -> null,
         dockerRegistry -> null,
         dockerImagePrefix -> null,
-        invokerUseReactivePool -> false.toString) ++
+        invokerUseReactivePool -> false.toString,
+        WhiskConfig.invokerInstances -> null) ++
         ExecManifest.requiredProperties ++
         WhiskAuthStore.requiredProperties ++
         WhiskEntityStore.requiredProperties ++
@@ -421,7 +421,7 @@ object Invoker {
 
     def main(args: Array[String]): Unit = {
         require(args.length == 1, "invoker instance required")
-        val instance = args(0).toInt
+        val invokerInstance = InstanceId(args(0).toInt)
 
         implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
         implicit val actorSystem: ActorSystem = ActorSystem(
@@ -449,7 +449,7 @@ object Invoker {
             abort()
         }
 
-        val topic = s"invoker$instance"
+        val topic = s"invoker${invokerInstance.toInt}"
         val groupid = "invokers"
         val maxdepth = ContainerPool.getDefaultMaxActive(config)
         val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, maxdepth)
@@ -457,9 +457,9 @@ object Invoker {
         val dispatcher = new Dispatcher(consumer, 500 milliseconds, 2 * maxdepth, actorSystem)
 
         val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) {
-            new InvokerReactive(config, instance, dispatcher.activationFeed, producer)
+            new InvokerReactive(config, invokerInstance, dispatcher.activationFeed, producer)
         } else {
-            new Invoker(config, instance, dispatcher.activationFeed, producer)
+            new Invoker(config, invokerInstance, dispatcher.activationFeed, producer)
         }
         logger.info(this, s"using $invoker")
 
@@ -467,18 +467,15 @@ object Invoker {
         dispatcher.start()
 
         Scheduler.scheduleWaitAtMost(1.seconds)(() => {
-            producer.send("health", PingMessage(s"invoker$instance")).andThen {
+            producer.send("health", PingMessage(s"invoker${invokerInstance.toInt}")).andThen {
                 case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
             }
         })
 
         val port = config.servicePort.toInt
         BasicHttpService.startService(actorSystem, "invoker", "0.0.0.0", port, new Creator[InvokerServer] {
-            def create = new InvokerServer {
-                override implicit val logging = logger
-            }
+            def create = new InvokerServer(invokerInstance, config.invokerInstances.toInt)
         })
-
     }
 }
 
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index cf7e3ae..b90db49 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -37,27 +37,27 @@ import whisk.core.connector.CompletionMessage
 import whisk.core.connector.MessageProducer
 import whisk.core.container.{ ContainerPool => OldContainerPool }
 import whisk.core.container.Interval
+import whisk.core.containerpool.ContainerPool
 import whisk.core.containerpool.ContainerProxy
 import whisk.core.containerpool.PrewarmingConfig
 import whisk.core.containerpool.Run
 import whisk.core.containerpool.docker.DockerClientWithFileAccess
 import whisk.core.containerpool.docker.DockerContainer
 import whisk.core.containerpool.docker.RuncClient
+import whisk.core.database.NoDocumentException
 import whisk.core.dispatcher.ActivationFeed.FailedActivation
 import whisk.core.dispatcher.MessageHandler
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.size._
-import whisk.core.containerpool.ContainerPool
-import whisk.core.database.NoDocumentException
 import whisk.http.Messages
 
 class InvokerReactive(
     config: WhiskConfig,
-    instance: Int,
+    instance: InstanceId,
     activationFeed: ActorRef,
     producer: MessageProducer)(implicit actorSystem: ActorSystem, logging: Logging)
-    extends MessageHandler(s"invoker$instance") {
+    extends MessageHandler(s"invoker${instance.toInt}") {
 
     implicit val ec = actorSystem.dispatcher
 
@@ -107,9 +107,9 @@ class InvokerReactive(
     }
 
     /** Sends an active-ack. */
-    val ack = (tid: TransactionId, activation: WhiskActivation) => {
+    val ack = (tid: TransactionId, activation: WhiskActivation, controllerInstance: InstanceId) => {
         implicit val transid = tid
-        producer.send("completed", CompletionMessage(tid, activation, s"invoker$instance")).andThen {
+        producer.send(s"completed${controllerInstance.toInt}", CompletionMessage(tid, activation, s"invoker${instance.toInt}")).andThen {
             case Success(_) => logging.info(this, s"posted completion of activation ${activation.activationId}")
         }
     }
@@ -195,7 +195,7 @@ class InvokerReactive(
                     })
 
                 activationFeed ! FailedActivation(msg.transid)
-                ack(msg.transid, activation)
+                ack(msg.transid, activation, msg.rootControllerIndex)
                 store(msg.transid, activation)
         }
     }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala
index 9b71a8a..cd083cb 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala
@@ -18,13 +18,18 @@
 package whisk.core.invoker
 
 import akka.actor.Actor
+import whisk.common.Logging
+import whisk.core.entity.InstanceId
 import whisk.http.BasicRasService
 
 /**
  * Implements web server to handle certain REST API calls.
  * Currently provides a health ping route, only.
  */
-trait InvokerServer
+class InvokerServer(
+    override val instance: InstanceId,
+    override val numberOfInstances: Int)(
+        override implicit val logging: Logging)
     extends BasicRasService
     with Actor {
 
diff --git a/tests/src/test/scala/common/LoggedFunction.scala b/tests/src/test/scala/common/LoggedFunction.scala
index 467bc5a..e6c0aad 100644
--- a/tests/src/test/scala/common/LoggedFunction.scala
+++ b/tests/src/test/scala/common/LoggedFunction.scala
@@ -38,6 +38,15 @@ class LoggedFunction2[A1, A2, B](body: (A1, A2) => B) extends Function2[A1, A2,
     }
 }
 
+class LoggedFunction3[A1, A2, A3, B](body: (A1, A2, A3) => B) extends Function3[A1, A2, A3, B] {
+    val calls = mutable.Buffer[(A1, A2, A3)]()
+
+    override def apply(v1: A1, v2: A2, v3: A3): B = {
+        calls += ((v1, v2, v3))
+        body(v1, v2, v3)
+    }
+}
+
 class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) extends Function5[A1, A2, A3, A4, A5, B] {
     val calls = mutable.Buffer[(A1, A2, A3, A4, A5)]()
 
@@ -49,5 +58,6 @@ class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) ex
 
 object LoggedFunction {
     def apply[A1, A2, B](body: (A1, A2) => B) = new LoggedFunction2(body)
+    def apply[A1, A2, A3, B](body: (A1, A2, A3) => B) = new LoggedFunction3(body)
     def apply[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) = new LoggedFunction5(body)
 }
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 3591225..70176e7 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -238,20 +238,20 @@ public class WhiskProperties {
         return Integer.parseInt(whiskProperties.getProperty("edge.host.apiport"));
     }
 
-    public static String getLoadbalancerHost() {
-        return whiskProperties.getProperty("loadbalancer.host");
+    public static String getControllerHosts() {
+        return whiskProperties.getProperty("controller.hosts");
     }
 
-    public static int getLoadbalancerPort() {
-        return Integer.parseInt(whiskProperties.getProperty("loadbalancer.host.port"));
+    public static int getControllerBasePort() {
+        return Integer.parseInt(whiskProperties.getProperty("controller.host.basePort"));
     }
 
-    public static String getControllerHost() {
-        return whiskProperties.getProperty("controller.host");
+    public static String getBaseControllerHost() {
+    	return getControllerHosts().split(",")[0];
     }
 
-    public static int getControllerPort() {
-        return Integer.parseInt(whiskProperties.getProperty("controller.host.port"));
+    public static String getBaseControllerAddress() {
+    	return getBaseControllerHost() + ":" + getControllerBasePort();
     }
 
     public static int getMaxActionInvokesPerMinute() {
diff --git a/tests/src/test/scala/services/HeadersTests.scala b/tests/src/test/scala/services/HeadersTests.scala
index 0f43d10..82a85d6 100644
--- a/tests/src/test/scala/services/HeadersTests.scala
+++ b/tests/src/test/scala/services/HeadersTests.scala
@@ -75,7 +75,7 @@ class HeadersTests extends FlatSpec
     val allowOrigin = `Access-Control-Allow-Origin`(AllOrigins)
     val allowHeaders = `Access-Control-Allow-Headers`("Authorization", "Content-Type")
 
-    val url = Uri(s"http://${WhiskProperties.getControllerHost}:${WhiskProperties.getControllerPort}")
+    val url = Uri(s"http://${WhiskProperties.getBaseControllerAddress()}")
     val pipeline: HttpRequest => Future[HttpResponse] = (
         sendReceive
         ~> unmarshal[HttpResponse])
diff --git a/tests/src/test/scala/services/PingTests.scala b/tests/src/test/scala/services/PingTests.scala
index 3883f7f..72d840d 100644
--- a/tests/src/test/scala/services/PingTests.scala
+++ b/tests/src/test/scala/services/PingTests.scala
@@ -17,19 +17,34 @@
 
 package services
 
-import org.junit.Assert.assertTrue
-
 import java.io.File
 
-import org.junit.Rule
-import org.junit.Test
-import org.junit.rules.TestRule
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.DurationInt
+import scala.util.Try
+
+import org.junit.Assert.assertTrue
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
 
 import com.jayway.restassured.RestAssured
 
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.Materializer
 import common.TestUtils
-import common.WhiskProperties
 import common.TestUtils.RunResult
+import common.WhiskProperties
+import common.WskActorSystem
+import common.WskTestHelpers
+import akka.http.scaladsl.model.StatusCodes
 
 /**
  * Basic tests to check that a Whisk installation is healthy in that all
@@ -38,14 +53,27 @@ import common.TestUtils.RunResult
 object PingTests {
     val bin: File = WhiskProperties.getFileRelativeToWhiskHome("tools/health")
 
-    def isAlive(name: String, whiskPropertyFile: String): RunResult = {
+    def isAliveScript(name: String, whiskPropertyFile: String): RunResult = {
         TestUtils.runCmd(TestUtils.SUCCESS_EXIT, bin, WhiskProperties.python, "isAlive", "-d", whiskPropertyFile, "--wait", "30", name)
     }
+
+    def ping(host: String, port: Int)(implicit actorSystem: ActorSystem, ec: ExecutionContext, materializer: Materializer) = {
+        val response = Try { Await.result(Http().singleRequest(HttpRequest(uri = s"http://$host:$port/ping")), 10.seconds) }.toOption
+
+        response.map { res =>
+            (res.status, Await.result(Unmarshal(res).to[String], 10.seconds))
+        }
+    }
 }
 
-class PingTests {
-    @Rule
-    def watcher(): TestRule = TestUtils.makeTestWatcher
+@RunWith(classOf[JUnitRunner])
+class PingTests extends FlatSpec
+    with Matchers
+    with WskTestHelpers
+    with ScalaFutures
+    with WskActorSystem {
+
+    implicit val materializer = ActorMaterializer()
 
     /**
      * Check that the docker REST interface at endpoint is up. envVar is the
@@ -61,45 +89,41 @@ class PingTests {
         assertTrue(response.contains("Containers"))
     }
 
-    /**
-     * Check that the main docker endpoint is functioning.
-     */
-    @Test
-    def pingMainDocker(): Unit = {
+    behavior of "PingTest"
+
+    it should "check that the main docker endpoint is functioning" in {
         pingDocker("main.docker.endpoint", WhiskProperties.getMainDockerEndpoint)
     }
 
-    /**
-     * Check the kafka docker endpoint is functioning.
-     */
-    @Test
-    def pingKafkaDocker(): Unit = {
+    it should "Check the kafka docker endpoint is functioning" in {
         pingDocker("kafka.docker.endpoint", WhiskProperties.getKafkaDockerEndpoint)
     }
 
-    /**
-     * Check that the zookeeper endpoint is up and running
-     */
-    @Test
-    def pingZookeeper(): Unit = {
-        PingTests.isAlive("zookeeper", WhiskProperties.getFileRelativeToWhiskHome(".").getAbsolutePath)
+    it should "check that the zookeeper endpoint is up and running" in {
+        PingTests.isAliveScript("zookeeper", WhiskProperties.getFileRelativeToWhiskHome(".").getAbsolutePath)
     }
 
-    /**
-     * Check that the invoker endpoints are up and running
-     */
-    @Test
-    def pingInvoker(): Unit = {
-        for (i <- 0 until WhiskProperties.numberOfInvokers) {
-            PingTests.isAlive("invoker" + i, WhiskProperties.getFileRelativeToWhiskHome(".").getAbsolutePath)
+    it should "Check that the invoker endpoints are up and running" in {
+        val basePort = WhiskProperties.getProperty("invoker.hosts.baseport").toInt
+        val invokers = WhiskProperties.getInvokerHosts.zipWithIndex.map {
+            case (invoker, instance) =>
+                val res = PingTests.ping(invoker, basePort + instance)
+
+                res shouldBe defined
+                res.get._1 shouldBe StatusCodes.OK
+                res.get._2 shouldBe "pong"
         }
     }
 
-    /**
-     * Check that the controller endpoint is up and running
-     */
-    @Test
-    def pingController(): Unit = {
-        PingTests.isAlive("controller", WhiskProperties.getFileRelativeToWhiskHome(".").getAbsolutePath)
+    it should "check that the controller endpoint is up and running" in {
+        val basePort = WhiskProperties.getControllerBasePort()
+        val controllers = WhiskProperties.getControllerHosts().split(",").zipWithIndex.map {
+            case (controller, instance) =>
+                val res = PingTests.ping(controller, basePort + instance)
+
+                res shouldBe defined
+                res.get._1 shouldBe StatusCodes.OK
+                res.get._2 shouldBe "pong"
+        }
     }
 }
diff --git a/tests/src/test/scala/whisk/core/cli/test/WskBasicUsageTests.scala b/tests/src/test/scala/whisk/core/cli/test/WskBasicUsageTests.scala
index ec526b4..a5cf39c 100644
--- a/tests/src/test/scala/whisk/core/cli/test/WskBasicUsageTests.scala
+++ b/tests/src/test/scala/whisk/core/cli/test/WskBasicUsageTests.scala
@@ -151,7 +151,7 @@ class WskBasicUsageTests
         val tmpwskprops = File.createTempFile("wskprops", ".tmp")
         try {
             val env = Map("WSK_CONFIG_FILE" -> tmpwskprops.getAbsolutePath())
-            val apihost = s"http://${WhiskProperties.getControllerHost}:${WhiskProperties.getControllerPort}"
+            val apihost = s"http://${WhiskProperties.getBaseControllerAddress()}"
             wsk.cli(Seq("property", "set", "--apihost", apihost), env = env)
             val rr = wsk.cli(Seq("property", "get", "--apibuild", "-i"), env = env)
             rr.stdout should not include regex("""whisk API build\s*Unknown""")
diff --git a/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala
index 43fbae1..42d2e42 100644
--- a/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala
@@ -41,6 +41,7 @@ import whisk.core.container.ContainerPool
 import whisk.core.entity.AuthKey
 import whisk.core.entity.EntityName
 import whisk.core.entity.EntityPath
+import whisk.core.entity.InstanceId
 import whisk.core.entity.WhiskAction
 import whisk.core.entity.WhiskAuthStore
 import whisk.core.entity.WhiskEntityStore
@@ -72,7 +73,7 @@ class ContainerPoolTests extends FlatSpec
 
     assert(config.isValid)
 
-    val pool = new ContainerPool(config, 0, true, true)
+    val pool = new ContainerPool(config, InstanceId(0), true, true)
     pool.logDir = "/tmp"
 
     val datastore = WhiskEntityStore.datastore(config)
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index e262525..d386597 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -77,6 +77,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
             Identity(Subject(), invocationNamespace, AuthKey(), Set()),
             ActivationId(),
             invocationNamespace.toPath,
+            InstanceId(0),
             None)
         Run(action, message)
     }
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index f7a7e12..3f3f24b 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -79,6 +79,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         Identity(Subject(), invocationNamespace, AuthKey(), Set()),
         ActivationId(),
         invocationNamespace.toPath,
+        InstanceId(0),
         None)
 
     /*
@@ -130,7 +131,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
     }
 
     /** Creates an inspectable version of the ack method, which records all calls in a buffer */
-    def createAcker = LoggedFunction { (_: TransactionId, _: WhiskActivation) => Future.successful(()) }
+    def createAcker = LoggedFunction { (_: TransactionId, _: WhiskActivation, _: InstanceId) => Future.successful(()) }
 
     /** Creates an inspectable factory */
     def createFactory(response: Future[Container]) = LoggedFunction {
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index 276e15f..8d8dc1d 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -37,8 +37,8 @@ import whisk.common.TransactionCounter
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.connector.ActivationMessage
-import whisk.core.controller.WhiskServices
 import whisk.core.controller.RestApiCommons
+import whisk.core.controller.WhiskServices
 import whisk.core.database.DocumentFactory
 import whisk.core.database.test.DbUtils
 import whisk.core.entitlement._
@@ -59,6 +59,10 @@ protected trait ControllerTestCommon
     with HttpService
     with StreamLogging {
 
+    override val instance = InstanceId(0)
+    override val numberOfInstances = 1
+    val activeAckTopicIndex = InstanceId(0)
+
     override val actorRefFactory = null
     implicit val routeTestTimeout = RouteTestTimeout(90 seconds)
 
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 7ad2eef..355c2ec 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -38,15 +38,16 @@ import whisk.core.database.ArtifactStore
 import whisk.core.database.CouchDbRestClient
 import whisk.core.database.DocumentFactory
 import whisk.core.database.NoDocumentException
+import whisk.core.entity.AuthKey
 import whisk.core.entity.DocId
 import whisk.core.entity.DocInfo
 import whisk.core.entity.EntityPath
 import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
 import whisk.core.entity.WhiskDocument
 import whisk.core.entity.WhiskEntityQueries
 import whisk.core.entity.types.AuthStore
 import whisk.core.entity.types.EntityStore
-import whisk.core.entity.AuthKey
 
 /**
  * WARNING: the put/get/del operations in this trait operate directly on the datastore,
@@ -56,6 +57,8 @@ import whisk.core.entity.AuthKey
  */
 trait DbUtils extends TransactionCounter {
     implicit val dbOpTimeout = 15 seconds
+    override val numberOfInstances = 1
+    override val instance = InstanceId(0)
     val docsToDelete = ListBuffer[(ArtifactStore[_], DocInfo)]()
     case class RetryOp() extends Throwable
 
diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
index f867b75..3084270 100644
--- a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
+++ b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
@@ -65,7 +65,7 @@ class DispatcherTests
         val content = JsObject("payload" -> JsNumber(count))
         val user = WhiskAuth(Subject(), AuthKey()).toIdentity
         val path = FullyQualifiedEntityName(EntityPath("test"), EntityName(s"count-$count"), Some(SemVer()))
-        val msg = Message(TransactionId.testing, path, DocRevision.empty, user, ActivationId(), EntityPath(user.subject.asString), Some(content))
+        val msg = Message(TransactionId.testing, path, DocRevision.empty, user, ActivationId(), EntityPath(user.subject.asString), InstanceId(0), Some(content))
         connector.send(msg)
     }
 
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index aac1c43..d3feabe 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -61,6 +61,7 @@ import whisk.core.entity.EntityPath
 import whisk.core.entity.ExecManifest
 import whisk.core.entity.FullyQualifiedEntityName
 import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
 import whisk.core.entity.Secret
 import whisk.core.entity.Subject
 import whisk.core.entity.UUID
@@ -224,6 +225,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
             user = Identity(Subject("unhealthyInvokerCheck"), EntityName("unhealthyInvokerCheck"), AuthKey(UUID(), Secret()), Set[Privilege]()),
             activationId = new ActivationIdGenerator {}.make(),
             activationNamespace = EntityPath("guest"),
+            rootControllerIndex = InstanceId(0),
             content = None)
         val msg = ActivationRequest(activationMessage, invokerName)
 
@@ -241,7 +243,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
     // offline -> unhealthy
     it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
         val pool = TestProbe()
-        val invoker = pool.system.actorOf(InvokerActor.props)
+        val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0)))
 
         within(timeout.duration) {
             pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -257,7 +259,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
     // unhealthy -> healthy
     it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in {
         val pool = TestProbe()
-        val invoker = pool.system.actorOf(InvokerActor.props)
+        val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0)))
 
         within(timeout.duration) {
             pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -280,7 +282,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
     // offline -> unhealthy
     it should "go offline when unhealthy, if the state times out and go unhealthy on a successful ping again" in {
         val pool = TestProbe()
-        val invoker = pool.system.actorOf(InvokerActor.props)
+        val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0)))
 
         within(timeout.duration) {
             pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -295,7 +297,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
     }
 
     it should "start timer to send testactions when unhealthy" in {
-        val invoker = TestFSMRef(new InvokerActor)
+        val invoker = TestFSMRef(new InvokerActor(InstanceId(0)))
         invoker.stateName shouldBe UnHealthy
 
         invoker.isTimerActive(InvokerActor.timerName) shouldBe true
diff --git a/tools/admin/wskadmin b/tools/admin/wskadmin
index 659e81d..3e4b342 100755
--- a/tools/admin/wskadmin
+++ b/tools/admin/wskadmin
@@ -121,7 +121,7 @@ def parseArgs():
     subparser = propmenu.add_subparsers(title='available commands', dest='subcmd')
 
     subcmd = subparser.add_parser('get', help='get logs')
-    subcmd.add_argument('components', help='components, one or more of [controller, invokerN] where N is invoker index', nargs='*', default=['controller', 'invoker0'])
+    subcmd.add_argument('components', help='components, one or more of [controllerN, invokerN] where N is the instance', nargs='*', default=['controller0', 'invoker0'])
     subcmd.add_argument('-t', '--tid', help='retrieve logs for the transaction id')
     subcmd.add_argument('-g', '--grep', help='retrieve logs that match grep expression')
 
diff --git a/tools/build/checkLogs.py b/tools/build/checkLogs.py
index b7f9e5d..cf315b7 100755
--- a/tools/build/checkLogs.py
+++ b/tools/build/checkLogs.py
@@ -91,7 +91,7 @@ if __name__ == "__main__":
         ("db-rules.log", [ partial(database_has_at_most_x_entries, 0) ]),
         ("db-triggers.log", [ partial(database_has_at_most_x_entries, 0) ]),
         # Assert that stdout of the container is correctly piped and empty
-        ("controller.log", [ partial(file_has_at_most_x_bytes, 0) ]),
+        ("controller0.log", [ partial(file_has_at_most_x_bytes, 0) ]),
         ("invoker0.log", [ partial(file_has_at_most_x_bytes, 0) ])
     ]
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].