Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/11/01 02:07:17 UTC

[openwhisk] branch master updated: Add zero downtime deployment (#5338)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 651a2e957 Add zero downtime deployment (#5338)
651a2e957 is described below

commit 651a2e95726f69fb8403c49ce909371521e8986f
Author: Dominic Kim <st...@apache.org>
AuthorDate: Tue Nov 1 11:07:12 2022 +0900

    Add zero downtime deployment (#5338)
    
    * Deploy controllers without downtime
    
    * Deploy invokers without downtime
    
    * Deploy schedulers without downtime
    
    * Fix typo
    
    * Fix typo
    
    * Add a disable API to controllers
    
    * Remove unnecessary steps
    
    * Add more logs for container liveness
    
    * Change Set to a thread-safe one
    
    * Use the transaction ID of the activation
    
    * Gracefully shut down activation client proxy
    
    * Update core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
    
    Apply suggestion
    
    Co-authored-by: Brendan Doyle <bd...@gmail.com>
    
    * Update core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
    
    Apply suggestion
    
    Co-authored-by: Brendan Doyle <bd...@gmail.com>
    
    * Update core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
    
    Co-authored-by: Brendan Doyle <bd...@gmail.com>
    
    * Apply https://github.com/apache/openwhisk/pull/5334
    
    * Remove akka-http dependency from the invoker reactive
    
    * Exclude the prewarm containers count from the /pool/count route
    
    * Add missing import
    
    * Make it compatible with scala-2.13
    
    In scala-2.13 mapValues returns a MapView, and it cannot be cast to Map by default.
    
    * Fix test cases
    
    * Add container id to the logs of ActivationClientProxy
    
    Co-authored-by: Brendan Doyle <bd...@gmail.com>
---
 ansible/group_vars/all                             | 13 +++-
 ansible/roles/controller/tasks/deploy.yml          | 83 ++++++++++++++++++++++
 ansible/roles/invoker/tasks/clean.yml              | 33 +++++++++
 ansible/roles/invoker/tasks/deploy.yml             |  6 ++
 ansible/roles/schedulers/tasks/deploy.yml          | 21 ++----
 ansible/templates/whisk.properties.j2              |  4 ++
 .../org/apache/openwhisk/core/WhiskConfig.scala    |  3 +
 .../apache/openwhisk/core/connector/Message.scala  |  8 ++-
 .../openwhisk/core/service/WatcherService.scala    | 10 ++-
 .../controller/src/main/resources/application.conf |  4 ++
 .../openwhisk/core/controller/Controller.scala     | 46 +++++++++++-
 .../core/loadBalancer/CommonLoadBalancer.scala     |  5 +-
 .../core/loadBalancer/FPCPoolBalancer.scala        |  5 +-
 .../openwhisk/core/loadBalancer/LoadBalancer.scala |  2 +-
 .../containerpool/v2/ActivationClientProxy.scala   | 61 ++++++++--------
 .../v2/FunctionPullingContainerPool.scala          | 36 +++++++++-
 .../v2/FunctionPullingContainerProxy.scala         | 33 ++++++---
 .../containerpool/v2/InvokerHealthManager.scala    |  2 +-
 .../core/invoker/DefaultInvokerServer.scala        |  6 +-
 .../core/invoker/FPCInvokerReactive.scala          | 27 ++++---
 .../openwhisk/core/invoker/FPCInvokerServer.scala  | 28 +++++++-
 .../apache/openwhisk/core/invoker/Invoker.scala    | 11 +--
 .../openwhisk/core/invoker/InvokerReactive.scala   | 31 ++++----
 .../core/scheduler/FPCSchedulerServer.scala        | 28 +++++---
 .../openwhisk/core/scheduler/Scheduler.scala       |  4 +-
 .../scheduler/grpc/ActivationServiceImpl.scala     |  2 +
 .../core/scheduler/queue/MemoryQueue.scala         | 52 +++++++++-----
 .../core/scheduler/queue/QueueManager.scala        | 10 +--
 .../v2/test/ActivationClientProxyTests.scala       | 10 +--
 .../test/FunctionPullingContainerPoolTests.scala   | 49 ++++++++++---
 .../test/FunctionPullingContainerProxyTests.scala  |  4 +-
 .../controller/test/ControllerTestCommon.scala     |  2 +-
 .../invoker/test/DefaultInvokerServerTests.scala   | 22 ++++--
 .../core/invoker/test/FPCInvokerServerTests.scala  | 22 ++++--
 .../core/scheduler/FPCSchedulerServerTests.scala   | 15 ++--
 .../queue/test/MemoryQueueFlowTests.scala          | 83 +++++++++++++++-------
 .../scheduler/queue/test/MemoryQueueTests.scala    | 50 +++++++------
 .../scheduler/queue/test/QueueManagerTests.scala   |  9 +--
 38 files changed, 607 insertions(+), 233 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index bac6e2372..072e4aebb 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -112,6 +112,8 @@ controller:
   authentication:
     spi: "{{ controller_authentication_spi | default('') }}"
   loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
+  username: "{{ controller_username | default('controller.user') }}"
+  password: "{{ controller_password | default('controller.pass') }}"
   entitlement:
     spi: "{{ controller_entitlement_spi | default('') }}"
   protocol: "{{ controller_protocol | default('https') }}"
@@ -126,6 +128,10 @@ controller:
       password: "openwhisk"
       name: "{{ __controller_ssl_keyPrefix }}openwhisk-keystore.p12"
   extraEnv: "{{ controller_extraEnv | default({}) }}"
+  deployment:
+    ignore_error: "{{ controller_deployment_ignore_error | default('False') }}"
+    retries: "{{ controller_deployment_retries | default(180) }}"
+    delay: "{{ controller_deployment_delay | default(5) }}"
 
 jmx:
   basePortController: 15000
@@ -234,6 +240,10 @@ invoker:
     creationMaxPeek: "{{ container_creation_max_peek | default(500) }}"
   reactiveSpi: "{{ invokerReactive_spi | default('') }}"
   serverSpi: "{{ invokerServer_spi | default('') }}"
+  deployment:
+    ignore_error: "{{ invoker_deployment_ignore_error | default('False') }}"
+    retries: "{{ invoker_deployment_retries | default(180) }}"
+    delay: "{{ invoker_deployment_delay | default(5) }}"
 
 userLogs:
   spi: "{{ userLogs_spi | default('org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}"
@@ -450,8 +460,7 @@ metrics:
 user_events: "{{ user_events_enabled | default(false) | lower }}"
 
 zeroDowntimeDeployment:
-  enabled: "{{ zerodowntime_deployment_switch | default(true) }}"
-  solution: "{{ zerodowntime_deployment_solution | default('apicall') }}"
+  enabled: "{{ zerodowntime_deployment_switch | default(false) }}"
 
 etcd:
   version: "{{ etcd_version | default('v3.4.0') }}"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 26d241ce6..5c46392a3 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -155,6 +155,8 @@
       "CONFIG_whisk_info_date": "{{ whisk.version.date }}"
       "CONFIG_whisk_info_buildNo": "{{ docker.image.tag }}"
       "CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}"
+      "CONFIG_whisk_controller_username": "{{ controller.username }}"
+      "CONFIG_whisk_controller_password": "{{ controller.password }}"
 
       "KAFKA_HOSTS": "{{ kafka_connect_string }}"
       "CONFIG_whisk_kafka_replicationFactor":
@@ -363,6 +365,53 @@
   include_tasks: "lean.yml"
   when: lean
 
+# Before redeploying the controller, remove that controller instance from nginx
+- name: remove the controller from nginx's upstream configuration
+  shell:
+    docker exec -t nginx sh -c "sed -i  \"s/ server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/ \#server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/g\"  /etc/nginx/nginx.conf  && nginx -s reload"
+  delegate_to: "{{ item }}"
+  with_items: "{{ groups['edge'] }}"
+  when: zeroDowntimeDeployment.enabled == true
+
+- name: wait some time for the controller to fire all existing triggers
+  shell: sleep 5s
+  when: zeroDowntimeDeployment.enabled == true
+
+- name: wait until {{ controller_name }} executes all existing activations
+  uri:
+    url: "{{ controller.protocol }}://{{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/activation/count"
+    validate_certs: no
+    client_key: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.key }}"
+    client_cert: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.cert }}"
+    return_content: yes
+    user: "{{ controller.username }}"
+    password: "{{ controller.password }}"
+    force_basic_auth: yes
+  register: result
+  until: result.content == '0'
+  retries: "{{ controller.deployment.retries }}"
+  delay: "{{ controller.deployment.delay }}"
+  when: zeroDowntimeDeployment.enabled == true
+  ignore_errors: "{{ controller.deployment.ignore_error }}"
+
+- name: Disable {{ controller_name }} before removing the controller
+  uri:
+    url: "{{ controller.protocol }}://{{ ansible_host }}:{{ controller.basePort + groups['controllers'].index(inventory_hostname) }}/disable"
+    validate_certs: no
+    client_key: "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.key }}"
+    client_cert: "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.cert }}"
+    method: POST
+    status_code: 200
+    user: "{{ controller.username }}"
+    password: "{{ controller.password }}"
+    force_basic_auth: yes
+  ignore_errors: "{{ controller.deployment.ignore_error }}"
+  when: zeroDowntimeDeployment.enabled == true
+
+- name: wait some time for the controller to gracefully shut down the consumer for activation acks
+  shell: sleep 5s
+  when: zeroDowntimeDeployment.enabled == true
+
 - name: (re)start controller
   docker_container:
     name: "{{ controller_name }}"
@@ -397,3 +446,37 @@
   until: result.status == 200
   retries: 12
   delay: 10
+
+- name: warm up activation path
+  uri:
+    url:
+      "{{controller.protocol}}://{{ lookup('file', '{{ catalog_auth_key }}')}}@{{ansible_host}}:{{controller_port}}/api/v1/namespaces/_/actions/invokerHealthTestAction{{controller_index}}?blocking=false&result=false"
+    validate_certs: "no"
+    client_key:
+      "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.key }}"
+    client_cert:
+      "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.cert }}"
+    method: POST
+  ignore_errors: True
+
+- name: wait for all invokers in {{ controller_name }} to come up
+  uri:
+    url: "{{ controller.protocol }}://{{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/invokers"
+    validate_certs: no
+    client_key: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.key }}"
+    client_cert: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.cert }}"
+    return_content: yes
+  register: invokerStatus
+  until: invokerStatus.json|length >= 1 and "unhealthy" not in invokerStatus.content
+  retries: 14
+  delay: 5
+  when: zeroDowntimeDeployment.enabled == true
+
+# When all invokers have reported their status to the controller, add the controller instance back to nginx if at least one invoker is up
+- name: Add the controller back to nginx's upstream configuration when at least one healthy invoker exists
+  shell:
+    docker exec -t nginx sh -c "sed -i  \"s/ \#server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/ server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/g\"  /etc/nginx/nginx.conf  && nginx -s reload"
+  delegate_to: "{{ item }}"
+  with_items: "{{ groups['edge'] }}"
+  ignore_errors: True
+  when: zeroDowntimeDeployment.enabled == true and "up" in invokerStatus.content
diff --git a/ansible/roles/invoker/tasks/clean.yml b/ansible/roles/invoker/tasks/clean.yml
index 8d7ebaca2..b8f0b2f34 100644
--- a/ansible/roles/invoker/tasks/clean.yml
+++ b/ansible/roles/invoker/tasks/clean.yml
@@ -22,6 +22,37 @@
     invoker_name: "{{ name_prefix ~ ((invoker_index_base | int) + host_group.index(inventory_hostname)) }}"
     invoker_index: "{{ (invoker_index_base | int) + host_group.index(inventory_hostname) }}"
 
+- name: disable invoker{{ groups['invokers'].index(inventory_hostname) }}
+  uri:
+    url: "{{ invoker.protocol }}://{{ ansible_host }}:{{ invoker.port + groups['invokers'].index(inventory_hostname) }}/disable"
+    validate_certs: no
+    client_key: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.key }}"
+    client_cert: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.cert }}"
+    method: POST
+    status_code: 200
+    user: "{{ invoker.username }}"
+    password: "{{ invoker.password }}"
+    force_basic_auth: yes
+  ignore_errors: "{{ invoker.deployment.ignore_error }}"
+  when: zeroDowntimeDeployment.enabled == true and enable_scheduler
+
+- name: wait for invoker{{ groups['invokers'].index(inventory_hostname) }} to clean up all existing containers
+  uri:
+    url: "{{ invoker.protocol }}://{{ ansible_host }}:{{ invoker.port + groups['invokers'].index(inventory_hostname) }}/pool/count"
+    validate_certs: no
+    client_key: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.key }}"
+    client_cert: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.cert }}"
+    user: "{{ invoker.username }}"
+    password: "{{ invoker.password }}"
+    force_basic_auth: yes
+    return_content: yes
+  register: result
+  until: result.content == '0'
+  retries: "{{ invoker.deployment.retries }}"
+  delay: "{{ invoker.deployment.delay }}"
+  when: zeroDowntimeDeployment.enabled == true and enable_scheduler
+  ignore_errors: "{{ invoker.deployment.ignore_error }}"
+
 - name: remove invoker
   docker_container:
     name: "{{ invoker_name }}"
@@ -59,12 +90,14 @@
     path: "{{ whisk_logs_dir }}/{{ invoker_name }}"
     state: absent
   become: "{{ logs.dir.become }}"
+  when: mode == "clean"
 
 - name: remove invoker conf directory
   file:
     path: "{{ invoker.confdir }}/{{ invoker_name }}"
     state: absent
   become: "{{ invoker.dir.become }}"
+  when: mode == "clean"
 
 # Workaround for orphaned ifstate.veth* files on Ubuntu 14.04
 # See https://github.com/moby/moby/issues/22513
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 0223df97d..62e21fb34 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -17,6 +17,12 @@
 ---
 # This role installs invokers.
 
+###
+# When zero-downtime deployment is enabled, clean.yml is used to gracefully shut down the invoker.
+#
+- import_tasks: clean.yml
+  when: zeroDowntimeDeployment.enabled == true and enable_scheduler
+
 - import_tasks: docker_login.yml
 
 - name: get invoker name and index
diff --git a/ansible/roles/schedulers/tasks/deploy.yml b/ansible/roles/schedulers/tasks/deploy.yml
index d1ec6e487..280ea68b7 100644
--- a/ansible/roles/schedulers/tasks/deploy.yml
+++ b/ansible/roles/schedulers/tasks/deploy.yml
@@ -280,11 +280,6 @@
   include_tasks: "{{ item }}.yml"
   with_items: "{{ scheduler_plugins | default([]) }}"
 
-- name: Judge current scheduler whether deployed
-  shell: echo $(docker ps | grep {{ scheduler_name }} | wc -l)
-  register: schedulerDeployed
-  when: zeroDowntimeDeployment.enabled == true
-
 - name: disable scheduler{{ groups['schedulers'].index(inventory_hostname) }} before redeploy scheduler
   uri:
     url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/disable"
@@ -295,27 +290,23 @@
     password: "{{ scheduler.password }}"
     force_basic_auth: yes
   ignore_errors: "{{ scheduler.deployment_ignore_error }}"
-  when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0"
+  when: zeroDowntimeDeployment.enabled == true
 
-- name: wait until all queue and create queue task is finished before redeploy scheduler when using apicall solution or half solution
+- name: wait until all activations are finished before redeploying the scheduler
   uri:
-    url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/queue/total"
+    url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/activation/count"
     validate_certs: no
     return_content: yes
     user: "{{ scheduler.username }}"
     password: "{{ scheduler.password }}"
     force_basic_auth: yes
-  register: totalQueue
-  until: totalQueue.content == "0"
+  register: result
+  until: result.content == "0"
   retries: 180
   delay: 5
-  when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0"
+  when: zeroDowntimeDeployment.enabled == true
   ignore_errors: "{{ scheduler.deployment_ignore_error }}"
 
-- name: wait until all queue and create queue task is finished before redeploy scheduler using sleep solution
-  shell: sleep 120s
-  when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0" and zeroDowntimeDeployment.solution == 'sleep'
-
 - name: (re)start scheduler
   docker_container:
     name: "{{ scheduler_name }}"
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index 03ade2b2d..e9dc2c2dd 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -53,11 +53,15 @@ edge.host.apiport=443
 kafkaras.host.port={{ kafka.ras.port }}
 redis.host.port={{ redis.port }}
 invoker.hosts.basePort={{ invoker.port }}
+invoker.username={{ invoker.username }}
+invoker.password={{ invoker.password }}
 
 controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
 controller.host.basePort={{ controller.basePort }}
 controller.instances={{ controller.instances }}
 controller.protocol={{ controller.protocol }}
+controller.username={{ controller.username }}
+controller.password={{ controller.password }}
 
 invoker.container.network=bridge
 invoker.container.policy={{ invoker_container_policy_name | default()}}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index c029edb9d..f9039e947 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -312,6 +312,9 @@ object ConfigKeys {
 
   val dataManagementServiceRetryInterval = "whisk.scheduler.data-management-service.retry-interval"
 
+  val whiskControllerUsername = "whisk.controller.username"
+  val whiskControllerPassword = "whisk.controller.password"
+
   val whiskSchedulerUsername = "whisk.scheduler.username"
   val whiskSchedulerPassword = "whisk.scheduler.password"
 
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index b65b11496..da0c4d102 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -532,9 +532,13 @@ object InvokerResourceMessage extends DefaultJsonProtocol {
  * ...
  * ]
  */
-object StatusQuery
+object GetState
 
-case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
+case class StatusData(invocationNamespace: String,
+                      fqn: String,
+                      waitingActivation: List[ActivationId],
+                      status: String,
+                      data: String)
     extends Message {
 
   override def serialize: String = StatusData.serdes.write(this).compactPrint
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
index 2c544185e..e5f3397da 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
@@ -20,9 +20,10 @@ package org.apache.openwhisk.core.service
 import akka.actor.{Actor, ActorRef, ActorSystem, Props}
 import com.ibm.etcd.api.Event.EventType
 import com.ibm.etcd.client.kv.WatchUpdate
-import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.common.{GracefulShutdown, Logging}
 import org.apache.openwhisk.core.etcd.EtcdClient
 import org.apache.openwhisk.core.etcd.EtcdType._
+
 import scala.collection.JavaConverters._
 import scala.collection.concurrent.TrieMap
 
@@ -141,6 +142,13 @@ class WatcherService(etcdClient: EtcdClient)(implicit logging: Logging, actorSys
       // always send WatcherClosed back to sender if it needs feedback
       if (request.needFeedback)
         sender ! WatcherClosed(request.watchKey, request.isPrefix)
+
+    case GracefulShutdown =>
+      watcher.close()
+      putWatchers.clear()
+      deleteWatchers.clear()
+      prefixPutWatchers.clear()
+      prefixDeleteWatchers.clear()
   }
 }
 
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index e7bf1a956..7467b2df1 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -122,4 +122,8 @@ whisk{
     file-system : true
     dir-path : "/swagger-ui/"
   }
+  controller {
+    username: "controller.user"
+    password: "controller.pass"
+  }
 }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index 0e662faa4..2ff1ecb56 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -22,7 +22,8 @@ import akka.actor.{ActorSystem, CoordinatedShutdown}
 import akka.event.Logging.InfoLevel
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.StatusCodes._
-import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.model.{StatusCodes, Uri}
 import akka.http.scaladsl.server.Route
 import kamon.Kamon
 import org.apache.openwhisk.common.Https.HttpsConfig
@@ -36,6 +37,7 @@ import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
 import org.apache.openwhisk.core.entity.ExecManifest.Runtimes
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.loadBalancer.LoadBalancerProvider
+import org.apache.openwhisk.http.ErrorResponse.terminate
 import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
 import org.apache.openwhisk.spi.SpiLoader
 import pureconfig._
@@ -96,7 +98,7 @@ class Controller(val instance: ControllerInstanceId,
       (pathEndOrSingleSlash & get) {
         complete(info)
       }
-    } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
+    } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ activationStatus ~ disable
   }
 
   // initialize datastores
@@ -167,6 +169,22 @@ class Controller(val instance: ControllerInstanceId,
     }
   }
 
+  /**
+   * Handles GET requests to the /activation and /activation/count URIs.
+   *
+   * @return the in-flight activations, or their count for this controller
+   */
+  protected[controller] val activationStatus = {
+    implicit val executionContext = actorSystem.dispatcher
+    (pathPrefix("activation") & get) {
+      pathEndOrSingleSlash {
+        complete(loadBalancer.activeActivationsByController.map(_.toJson))
+      } ~ path("count") {
+        complete(loadBalancer.activeActivationsByController(controllerInstance.asString).map(_.toJson))
+      }
+    }
+  }
+
   // controller top level info
   private val info = Controller.info(
     whiskConfig,
@@ -175,6 +193,30 @@ class Controller(val instance: ControllerInstanceId,
     LogLimit.config,
     runtimes,
     List(apiV1.basepath()))
+
+  private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername)
+  private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword)
+
+  /**
+   * disable controller
+   */
+  private def disable(implicit transid: TransactionId) = {
+    implicit val executionContext = actorSystem.dispatcher
+    implicit val jsonPrettyResponsePrinter = PrettyPrinter
+    (path("disable") & post) {
+      extractCredentials {
+        case Some(BasicHttpCredentials(username, password)) =>
+          if (username == controllerUsername && password == controllerPassword) {
+            loadBalancer.close
+            logging.warn(this, "controller is disabled")
+            complete("controller is disabled")
+          } else {
+            terminate(StatusCodes.Unauthorized, "username or password is wrong.")
+          }
+        case _ => terminate(StatusCodes.Unauthorized)
+      }
+    }
+  }
 }
 
 /**
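
Note on the /disable route added above: it is guarded by basic auth against whisk.controller.username / whisk.controller.password (defaulting to controller.user / controller.pass in application.conf). A minimal akka-http client sketch of calling it; the host, port, and credentials here are illustrative placeholders, not values taken from this commit:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{HttpMethods, HttpRequest}
    import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials}

    implicit val system: ActorSystem = ActorSystem("controller-disable-client")

    // Placeholder host/port; the Ansible deploy task issues the same POST via the uri module.
    val request = HttpRequest(
      method = HttpMethods.POST,
      uri = "https://controller0.example.com:10001/disable",
      headers = List(Authorization(BasicHttpCredentials("controller.user", "controller.pass"))))

    // A 200 response with body "controller is disabled" means loadBalancer.close was invoked.
    Http().singleRequest(request)
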
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 045828d66..1a204cf42 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -86,8 +86,9 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
   override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue)
   override def activeActivationsByController(controller: String): Future[Int] =
     Future.successful(activationsPerController.get(ControllerInstanceId(controller)).map(_.intValue()).getOrElse(0))
-  override def activeActivationsByController: Future[List[ActivationId]] =
-    Future.successful(activationSlots.keySet.toList)
+  override def activeActivationsByController: Future[List[(String, String)]] =
+    Future.successful(
+      activationSlots.values.map(entry => (entry.id.asString, entry.fullyQualifiedEntityName.toString)).toList)
   override def activeActivationsByInvoker(invoker: String): Future[Int] =
     Future.successful(
       activationsPerInvoker.get(InvokerInstanceId(invoker.toInt, userMemory = 0.MB)).map(_.intValue()).getOrElse(0))
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index 3c247e0dd..bf04d1d2a 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -663,8 +663,9 @@ class FPCPoolBalancer(config: WhiskConfig,
     Future.successful(activationsPerController.get(ControllerInstanceId(controller)).map(_.intValue()).getOrElse(0))
 
   /** Gets the in-flight activations */
-  override def activeActivationsByController: Future[List[ActivationId]] =
-    Future.successful(activationSlots.keySet.toList)
+  override def activeActivationsByController: Future[List[(String, String)]] =
+    Future.successful(
+      activationSlots.values.map(entry => (entry.id.asString, entry.fullyQualifiedEntityName.toString)).toList)
 
   /** Gets the number of in-flight activations for a specific invoker. */
   override def activeActivationsByInvoker(invoker: String): Future[Int] = Future.successful(0)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index a4a0038bc..d9a38c6d2 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -59,7 +59,7 @@ trait LoadBalancer {
   def activeActivationsByController(controller: String): Future[Int]
 
   /** Gets the in-flight activations */
-  def activeActivationsByController: Future[List[ActivationId]]
+  def activeActivationsByController: Future[List[(String, String)]]
 
   /** Gets the number of in-flight activations for a specific invoker. */
   def activeActivationsByInvoker(invoker: String): Future[Int]
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
index 2c2392feb..f525f88de 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
@@ -22,7 +22,7 @@ import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
 import akka.grpc.internal.ClientClosedException
 import akka.pattern.pipe
 import io.grpc.StatusRuntimeException
-import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
 import org.apache.openwhisk.core.connector.ActivationMessage
 import org.apache.openwhisk.core.containerpool.ContainerId
 import org.apache.openwhisk.core.entity._
@@ -124,13 +124,13 @@ class ActivationClientProxy(
       stay()
 
     case Event(e: RescheduleActivation, client: Client) =>
-      logging.info(this, s"got a reschedule message ${e.msg.activationId} for action: ${e.msg.action}")
+      logging.info(this, s"[${containerId.asString}] got a reschedule message ${e.msg.activationId} for action: ${e.msg.action}")
       client.activationClient
         .rescheduleActivation(
           RescheduleRequest(e.invocationNamespace, e.fqn.serialize, e.rev.serialize, e.msg.serialize))
         .recover {
           case t =>
-            logging.error(this, s"Failed to reschedule activation (error: $t)")
+            logging.error(this, s"[${containerId.asString}] Failed to reschedule activation (error: $t)")
             RescheduleResponse()
         }
         .foreach(res => {
@@ -139,7 +139,7 @@ class ActivationClientProxy(
       stay()
 
     case Event(msg: ActivationMessage, _: Client) =>
-      logging.debug(this, s"got a message ${msg.activationId} for action: ${msg.action}")
+      logging.debug(this, s"[${containerId.asString}] got a message ${msg.activationId} for action: ${msg.action}")
       context.parent ! msg
 
       stay()
@@ -152,7 +152,7 @@ class ActivationClientProxy(
         case _: NoMemoryQueue =>
           logging.error(
             this,
-            s"The queue of action ${action} under invocationNamespace ${invocationNamespace} does not exist. Check for queues in other schedulers.")
+            s"[${containerId.asString}] The queue of action ${action} under invocationNamespace ${invocationNamespace} does not exist. Check for queues in other schedulers.")
           c.activationClient
             .close()
             .flatMap(_ =>
@@ -162,7 +162,7 @@ class ActivationClientProxy(
           stay()
 
         case _: ActionMismatch =>
-          logging.error(this, s"action version does not match: $action")
+          logging.error(this, s"[${containerId.asString}] action version does not match: $action")
           c.activationClient.close().andThen {
             case _ => self ! ClientClosed
           }
@@ -170,7 +170,7 @@ class ActivationClientProxy(
           goto(ClientProxyRemoving)
 
         case _: NoActivationMessage => // retry
-          logging.debug(this, s"no activation message exist: $action")
+          logging.debug(this, s"[${containerId.asString}] no activation message exist: $action")
           context.parent ! RetryRequestActivation
 
           stay()
@@ -182,7 +182,7 @@ class ActivationClientProxy(
     case Event(f: FailureMessage, c: Client) =>
       f.cause match {
         case t: ParsingException =>
-          logging.error(this, s"failed to parse activation message: $t")
+          logging.error(this, s"[${containerId.asString}] failed to parse activation message: $t")
           context.parent ! RetryRequestActivation
 
           stay()
@@ -191,13 +191,13 @@ class ActivationClientProxy(
         // In such a situation, it is better to stop the ActivationClientProxy; otherwise it would quickly print
         // huge logs by repeatedly creating another grpcClient to fetch activations again.
         case t: StatusRuntimeException if t.getMessage.contains(ActivationClientProxy.hostResolveError) =>
-          logging.error(this, s"akka grpc server connection failed: $t")
+          logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t")
           self ! ClientClosed
 
           goto(ClientProxyRemoving)
 
         case t: StatusRuntimeException =>
-          logging.error(this, s"akka grpc server connection failed: $t")
+          logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t")
           c.activationClient
             .close()
             .flatMap(_ =>
@@ -207,13 +207,13 @@ class ActivationClientProxy(
           stay()
 
         case _: ClientClosedException =>
-          logging.error(this, s"grpc client is already closed for $action")
+          logging.error(this, s"[${containerId.asString}] grpc client is already closed for $action")
           self ! ClientClosed
 
           goto(ClientProxyRemoving)
 
         case t: Throwable =>
-          logging.error(this, s"get activation from remote server error: $t")
+          logging.error(this, s"[${containerId.asString}] get activation from remote server error: $t")
           safelyCloseClient(c)
           goto(ClientProxyRemoving)
       }
@@ -227,22 +227,12 @@ class ActivationClientProxy(
   }
 
   when(ClientProxyRemoving) {
-    case Event(request: RequestActivation, client: Client) =>
-      request.newScheduler match {
-        // if scheduler is changed, client needs to be recreated
-        case Some(scheduler) if scheduler.host != client.rpcHost || scheduler.rpcPort != client.rpcPort =>
-          val newHost = request.newScheduler.get.host
-          val newPort = request.newScheduler.get.rpcPort
-          client.activationClient
-            .close()
-            .flatMap(_ =>
-              createActivationClient(invocationNamespace, action, newHost, newPort, tryOtherScheduler = false))
-            .pipeTo(self)
 
-        case _ =>
-          requestActivationMessage(invocationNamespace, action, rev, client.activationClient, request.lastDuration)
-            .pipeTo(self)
-      }
+    // This is the case where the last activation message was sent to the container proxy and the container proxy requested
+    // another activation. But the activation client is being shut down, so it no longer fetches any requests.
+    case Event(_: RequestActivation, c: Client) =>
+      safelyCloseClient(c)
+
       stay()
 
     case Event(msg: ActivationMessage, _: Client) =>
@@ -250,13 +240,14 @@ class ActivationClientProxy(
 
       stay()
 
-    case Event(_: MemoryQueueError, _: Client) =>
+    case Event(_: MemoryQueueError, c: Client) =>
+      safelyCloseClient(c)
       self ! ClientClosed
 
       stay()
 
     case Event(f: FailureMessage, c: Client) =>
-      logging.error(this, s"some error happened for action: ${action} in state: $stateName, caused by: $f")
+      logging.error(this, s"[${containerId.asString}] some error happened for action: ${action} in state: $stateName, caused by: $f")
       safelyCloseClient(c)
       stay()
 
@@ -279,16 +270,24 @@ class ActivationClientProxy(
       warmed = true
       stay
 
-    case Event(CloseClientProxy, c: Client) =>
-      safelyCloseClient(c)
+    // When disabling an invoker, there could still be activations in the queue.
+    // The activation client keeps fetching data and will forward it to the container(parent).
+    // Once it receives `NoActivationMessage` from the queue, it will close the activation client and send `ClientClosed`
+    // to the container(parent), rather than sending `RetryRequestActivation`.
+    // When a container proxy(parent) receives `ClientClosed`, it will finally shut down.
+    case Event(GracefulShutdown, _: Client) =>
+      logging.info(this, s"[${containerId.asString}] safely close client proxy and go to the ClientProxyRemoving state")
+
       goto(ClientProxyRemoving)
 
     case Event(ClientClosed, _) =>
+      logging.info(this, s"[${containerId.asString}] the underlying client is closed, stopping the activation client proxy")
       context.parent ! ClientClosed
 
       stop()
 
     case Event(StopClientProxy, c: Client) =>
+      logging.info(this, s"[${containerId.asString}] received StopClientProxy, safely closing the activation client")
       safelyCloseClient(c)
       stay()
   }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index 5b0c283c3..2a89630a5 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -18,14 +18,15 @@
 package org.apache.openwhisk.core.containerpool.v2
 
 import java.util.concurrent.atomic.AtomicInteger
-import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
 
+import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.connector.ContainerCreationError._
 import org.apache.openwhisk.core.connector.{
   ContainerCreationAckMessage,
   ContainerCreationMessage,
   ContainerDeletionMessage,
+  GetState,
   ResultMetadata
 }
 import org.apache.openwhisk.core.containerpool.{
@@ -41,6 +42,7 @@ import org.apache.openwhisk.core.containerpool.{
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.http.Messages
+import spray.json.DefaultJsonProtocol
 
 import scala.annotation.tailrec
 import scala.collection.concurrent.TrieMap
@@ -50,6 +52,27 @@ import scala.concurrent.duration._
 import scala.util.{Random, Try}
 import scala.collection.immutable.Queue
 
+object TotalContainerPoolState extends DefaultJsonProtocol {
+  implicit val prewarmedPoolSerdes = jsonFormat2(PrewarmedContainerPoolState.apply)
+  implicit val warmPoolSerdes = jsonFormat2(WarmContainerPoolState.apply)
+  implicit val totalPoolSerdes = jsonFormat5(TotalContainerPoolState.apply)
+}
+
+case class PrewarmedContainerPoolState(total: Int, countsByKind: Map[String, Int])
+case class WarmContainerPoolState(total: Int, containers: List[BasicContainerInfo])
+case class TotalContainerPoolState(totalContainers: Int,
+                                   inProgressCount: Int,
+                                   prewarmedPool: PrewarmedContainerPoolState,
+                                   busyPool: WarmContainerPoolState,
+                                   pausedPool: WarmContainerPoolState) {
+
+  def serialize(): String = TotalContainerPoolState.totalPoolSerdes.write(this).compactPrint
+}
+
+case class NotSupportedPoolState() {
+  def serialize(): String = "not supported"
+}
+
 case class CreationContainer(creationMessage: ContainerCreationMessage, action: WhiskAction)
 case class DeletionContainer(deletionMessage: ContainerDeletionMessage)
 case object Remove
@@ -88,7 +111,7 @@ class FunctionPullingContainerPool(
 
   implicit val ec = context.system.dispatcher
 
-  protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, Data]
+  protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, ContainerAvailableData]
   protected[containerpool] var inProgressPool = immutable.Map.empty[ActorRef, Data]
   protected[containerpool] var warmedPool = immutable.Map.empty[ActorRef, WarmData]
   protected[containerpool] var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
@@ -413,6 +436,15 @@ class FunctionPullingContainerPool(
       // Reset the prewarmCreateCount value when doing the expiration check, and backfill prewarm containers if possible
       prewarmCreateFailedCount.set(0)
       adjustPrewarmedContainer(false, true)
+
+    case GetState =>
+      val totalContainers = busyPool.size + inProgressPool.size + warmedPool.size + prewarmedPool.size
+      val prewarmedState =
+        PrewarmedContainerPoolState(prewarmedPool.size, prewarmedPool.groupBy(_._2.kind).mapValues(_.size).toMap)
+      val busyState = WarmContainerPoolState(busyPool.size, busyPool.values.map(_.basicContainerInfo).toList)
+      val pausedState = WarmContainerPoolState(warmedPool.size, warmedPool.values.map(_.basicContainerInfo).toList)
+      sender() ! TotalContainerPoolState(totalContainers, inProgressPool.size, prewarmedState, busyState, pausedState)
+
   }
 
   /** Install prewarm containers up to the configured requirements for each kind/memory combination or specified kind/memory */
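
The GetState handler above is where the Scala 2.13 note from the commit message applies: mapValues now returns a MapView instead of a Map, so the result must be materialized with .toMap before it can be used as the countsByKind map. A minimal illustration with made-up container/kind values:

    // Hypothetical prewarm data: container id -> kind.
    val prewarmKinds = Map("c1" -> "nodejs:14", "c2" -> "nodejs:14", "c3" -> "python:3")

    // On Scala 2.12 mapValues already yielded a Map; on 2.13 it yields a MapView,
    // so the trailing .toMap is needed to satisfy the Map[String, Int] type.
    val countsByKind: Map[String, Int] = prewarmKinds.groupBy(_._2).mapValues(_.size).toMap
    // countsByKind == Map("nodejs:14" -> 2, "python:3" -> 1)
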
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index 2049dc2d1..28d3e1e1c 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -127,16 +127,30 @@ case class PreWarmData(container: Container,
   def isExpired(): Boolean = expires.exists(_.isOverdue())
 }
 
-case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
+object BasicContainerInfo extends DefaultJsonProtocol {
+  implicit val prewarmedPoolSerdes = jsonFormat4(BasicContainerInfo.apply)
+}
+
+sealed case class BasicContainerInfo(containerId: String, namespace: String, action: String, kind: String)
+
+sealed abstract class ContainerAvailableData(container: Container,
+                                             invocationNamespace: String,
+                                             action: ExecutableWhiskAction)
     extends Data(action.limits.memory.megabytes.MB) {
   override def getContainer = Some(container)
+
+  val basicContainerInfo =
+    BasicContainerInfo(container.containerId.asString, invocationNamespace, action.name.asString, action.exec.kind)
 }
 
+case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
+    extends ContainerAvailableData(container, invocationNamespace, action)
+
 case class InitializedData(container: Container,
                            invocationNamespace: String,
                            action: ExecutableWhiskAction,
                            override val clientProxy: ActorRef)
-    extends Data(action.limits.memory.megabytes.MB)
+    extends ContainerAvailableData(container, invocationNamespace, action)
     with WithClient {
   override def getContainer = Some(container)
   def toReschedulingData(resumeRun: RunActivation) =
@@ -149,7 +163,7 @@ case class WarmData(container: Container,
                     revision: DocRevision,
                     lastUsed: Instant,
                     override val clientProxy: ActorRef)
-    extends Data(action.limits.memory.megabytes.MB)
+    extends ContainerAvailableData(container, invocationNamespace, action)
     with WithClient {
   override def getContainer = Some(container)
   def toReschedulingData(resumeRun: RunActivation) =
@@ -159,12 +173,10 @@ case class WarmData(container: Container,
 case class ReschedulingData(container: Container,
                             invocationNamespace: String,
                             action: ExecutableWhiskAction,
-                            override val clientProxy: ActorRef,
+                            clientProxy: ActorRef,
                             resumeRun: RunActivation)
-    extends Data(action.limits.memory.megabytes.MB)
-    with WithClient {
-  override def getContainer = Some(container)
-}
+    extends ContainerAvailableData(container, invocationNamespace, action)
+    with WithClient
 
 class FunctionPullingContainerProxy(
   factory: (TransactionId,
@@ -628,8 +640,8 @@ class FunctionPullingContainerProxy(
           Some(instance),
           Some(data.container.containerId)))
 
-      // Just send CloseClientProxy to ActivationClientProxy, make ActivationClientProxy throw ClientClosedException when fetchActivation next time.
-      data.clientProxy ! CloseClientProxy
+      // Send GracefulShutdown to the ActivationClientProxy so it drains any remaining activations and then closes its client.
+      data.clientProxy ! GracefulShutdown
       stay
 
     case x: Event if x.event != PingCache => delay
@@ -774,7 +786,6 @@ class FunctionPullingContainerProxy(
     case Event(Remove | GracefulShutdown, _) =>
       stay()
 
-
     case Event(DetermineKeepContainer(_), _) =>
       stay()
   }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
index 6665e6373..b0a4ad80e 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -262,7 +262,7 @@ case class HealthActivationServiceClient() extends Actor {
         case _ => // do nothing
       }
 
-    case CloseClientProxy =>
+    case GracefulShutdown =>
       closed = true
 
   }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
index f3503b55e..1321d8add 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
@@ -46,11 +46,11 @@ class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, sys
     super.routes ~ extractCredentials {
       case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
         (path("enable") & post) {
-          invoker.enable()
+          complete(invoker.enable())
         } ~ (path("disable") & post) {
-          invoker.disable()
+          complete(invoker.disable())
         } ~ (path("isEnabled") & get) {
-          invoker.isEnabled()
+          complete(invoker.isEnabled())
         }
       case _ => terminate(StatusCodes.Unauthorized)
     }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 67660e8a6..dd6198a34 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -20,8 +20,8 @@ package org.apache.openwhisk.core.invoker
 import akka.Done
 import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
 import akka.grpc.GrpcClientSettings
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.Route
+import akka.pattern.ask
+import akka.util.Timeout
 import com.ibm.etcd.api.Event.EventType
 import com.ibm.etcd.client.kv.KvClient.Watch
 import com.ibm.etcd.client.kv.WatchUpdate
@@ -31,7 +31,7 @@ import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.containerpool._
 import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
 import org.apache.openwhisk.core.containerpool.v2._
-import org.apache.openwhisk.core.database.{UserContext, _}
+import org.apache.openwhisk.core.database._
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
 import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
@@ -373,28 +373,33 @@ class FPCInvokerReactive(config: WhiskConfig,
       maxPeek,
       sendAckToScheduler))
 
-  override def enable(): Route = {
+  override def enable(): String = {
     invokerHealthManager ! Enable
     pool ! Enable
     warmUp()
-    complete("Success enable invoker")
+    s"${instance.toString} is now enabled."
   }
 
-  override def disable(): Route = {
+  override def disable(): String = {
     invokerHealthManager ! GracefulShutdown
     pool ! GracefulShutdown
     warmUpWatcher.foreach(_.close())
     warmUpWatcher = None
-    complete("Successfully disabled invoker")
+    s"${instance.toString} is now disabled."
   }
 
-  override def isEnabled(): Route = {
-    complete(InvokerEnabled(warmUpWatcher.nonEmpty).serialize())
+  override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
+    implicit val timeout: Timeout = 5.seconds
+    (pool ? GetState).mapTo[TotalContainerPoolState].map(Right(_))
   }
 
-  override def backfillPrewarm(): Route = {
+  override def isEnabled(): String = {
+    InvokerEnabled(warmUpWatcher.nonEmpty).serialize()
+  }
+
+  override def backfillPrewarm(): String = {
     pool ! AdjustPrewarmedContainer
-    complete("backfilling prewarm container")
+    "backfilling prewarm container"
   }
 
   private val warmUpFetchRequest = FetchRequest(
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
index 61d194f4b..88c8b4076 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
@@ -27,6 +27,8 @@ import org.apache.openwhisk.http.BasicRasService
 import org.apache.openwhisk.http.ErrorResponse.terminate
 import pureconfig.loadConfigOrThrow
 import spray.json.PrettyPrinter
+import spray.json.DefaultJsonProtocol._
+import spray.json._
 
 import scala.concurrent.ExecutionContext
 
@@ -46,11 +48,31 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP
     super.routes ~ extractCredentials {
       case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
         (path("enable") & post) {
-          invoker.enable()
+          complete(invoker.enable())
         } ~ (path("disable") & post) {
-          invoker.disable()
+          complete(invoker.disable())
         } ~ (path("isEnabled") & get) {
-          invoker.isEnabled()
+          complete(invoker.isEnabled())
+        } ~ (pathPrefix("pool") & get) {
+          pathEndOrSingleSlash {
+            complete {
+              invoker.getPoolState().map {
+                case Right(poolState) =>
+                  poolState.serialize()
+                case Left(value) =>
+                  value.serialize()
+              }
+            }
+          } ~ (path("count") & get) {
+            complete {
+              invoker.getPoolState().map {
+                case Right(poolState) =>
+                  (poolState.busyPool.total + poolState.pausedPool.total + poolState.inProgressCount).toJson.compactPrint
+                case Left(value) =>
+                  value.serialize()
+              }
+            }
+          }
         }
       case _ => terminate(StatusCodes.Unauthorized)
     }
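
The /pool/count route above counts only busy, paused, and in-progress containers, which is how prewarm containers are excluded as noted in the commit message. A small sketch of the same arithmetic on a TotalContainerPoolState; the numbers are illustrative only:

    import org.apache.openwhisk.core.containerpool.v2.{
      PrewarmedContainerPoolState,
      TotalContainerPoolState,
      WarmContainerPoolState
    }

    // Illustrative pool: 2 busy, 1 paused, 1 in progress, 4 prewarmed containers.
    val poolState = TotalContainerPoolState(
      totalContainers = 8,
      inProgressCount = 1,
      prewarmedPool = PrewarmedContainerPoolState(4, Map("nodejs:14" -> 4)),
      busyPool = WarmContainerPoolState(2, List.empty),
      pausedPool = WarmContainerPoolState(1, List.empty))

    // Mirrors the route body: prewarmed containers do not count toward /pool/count.
    val count = poolState.busyPool.total + poolState.pausedPool.total + poolState.inProgressCount
    // count == 4; the invoker clean.yml task polls this endpoint until it returns 0.
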
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 997825865..8319c511b 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -19,13 +19,13 @@ package org.apache.openwhisk.core.invoker
 
 import akka.Done
 import akka.actor.{ActorSystem, CoordinatedShutdown}
-import akka.http.scaladsl.server.Route
 import com.typesafe.config.ConfigValueFactory
 import kamon.Kamon
 import org.apache.openwhisk.common.Https.HttpsConfig
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.WhiskConfig._
 import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider}
+import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
 import org.apache.openwhisk.core.containerpool.{Container, ContainerPoolConfig}
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
@@ -246,10 +246,11 @@ trait InvokerProvider extends Spi {
 
 // this trait can be used to add common implementation
 trait InvokerCore {
-  def enable(): Route
-  def disable(): Route
-  def isEnabled(): Route
-  def backfillPrewarm(): Route
+  def enable(): String
+  def disable(): String
+  def isEnabled(): String
+  def backfillPrewarm(): String
+  def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]]
 }
 
 /**
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 31d1f2293..d7aae4a3c 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -19,20 +19,19 @@ package org.apache.openwhisk.core.invoker
 
 import java.nio.charset.StandardCharsets
 import java.time.Instant
+
 import akka.Done
 import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
 import akka.event.Logging.InfoLevel
-import akka.http.scaladsl.server.Directives.complete
-import akka.http.scaladsl.server.Route
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.common.tracing.WhiskTracerProvider
 import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.containerpool._
 import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
-import org.apache.openwhisk.core.database.{UserContext, _}
+import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
+import org.apache.openwhisk.core.database._
 import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.http.Messages
@@ -304,32 +303,36 @@ class InvokerReactive(
 
   private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler)
 
-  override def enable(): Route = {
+  override def enable(): String = {
     if (healthScheduler.isEmpty) {
       healthScheduler = Some(getHealthScheduler)
-      complete(s"${instance.toString} is now enabled.")
+      s"${instance.toString} is now enabled."
     } else {
-      complete(s"${instance.toString} is already enabled.")
+      s"${instance.toString} is already enabled."
     }
   }
 
-  override def disable(): Route = {
+  override def disable(): String = {
     pingController(isEnabled = false)
     if (healthScheduler.nonEmpty) {
       actorSystem.stop(healthScheduler.get)
       healthScheduler = None
-      complete(s"${instance.toString} is now disabled.")
+      s"${instance.toString} is now disabled."
     } else {
-      complete(s"${instance.toString} is already disabled.")
+      s"${instance.toString} is already disabled."
     }
   }
 
-  override def isEnabled(): Route = {
-    complete(InvokerEnabled(healthScheduler.nonEmpty).serialize())
+  override def isEnabled(): String = {
+    InvokerEnabled(healthScheduler.nonEmpty).serialize()
+  }
+
+  override def backfillPrewarm(): String = {
+    "not supported"
   }
 
-  override def backfillPrewarm(): Route = {
-    complete("not supported")
+  override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
+    Future.successful(Left(NotSupportedPoolState()))
   }
 
 }
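
The reactive invoker has no v2 container pool, so the new pool-state query always
answers Left(NotSupportedPoolState()). A small sketch of folding that Either into a
single printable value; the helper name is hypothetical and real callers would
serialize the pool state to JSON rather than call toString:

    import org.apache.openwhisk.core.invoker.InvokerCore
    import scala.concurrent.{ExecutionContext, Future}

    // Hypothetical helper: collapse the pool-state Either into a summary string.
    def describePool(core: InvokerCore)(implicit ec: ExecutionContext): Future[String] =
      core.getPoolState().map {
        case Right(poolState)   => poolState.toString   // FPC invokers report real pool data
        case Left(notSupported) => notSupported.toString // this invoker: not supported
      }
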
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
index 4fd2f9b69..aec923508 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
@@ -49,23 +49,29 @@ class FPCSchedulerServer(scheduler: SchedulerCore, systemUsername: String, syste
           complete {
             scheduler.getState.map {
               case (list, creationCount) =>
-                (list
-                  .map(scheduler => scheduler._1.asString -> scheduler._2.toString)
-                  .toMap
-                  ++ Map("creationCount" -> creationCount.toString)).toJson.asJsObject
+                val sum = list.map(tuple => tuple._2).sum
+                (Map("queue" -> sum.toString) ++ Map("creationCount" -> creationCount.toString)).toJson
             }
           }
         } ~ (path("disable") & post) {
           logger.warn(this, "Scheduler is disabled")
           scheduler.disable()
           complete("scheduler disabled")
-        } ~ (path(FPCSchedulerServer.queuePathPrefix / "total") & get) {
-          complete {
-            scheduler.getQueueSize.map(_.toString)
+        } ~ (pathPrefix(FPCSchedulerServer.queuePathPrefix) & get) {
+          pathEndOrSingleSlash {
+            complete(scheduler.getQueueStatusData.map(s => s.toJson))
+          } ~ (path("count") & get) {
+            complete(scheduler.getQueueSize.map(s => s.toJson))
           }
-        } ~ (path(FPCSchedulerServer.queuePathPrefix / "status") & get) {
-          complete {
-            scheduler.getQueueStatusData.map(s => s.toJson)
+        } ~ (path("activation" / "count") & get) {
+          pathEndOrSingleSlash {
+            complete(
+              scheduler.getQueueStatusData
+                .map { s =>
+                  s.map(_.waitingActivation.size)
+                }
+                .map(a => a.sum)
+                .map(_.toJson))
           }
         }
       case _ =>
@@ -79,7 +85,7 @@ object FPCSchedulerServer {
 
   private val schedulerUsername = loadConfigOrThrow[String](ConfigKeys.whiskSchedulerUsername)
   private val schedulerPassword = loadConfigOrThrow[String](ConfigKeys.whiskSchedulerPassword)
-  private val queuePathPrefix = "queue"
+  private val queuePathPrefix = "queues"
 
   def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext,
                                          actorSystem: ActorSystem,
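
The scheduler admin API is reshaped: /queue/total becomes /queues/count, /queue/status
becomes /queues, /state now reports a single aggregate queue figure, and a new
/activation/count endpoint sums the activations waiting across all queues. A small
sketch of the aggregate values these routes produce, assuming spray-json's default
protocol; the numbers and helper name are illustrative:

    import org.apache.openwhisk.core.connector.StatusData
    import spray.json._
    import spray.json.DefaultJsonProtocol._
    import scala.concurrent.{ExecutionContext, Future}

    // /state: per-scheduler queue sizes collapse into one "queue" figure.
    val queueSizes = List(2, 3)
    val creationCount = 1
    val statePayload: JsValue =
      (Map("queue" -> queueSizes.sum.toString) ++
        Map("creationCount" -> creationCount.toString)).toJson
    // e.g. {"queue":"5","creationCount":"1"}

    // /activation/count: sum the waiting activations reported by every queue.
    def totalWaitingActivations(statuses: Future[List[StatusData]])(
      implicit ec: ExecutionContext): Future[Int] =
      statuses.map(_.map(_.waitingActivation.size).sum)
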
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index 2038fc1c3..e3ed70d8a 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -127,11 +127,11 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
   }
 
   override def getQueueSize: Future[Int] = {
-    queueManager.ask(QueueSize)(Timeout(5.seconds)).mapTo[Int]
+    queueManager.ask(QueueSize)(Timeout(1.minute)).mapTo[Int]
   }
 
   override def getQueueStatusData: Future[List[StatusData]] = {
-    queueManager.ask(StatusQuery)(Timeout(5.seconds)).mapTo[Future[List[StatusData]]].flatten
+    queueManager.ask(GetState)(Timeout(1.minute)).mapTo[Future[List[StatusData]]].flatten
   }
 
   override def disable(): Unit = {
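
getQueueStatusData now asks the QueueManager with GetState and a one-minute timeout;
since the manager replies with a Future (see the QueueManager change below), the ask
result is a Future of a Future and has to be flattened. A minimal sketch of that
pattern, mirroring the code above with an illustrative helper name:

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import org.apache.openwhisk.core.connector.{GetState, StatusData}
    import scala.concurrent.Future
    import scala.concurrent.duration._

    // The manager answers GetState with a Future[List[StatusData]], so the asked
    // value is Future[Future[...]] and is flattened into a single Future.
    def queueStatus(queueManager: ActorRef): Future[List[StatusData]] =
      queueManager
        .ask(GetState)(Timeout(1.minute))
        .mapTo[Future[List[StatusData]]]
        .flatten
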
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
index acf311e9f..8a3db898a 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
@@ -72,6 +72,8 @@ class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Loggin
         QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
           case Some(queueValue) =>
             implicit val transid = TransactionId.serdes.read(request.transactionId.parseJson)
+            if (!request.alive) logging.info(this, s"the container(${request.containerId}) is not alive")
+
             (queueValue.queue ? GetActivation(
               transid,
               fqn,
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 7f174f9aa..28ab22210 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -46,6 +46,7 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu
 import pureconfig.loadConfigOrThrow
 import spray.json._
 import pureconfig.generic.auto._
+import scala.collection.JavaConverters._
 
 import java.time.{Duration, Instant}
 import java.util.concurrent.atomic.AtomicInteger
@@ -69,17 +70,30 @@ case object NamespaceThrottled extends MemoryQueueState
 
 // Data
 sealed abstract class MemoryQueueData()
-case class NoData() extends MemoryQueueData()
-case class NoActors() extends MemoryQueueData()
-case class RunningData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData()
-case class ThrottledData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData()
+case class NoData() extends MemoryQueueData() {
+  override def toString = "NoData"
+}
+case class NoActors() extends MemoryQueueData() {
+  override def toString = "NoActors"
+}
+case class RunningData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData() {
+  override def toString = "RunningData"
+}
+case class ThrottledData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData() {
+  override def toString = "ThrottledData"
+}
 case class FlushingData(schedulerActor: ActorRef,
                         droppingActor: ActorRef,
                         error: ContainerCreationError,
                         reason: String,
                         activeDuringFlush: Boolean = false)
-    extends MemoryQueueData()
-case class RemovingData(schedulerActor: ActorRef, droppingActor: ActorRef, outdated: Boolean) extends MemoryQueueData()
+    extends MemoryQueueData() {
+  override def toString = s"ThrottledData(error: $error, reason: $reason, activeDuringFlush: $activeDuringFlush)"
+}
+case class RemovingData(schedulerActor: ActorRef, droppingActor: ActorRef, outdated: Boolean)
+    extends MemoryQueueData() {
+  override def toString = s"RemovingData(outdated: $outdated)"
+}
 
 // Events sent by the actor
 case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey: Option[String])
@@ -154,8 +168,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
   private val staleQueueRemovedMsg = QueueRemoved(invocationNamespace, action.toDocId.asDocInfo(revision), None)
   private val actionRetentionTimeout = MemoryQueue.getRetentionTimeout(actionMetaData, queueConfig)
 
-  private[queue] var containers = Set.empty[String]
-  private[queue] var creationIds = Set.empty[String]
+  private[queue] var containers = java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala
+  private[queue] var creationIds = java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala
 
   private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
   private[queue] var in = new AtomicInteger(0)
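
The containers and creationIds sets are swapped for thread-safe ones (presumably
because they can be read or updated outside the actor's own message handling), so the
plain immutable Sets become views over java.util.concurrent.ConcurrentHashMap key
sets. A standalone sketch of that construction; JavaConverters is used here as above,
and Scala 2.13 offers the same asScala via scala.jdk.CollectionConverters:

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    // A thread-safe mutable Set[String] backed by a ConcurrentHashMap key set.
    val containers: mutable.Set[String] =
      java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala

    containers += "container-a"   // additions and removals are safe across threads
    containers -= "container-a"
    val isIdle = containers.isEmpty
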
@@ -565,8 +579,13 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       stay
 
     // common case for all statuses
-    case Event(StatusQuery, _) =>
-      sender ! StatusData(invocationNamespace, action.asString, queue.size, stateName.toString, stateData.toString)
+    case Event(GetState, _) =>
+      sender ! StatusData(
+        invocationNamespace,
+        action.asString,
+        queue.toList.map(_.msg.activationId),
+        stateName.toString,
+        stateData.toString)
       stay
 
     // Common case for all cases
@@ -575,9 +594,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       // delete relative data, e.g leaderKey, namespaceThrottlingKey, actionThrottlingKey
       cleanUpData()
 
-      // let queue manager knows this queue is going to stop and let it forward incoming activations to a new queue
-      context.parent ! queueRemovedMsg
-
       goto(Removing) using getRemovingData(data, outdated = false)
 
     // the version is updated. it's a shared case for all states
@@ -662,7 +678,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
   private def cleanUpDataAndGotoRemoved() = {
     cleanUpWatcher()
     cleanUpData()
-    context.parent ! queueRemovedMsg
 
     goto(Removed) using NoData()
   }
@@ -671,8 +686,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
     cleanUpActors(data)
     cleanUpData()
 
-    context.parent ! queueRemovedMsg
-
     goto(Removed) using NoData()
   }
 
@@ -688,7 +701,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
         // let the container manager know this version of containers are outdated.
         containerManager ! ContainerDeletion(invocationNamespace, action, revision, actionMetaData)
       }
-      self ! QueueRemovedCompleted
 
       goto(Removed) using NoData()
     } else {
@@ -1017,7 +1029,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       queue = newQueue
       logging.info(
         this,
-        s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")
+        s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")(
+        msg.transid)
       val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
       MetricEmitter.emitHistogramMetric(
         LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
@@ -1051,7 +1064,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
           case Right(msg) =>
             logging.info(
               this,
-              s"[$invocationNamespace:$action:$stateName] Send msg ${msg.activationId} to waiting request ${request.containerId}")
+              s"[$invocationNamespace:$action:$stateName] Send msg ${msg.activationId} to waiting request ${request.containerId}")(
+              msg.transid)
             cancelPoll.cancel()
           case Left(_) => // do nothing
         }
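
With the GetState handler above, StatusData now carries the list of waiting
ActivationIds instead of a bare queue length, so consumers can recover both the size
and the individual ids. A small sketch of constructing and reading one; the namespace
and action names are made up:

    import org.apache.openwhisk.core.connector.StatusData
    import org.apache.openwhisk.core.entity.ActivationId

    val waiting = List(ActivationId.generate(), ActivationId.generate())
    val status = StatusData("someNamespace", "someNamespace/someAction", waiting, "Running", "RunningData")

    // The old numeric field is derived on demand, as FPCSchedulerServer does above.
    val queuedCount = status.waitingActivation.size
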
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index d87338dd3..ad7b17103 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -261,11 +261,11 @@ class QueueManager(
     case QueueSize =>
       sender ! QueuePool.size
 
-    case StatusQuery =>
-      val poolStatus = Future.sequence {
-        QueuePool.values.map(_.queue.ask(StatusQuery).mapTo[StatusData])
-      }
-      sender ! poolStatus
+    case GetState =>
+      val result =
+        Future.sequence(QueuePool.values.map(_.queue.ask(GetState)(Timeout(5.seconds)).mapTo[StatusData]).toList)
+
+      sender ! result
 
     case msg =>
       logging.error(this, s"failed to elect a leader for ${msg}")
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
index 77cba01e7..9d33a508b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
@@ -24,7 +24,7 @@ import akka.grpc.internal.ClientClosedException
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
 import common.StreamLogging
 import io.grpc.StatusRuntimeException
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
 import org.apache.openwhisk.core.connector.ActivationMessage
 import org.apache.openwhisk.core.containerpool.ContainerId
 import org.apache.openwhisk.core.containerpool.v2._
@@ -37,9 +37,9 @@ import org.apache.openwhisk.grpc
 import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest, RescheduleRequest, RescheduleResponse}
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
-import org.scalatest.concurrent.ScalaFutures
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
@@ -349,7 +349,7 @@ class ActivationClientProxyTests
     probe expectTerminated machine
   }
 
-  it should "be closed when it receives a CloseClientProxy message for a normal timeout case" in within(timeout) {
+  it should "be closed when it receives a GracefulShutdown message for a normal timeout case" in within(timeout) {
     val fetch = (_: FetchRequest) => Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
     val activationClient = MockActivationServiceClient(fetch)
     val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, _: Boolean) => Future(activationClient)
@@ -362,14 +362,14 @@ class ActivationClientProxyTests
     registerCallback(machine, probe)
     ready(machine, probe)
 
-    machine ! CloseClientProxy
-    awaitAssert(activationClient.isClosed shouldBe true)
+    machine ! GracefulShutdown
 
     probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
 
     machine ! RequestActivation()
 
     probe expectMsg ClientClosed
+    awaitAssert(activationClient.isClosed shouldBe true)
     probe expectTerminated machine
   }
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 14cf432b5..e1da77e63 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -33,11 +33,11 @@ import org.apache.openwhisk.core.connector.{
   MessageProducer,
   ResultMetadata
 }
-import org.apache.openwhisk.core.containerpool.docker.DockerContainer
 import org.apache.openwhisk.core.containerpool.v2._
 import org.apache.openwhisk.core.containerpool.{
   Container,
   ContainerAddress,
+  ContainerId,
   ContainerPoolConfig,
   ContainerRemoved,
   PrewarmContainerCreationConfig,
@@ -137,9 +137,15 @@ class FunctionPullingContainerPoolTests
   private val schedulerInstanceId = SchedulerInstanceId("0")
   private val producer = stub[MessageProducer]
   private val prewarmedData = PreWarmData(mock[MockableV2Container], actionKind, memoryLimit)
+  private val mockContainer = mock[MockableV2Container]
+  (mockContainer.containerId _: () => ContainerId)
+    .expects()
+    .returning(ContainerId("test-container-id"))
+    .anyNumberOfTimes()
+
   private val initializedData =
     InitializedData(
-      mock[MockableV2Container],
+      mockContainer,
       invocationNamespace.asString,
       whiskAction.toExecutableWhiskAction.get,
       TestProbe().ref)
@@ -315,6 +321,11 @@ class FunctionPullingContainerPoolTests
   private def retry[T](fn: => T) = org.apache.openwhisk.utils.retry(fn, 10, Some(1.second))
 
   it should "stop containers gradually when shut down" in within(timeout * 20) {
+    (mockContainer.containerId _: () => ContainerId)
+      .expects()
+      .returning(ContainerId("test-container-id"))
+      .anyNumberOfTimes()
+
     val (containers, factory) = testContainers(10)
     val disablingContainers = ListBuffer[ActorRef]()
 
@@ -356,7 +367,7 @@ class FunctionPullingContainerPoolTests
           pool,
           ContainerIsPaused(
             WarmData(
-              stub[DockerContainer],
+              mockContainer,
               invocationNamespace.asString,
               whiskAction.toExecutableWhiskAction.get,
               doc.rev,
@@ -636,6 +647,10 @@ class FunctionPullingContainerPoolTests
   }
 
   it should "use a warmed container when invocationNamespace, action and revision matched" in within(timeout) {
+    (mockContainer.containerId _: () => ContainerId)
+      .expects()
+      .returning(ContainerId("test-container-id"))
+      .anyNumberOfTimes()
     val (containers, factory) = testContainers(3)
     val doc = put(entityStore, whiskAction)
 
@@ -654,7 +669,7 @@ class FunctionPullingContainerPoolTests
     pool.tell(
       ContainerIsPaused(
         WarmData(
-          stub[DockerContainer],
+          mockContainer,
           invocationNamespace.asString,
           whiskAction.toExecutableWhiskAction.get,
           doc.rev,
@@ -687,6 +702,10 @@ class FunctionPullingContainerPoolTests
   }
 
   it should "retry when chosen warmed container is failed to resume" in within(timeout) {
+    (mockContainer.containerId _: () => ContainerId)
+      .expects()
+      .returning(ContainerId("test-container-id"))
+      .anyNumberOfTimes()
 
     val (containers, factory) = testContainers(2)
     val doc = put(entityStore, whiskAction)
@@ -706,7 +725,7 @@ class FunctionPullingContainerPoolTests
     pool.tell(
       ContainerIsPaused(
         WarmData(
-          stub[DockerContainer],
+          mockContainer,
           invocationNamespace.asString,
           whiskAction.toExecutableWhiskAction.get,
           doc.rev,
@@ -724,7 +743,7 @@ class FunctionPullingContainerPoolTests
     pool.tell(
       ResumeFailed(
         WarmData(
-          stub[DockerContainer],
+          mockContainer,
           invocationNamespace.asString,
           whiskAction.toExecutableWhiskAction.get,
           doc.rev,
@@ -739,6 +758,10 @@ class FunctionPullingContainerPoolTests
   }
 
   it should "remove oldest previously used container to make space for the job passed to run" in within(timeout) {
+    (mockContainer.containerId _: () => ContainerId)
+      .expects()
+      .returning(ContainerId("test-container-id"))
+      .anyNumberOfTimes()
     val (containers, factory) = testContainers(2)
     val doc = put(entityStore, whiskAction)
 
@@ -757,7 +780,7 @@ class FunctionPullingContainerPoolTests
     pool.tell(
       ContainerIsPaused(
         WarmData(
-          stub[DockerContainer],
+          mockContainer,
           invocationNamespace.asString,
           whiskAction.toExecutableWhiskAction.get,
           doc.rev,
@@ -769,7 +792,7 @@ class FunctionPullingContainerPoolTests
     pool.tell(
       ContainerIsPaused(
         WarmData(
-          stub[DockerContainer],
+          mockContainer,
           invocationNamespace.asString,
           whiskAction.toExecutableWhiskAction.get,
           doc.rev,
@@ -781,7 +804,7 @@ class FunctionPullingContainerPoolTests
     pool.tell(
       ContainerIsPaused(
         WarmData(
-          stub[DockerContainer],
+          mockContainer,
           invocationNamespace.asString,
           whiskAction.toExecutableWhiskAction.get,
           doc.rev,
@@ -854,6 +877,10 @@ class FunctionPullingContainerPoolTests
   }
 
   it should "send ack(success) to scheduler when chosen warmed container is resumed" in within(timeout) {
+    (mockContainer.containerId _: () => ContainerId)
+      .expects()
+      .returning(ContainerId("test-container-id"))
+      .anyNumberOfTimes()
     val (containers, factory) = testContainers(1)
     val doc = put(entityStore, whiskAction)
     // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
@@ -878,7 +905,7 @@ class FunctionPullingContainerPoolTests
     pool.tell(
       ContainerIsPaused(
         WarmData(
-          stub[DockerContainer],
+          mockContainer,
           invocationNamespace.asString,
           whiskAction.toExecutableWhiskAction.get,
           doc.rev,
@@ -894,7 +921,7 @@ class FunctionPullingContainerPoolTests
     pool.tell(
       Resumed(
         WarmData(
-          stub[DockerContainer],
+          mockContainer,
           invocationNamespace.asString,
           whiskAction.toExecutableWhiskAction.get,
           doc.rev,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 2355356bf..04516d4a8 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -44,9 +44,9 @@ import org.apache.openwhisk.core.containerpool.{
 }
 import org.apache.openwhisk.core.database.{ArtifactStore, StaleParameter, UserContext}
 import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.entity.types.AuthStore
-import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.etcd.EtcdClient
 import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
 import org.apache.openwhisk.core.etcd.EtcdType._
@@ -1607,7 +1607,7 @@ class FunctionPullingContainerProxyTests
       UnregisterData(ContainerKeys
         .existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))))
 
-    client.expectMsg(CloseClientProxy)
+    client.expectMsg(GracefulShutdown)
     client.send(machine, ClientClosed)
 
     probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
index 13f957211..a9a72f460 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
@@ -320,7 +320,7 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC
   override def totalActiveActivations = Future.successful(0)
   override def activeActivationsFor(namespace: UUID) = Future.successful(0)
   override def activeActivationsByController(controller: String): Future[Int] = Future.successful(0)
-  override def activeActivationsByController: Future[List[ActivationId]] = Future.successful(List(ActivationId("id")))
+  override def activeActivationsByController: Future[List[(String, String)]] = Future.successful(List(("", "")))
   override def activeActivationsByInvoker(invoker: String): Future[Int] = Future.successful(0)
 
   override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
index 76f1410a7..ea5e73b50 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -24,6 +24,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import common.StreamLogging
 import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
 import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
 import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
 import org.apache.openwhisk.http.BasicHttpService
@@ -32,6 +33,8 @@ import org.scalamock.scalatest.MockFactory
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
 
+import scala.concurrent.Future
+
 /**
  * Tests InvokerServer API.
  */
@@ -135,22 +138,27 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService {
   var enableCount = 0
   var disableCount = 0
 
-  override def enable(): Route = {
+  override def enable(): String = {
     enableCount += 1
-    complete("")
+    s""
   }
 
-  override def disable(): Route = {
+  override def disable(): String = {
     disableCount += 1
-    complete("")
+    s""
   }
 
-  override def isEnabled(): Route = {
-    complete(InvokerEnabled(true).serialize())
+  override def isEnabled(): String = {
+    InvokerEnabled(true).serialize()
+  }
+
+  override def backfillPrewarm(): String = {
+    ""
   }
 
-  override def backfillPrewarm(): Route = {
-    complete("")
+  override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
+    Future.successful(Left(NotSupportedPoolState()))
   }
 
   def reset(): Unit = {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
index 2393b8775..cf298fb5f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -24,6 +24,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import common.StreamLogging
 import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
 import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
 import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore}
 import org.apache.openwhisk.http.BasicHttpService
@@ -32,6 +33,8 @@ import org.scalamock.scalatest.MockFactory
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
 
+import scala.concurrent.Future
+
 /**
  * Tests InvokerServerV2 API.
  */
@@ -134,22 +137,27 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
   var enableCount = 0
   var disableCount = 0
 
-  override def enable(): Route = {
+  override def enable(): String = {
     enableCount += 1
-    complete("")
+    ""
   }
 
-  override def disable(): Route = {
+  override def disable(): String = {
     disableCount += 1
-    complete("")
+    ""
   }
 
-  override def isEnabled(): Route = {
-    complete(InvokerEnabled(true).serialize())
+  override def isEnabled(): String = {
+    InvokerEnabled(true).serialize()
+  }
+
+  override def backfillPrewarm(): String = {
+    ""
   }
 
-  override def backfillPrewarm(): Route = {
-    complete("")
+  override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
+    Future.successful(Left(NotSupportedPoolState()))
   }
 
   def reset(): Unit = {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
index 0dab4f4ce..26dc33869 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
@@ -25,7 +25,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest
 import common.StreamLogging
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.connector.StatusData
-import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.apache.openwhisk.core.entity.{ActivationId, SchedulerInstanceId}
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.junit.JUnitRunner
@@ -56,9 +56,10 @@ class FPCSchedulerServerTests
   val queues = List((SchedulerInstanceId("0"), 2), (SchedulerInstanceId("1"), 3))
   val creationCount = 1
   val testQueueSize = 2
+  val activationIds = (1 to 10).map(_ => ActivationId.generate()).toList
   val statusDatas = List(
-    StatusData("testns1", "testaction1", 10, "Running", "RunningData"),
-    StatusData("testns2", "testaction2", 5, "Running", "RunningData"))
+    StatusData("testns1", "testaction1", activationIds, "Running", "RunningData"),
+    StatusData("testns2", "testaction2", activationIds.take(5), "Running", "RunningData"))
 
   // Create scheduler
   val scheduler = new TestScheduler(queues, creationCount, testQueueSize, statusDatas)
@@ -85,8 +86,8 @@ class FPCSchedulerServerTests
     val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
     Get(s"/state") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
       status should be(OK)
-      responseAs[JsObject] shouldBe (queues.map(s => s._1.asString -> s._2.toString).toMap ++ Map(
-        "creationCount" -> creationCount.toString)).toJson
+      responseAs[JsObject] shouldBe (Map("creationCount" -> creationCount.toString) ++ Map(
+        "queue" -> queues.map(_._2).sum.toString)).toJson
     }
   }
 
@@ -94,7 +95,7 @@ class FPCSchedulerServerTests
   it should "get total queue" in {
     implicit val tid = transid()
     val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
-    Get(s"/queue/total") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+    Get(s"/queues/count") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
       status should be(OK)
       responseAs[String] shouldBe testQueueSize.toString
     }
@@ -104,7 +105,7 @@ class FPCSchedulerServerTests
   it should "get all queue status" in {
     implicit val tid = transid()
     val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
-    Get(s"/queue/status") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+    Get(s"/queues") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
       status should be(OK)
       responseAs[List[JsObject]] shouldBe statusDatas.map(_.toJson)
     }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 1cb2a5cda..a3cd73e1a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -47,6 +47,7 @@ import spray.json.{JsObject, JsString}
 
 import java.time.Instant
 import scala.collection.immutable.Queue
+import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
 import scala.language.postfixOps
@@ -172,9 +173,12 @@ class MemoryQueueFlowTests
 
     expectDataCleanUp(watcher, dataMgmtService)
 
-    parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
 
+    // the queue is timed out again in the removed state
+    fsm ! StateTimeout
+
+    parent.expectMsg(queueRemovedMsg)
     fsm ! QueueRemovedCompleted
 
     probe.expectTerminated(fsm, 10.seconds)
@@ -284,9 +288,11 @@ class MemoryQueueFlowTests
 
     fsm ! StateTimeout
 
-    parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
 
+    // the queue is timed out again in the removed state
+    fsm ! StateTimeout
+
     fsm ! QueueRemovedCompleted
 
     expectDataCleanUp(watcher, dataMgmtService)
@@ -369,15 +375,19 @@ class MemoryQueueFlowTests
 
     fsm ! GracefulShutdown
 
-    parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, NamespaceThrottled, Removing))
 
-    fsm ! QueueRemovedCompleted
+    // the queue is timed out in the Removing state
+    fsm ! StateTimeout
 
     expectDataCleanUp(watcher, dataMgmtService)
 
     probe.expectMsg(Transition(fsm, Removing, Removed))
 
+    // the queue is timed out again in the Removed state
+    fsm ! StateTimeout
+    fsm ! QueueRemovedCompleted
+
     probe.expectTerminated(fsm, 10.seconds)
   }
 
@@ -494,9 +504,12 @@ class MemoryQueueFlowTests
 
     fsm ! StateTimeout
 
-    parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
 
+    // the queue is timed out again in the Removed state
+    fsm ! StateTimeout
+
+    parent.expectMsg(queueRemovedMsg)
     fsm ! QueueRemovedCompleted
 
     expectDataCleanUp(watcher, dataMgmtService)
@@ -644,9 +657,12 @@ class MemoryQueueFlowTests
     probe.expectMsg(Transition(fsm, Running, Idle))
 
     fsm ! StateTimeout
-    parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
 
+    // the queue is timed out again in the Removed state
+    fsm ! StateTimeout
+    parent.expectMsg(queueRemovedMsg)
+
     fsm ! QueueRemovedCompleted
     expectDataCleanUp(watcher, dataMgmtService)
 
@@ -756,9 +772,11 @@ class MemoryQueueFlowTests
 
     fsm ! StateTimeout
 
-    parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
 
+    // the queue is timed out again in the Removed state
+    fsm ! StateTimeout
+    parent.expectMsg(queueRemovedMsg)
     fsm ! QueueRemovedCompleted
 
     expectDataCleanUp(watcher, dataMgmtService)
@@ -838,6 +856,8 @@ class MemoryQueueFlowTests
     expectDataCleanUp(watcher, dataMgmtService)
     probe.expectMsg(Transition(fsm, Flushing, Removed))
 
+    // the queue is timed out again in the Removed state
+    fsm ! StateTimeout
     parent.expectMsg(queueRemovedMsg)
     fsm ! QueueRemovedCompleted
 
@@ -935,9 +955,11 @@ class MemoryQueueFlowTests
     clock.plusSeconds(flushGrace.toSeconds * 2)
     fsm ! StateTimeout
 
-    parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Flushing, Removed))
 
+    // the queue is timed out again in the Removed state
+    fsm ! StateTimeout
+    parent.expectMsg(queueRemovedMsg)
     fsm ! QueueRemovedCompleted
 
     expectDataCleanUp(watcher, dataMgmtService)
@@ -1032,15 +1054,17 @@ class MemoryQueueFlowTests
     container.send(fsm, getActivation(false))
     container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
 
-    fsm.underlyingActor.creationIds = Set.empty[String]
+    fsm.underlyingActor.creationIds = mutable.Set.empty[String]
     fsm ! StateTimeout
     probe.expectMsg(Transition(fsm, Running, Idle))
 
     fsm ! StateTimeout
 
-    parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
 
+    // the queue is timed out again in the Removed state
+    fsm ! StateTimeout
+    parent.expectMsg(queueRemovedMsg)
     fsm ! QueueRemovedCompleted
 
     expectDataCleanUp(watcher, dataMgmtService)
@@ -1147,9 +1171,11 @@ class MemoryQueueFlowTests
     fsm ! StateTimeout
 
     expectDataCleanUp(watcher, dataMgmtService)
-    parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Flushing, Removed))
 
+    // the queue is timed out again in the Removed state
+    fsm ! StateTimeout
+    parent.expectMsg(queueRemovedMsg)
     fsm ! QueueRemovedCompleted
 
     probe.expectTerminated(fsm, 10.seconds)
@@ -1229,8 +1255,6 @@ class MemoryQueueFlowTests
       UnregisterData(namespaceThrottlingKey),
       UnregisterData(actionThrottlingKey))
 
-    parent.expectMsg(queueRemovedMsg)
-
     probe.expectMsg(Transition(fsm, Running, Removing))
 
     // a newly arrived message should be properly handled
@@ -1240,8 +1264,6 @@ class MemoryQueueFlowTests
 
     fsm ! messages(2)
 
-    fsm ! QueueRemovedCompleted
-
     // if there is a message, it should not terminate
     fsm ! StateTimeout
 
@@ -1259,6 +1281,11 @@ class MemoryQueueFlowTests
 
     probe.expectMsg(Transition(fsm, Removing, Removed))
 
+    // the queue is timed out again in the Removed state
+    fsm ! StateTimeout
+    parent.expectMsg(queueRemovedMsg)
+    fsm ! QueueRemovedCompleted
+
     probe.expectTerminated(fsm, 10.seconds)
   }
 
@@ -1328,7 +1355,6 @@ class MemoryQueueFlowTests
 
       // another queue is already running
       fsm ! InitialDataStorageResults(`leaderKey`, Left(AlreadyExist()))
-
       parent.expectMsg(queueRemovedMsg)
       parent.expectMsg(message)
 
@@ -1341,6 +1367,9 @@ class MemoryQueueFlowTests
       // move to the Deprecated state
       probe.expectMsg(Transition(fsm, state, Removed))
 
+      // the queue is timed out again in the Removed state
+      fsm ! StateTimeout
+      parent.expectMsg(queueRemovedMsg)
       fsm ! QueueRemovedCompleted
 
       probe.expectTerminated(fsm, 10.seconds)
@@ -1431,6 +1460,7 @@ class MemoryQueueFlowTests
 
       // the queue is supposed to send queueRemovedMsg once again and stops itself.
       parent.expectMsg(queueRemovedMsg)
+      fsm ! QueueRemovedCompleted
       probe.expectTerminated(fsm, 10.seconds)
     }
   }
@@ -1501,7 +1531,7 @@ class MemoryQueueFlowTests
           fsm.setState(state, FlushingData(schedulingActors.ref, schedulingActors.ref, WhiskError, "whisk error"))
 
         case Removing =>
-          fsm.underlyingActor.containers = Set(testContainerId)
+          fsm.underlyingActor.containers = mutable.Set(testContainerId)
           fsm ! message
           fsm.setState(state, RemovingData(schedulingActors.ref, schedulingActors.ref, outdated = true))
 
@@ -1537,7 +1567,7 @@ class MemoryQueueFlowTests
           container.send(fsm, getActivation())
           container.expectMsg(ActivationResponse(Right(message)))
           // has no old containers for old queue, so send the message to queueManager
-          fsm.underlyingActor.containers = Set.empty[String]
+          fsm.underlyingActor.containers = mutable.Set.empty[String]
           fsm.underlyingActor.queue =
             Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(Instant.now.toEpochMilli + 1000), message))
           fsm ! StopSchedulingAsOutdated
@@ -1742,20 +1772,20 @@ class MemoryQueueFlowTests
             UnwatchEndpoint(existingContainerKey, isPrefix = true, watcherName),
             UnwatchEndpoint(leaderKey, isPrefix = false, watcherName))
 
-          // queue is stale and will be removed
-          parent.expectMsg(queueRemovedMsg)
-
           probe.expectMsg(Transition(fsm, state, Removed))
 
+          // the queue is timed out again in the Removed state
+          fsm ! StateTimeout
+
+          // queue is stale and will be removed
+          parent.expectMsg(queueRemovedMsg)
           fsm ! QueueRemovedCompleted
           probe.expectTerminated(fsm, 10.seconds)
 
         case _ =>
           // queue is stale and will be removed
-          parent.expectMsg(queueRemovedMsg)
-          probe.expectMsg(Transition(fsm, state, Removing))
 
-          fsm ! QueueRemovedCompleted
+          probe.expectMsg(Transition(fsm, state, Removing))
 
           // queue should not be terminated as there is an activation
           fsm ! StateTimeout
@@ -1771,6 +1801,11 @@ class MemoryQueueFlowTests
             UnwatchEndpoint(leaderKey, isPrefix = false, watcherName))
 
           probe.expectMsg(Transition(fsm, Removing, Removed))
+
+          // the queue is timed out again in the Removed state
+          fsm ! StateTimeout
+          parent.expectMsg(queueRemovedMsg)
+          fsm ! QueueRemovedCompleted
           probe.expectTerminated(fsm, 10.seconds)
       }
     }
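
These flow tests encode the revised removal handshake: the queue no longer announces
queueRemovedMsg while it is still cleaning up; it first reaches the Removed state, and
only a further StateTimeout there makes it notify the parent, which then acknowledges
with QueueRemovedCompleted before the actor terminates. A condensed sketch of the
expected exchange, reusing the probe/parent/fsm names from the tests above (one
representative Idle-to-Removed path):

    probe.expectMsg(Transition(fsm, Idle, Removed))
    fsm ! StateTimeout                    // timed out once more, now in Removed
    parent.expectMsg(queueRemovedMsg)     // deferred notification to the QueueManager
    fsm ! QueueRemovedCompleted           // parent acknowledges; the queue may stop
    probe.expectTerminated(fsm, 10.seconds)
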
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 711847269..d7ec5afd1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -57,6 +57,7 @@ import org.scalatest.junit.JUnitRunner
 import spray.json.{JsObject, JsString}
 
 import scala.collection.immutable.Queue
+import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.{higherKinds, postfixOps}
@@ -478,7 +479,7 @@ class MemoryQueueTests
     probe2 watch fsm
 
     // do not remove itself when there are still existing containers
-    fsm.underlyingActor.containers = Set("1")
+    fsm.underlyingActor.containers = mutable.Set("1")
     fsm.setState(Running, RunningData(schedulerActor, droppingActor))
     expectMsg(Transition(fsm, Uninitialized, Running))
     fsm ! StateTimeout
@@ -488,7 +489,7 @@ class MemoryQueueTests
     dataManagementService.expectNoMessage()
 
     // change the existing containers count to 0, the StateTimeout should work
-    fsm.underlyingActor.containers = Set.empty[String]
+    fsm.underlyingActor.containers = mutable.Set.empty[String]
     fsm ! StateTimeout
     probe.expectTerminated(schedulerActor)
     probe.expectTerminated(droppingActor)
@@ -496,12 +497,15 @@ class MemoryQueueTests
     expectMsg(Transition(fsm, Running, Idle))
 
     fsm ! StateTimeout
-    parent.expectMsg(QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(revision), Some(leaderKey)))
     expectMsg(Transition(fsm, Idle, Removed))
-    fsm ! QueueRemovedCompleted
     dataManagementService.expectMsg(UnregisterData(leaderKey))
     dataManagementService.expectMsg(UnregisterData(namespaceThrottlingKey))
     dataManagementService.expectMsg(UnregisterData(actionThrottlingKey))
+
+    // queue is timed out again in the Removed state.
+    fsm ! StateTimeout
+    parent.expectMsg(QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(revision), Some(leaderKey)))
+    fsm ! QueueRemovedCompleted
     probe2.expectTerminated(fsm)
 
     fsm.stop()
@@ -562,7 +566,7 @@ class MemoryQueueTests
     fsm ! Start
     expectMsg(Transition(fsm, Uninitialized, Running))
 
-    fsm.underlyingActor.creationIds = Set.empty[String]
+    fsm.underlyingActor.creationIds = mutable.Set.empty[String]
     fsm ! StateTimeout
     expectMsg(Transition(fsm, Running, Idle))
 
@@ -576,8 +580,8 @@ class MemoryQueueTests
       .futureValue shouldBe GetActivationResponse(Right(message))
 
     queueRef.queue.length shouldBe 0
-    fsm.underlyingActor.containers = Set.empty[String]
-    fsm.underlyingActor.creationIds = Set.empty[String]
+    fsm.underlyingActor.containers = mutable.Set.empty[String]
+    fsm.underlyingActor.creationIds = mutable.Set.empty[String]
     fsm ! StateTimeout
     expectMsg(Transition(fsm, Running, Idle))
     (fsm ? GetActivation(tid, fqn, testContainerId, false, None))
@@ -643,7 +647,7 @@ class MemoryQueueTests
     fsm ! Start
     expectMsg(Transition(fsm, Uninitialized, Running))
 
-    fsm.underlyingActor.creationIds = Set.empty[String]
+    fsm.underlyingActor.creationIds = mutable.Set.empty[String]
     fsm ! StateTimeout
     expectMsg(Transition(fsm, Running, Idle))
 
@@ -651,9 +655,13 @@ class MemoryQueueTests
     expectMsg(Transition(fsm, Idle, Removed))
     queueRef.queue.length shouldBe 0
     fsm ! message
-    parent.expectMsg(queueRemovedMsg)
+
     parent.expectMsg(message)
 
+    // queue is timed out again in the Removed state.
+    fsm ! StateTimeout
+    parent.expectMsg(queueRemovedMsg)
+
     expectNoMessage()
 
     fsm.stop()
@@ -1095,10 +1103,13 @@ class MemoryQueueTests
     fsm ! message
     probe.expectMsg(ActivationResponse.developerError("nonExecutbleAction error"))
 
-    parent.expectMsg(
+    parent.expectMsgAnyOf(
       2 * queueConfig.flushGrace + 5.seconds,
-      QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)))
-    parent.expectMsg(Transition(fsm, Flushing, Removed))
+      QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)),
+      Transition(fsm, Flushing, Removed))
+
+    fsm ! StateTimeout
+    parent.expectMsg(queueRemovedMsg)
     fsm ! QueueRemovedCompleted
     parent.expectTerminated(fsm)
 
@@ -1206,10 +1217,9 @@ class MemoryQueueTests
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString("resource not enough")))
     }, 5.seconds)
 
-    parent.expectMsg(queueRemovedMsg)
-
     // should goto Removed
-    parent.expectMsg(Transition(fsm, Flushing, Removed))
+    parent.expectMsgAnyOf(queueRemovedMsg, Transition(fsm, Flushing, Removed))
+
     fsm ! QueueRemovedCompleted
 
     fsm.stop()
@@ -1250,7 +1260,7 @@ class MemoryQueueTests
     val now = Instant.now
     fsm.underlyingActor.queue =
       Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message))
-    fsm.underlyingActor.containers = Set.empty[String]
+    fsm.underlyingActor.containers = mutable.Set.empty[String]
     fsm.setState(Running, RunningData(probe.ref, probe.ref))
     fsm ! StopSchedulingAsOutdated // update action
     queueManager.expectMsg(staleQueueRemovedMsg)
@@ -1294,7 +1304,7 @@ class MemoryQueueTests
     val now = Instant.now
     fsm.underlyingActor.queue =
       Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message))
-    fsm.underlyingActor.containers = Set(testContainerId)
+    fsm.underlyingActor.containers = mutable.Set(testContainerId)
     fsm.setState(Running, RunningData(probe.ref, probe.ref))
     fsm ! StopSchedulingAsOutdated // update action
     queueManager.expectMsg(staleQueueRemovedMsg)
@@ -1399,10 +1409,10 @@ class MemoryQueueTests
 
     val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs, MILLISECONDS) + queueConfig.flushGrace
     probe.expectMsg(duration, ActivationResponse.whiskError("no available invokers"))
-    parent.expectMsg(
+    parent.expectMsgAnyOf(
       duration,
-      QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)))
-    parent.expectMsg(Transition(fsm, Flushing, Removed))
+      QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)),
+      Transition(fsm, Flushing, Removed))
     fsm ! QueueRemovedCompleted
     parent.expectTerminated(fsm)
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index b60472e81..6d8d80f72 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -29,7 +29,7 @@ import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
 import org.apache.openwhisk.core.WarmUp.warmUpAction
 import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector.test.TestConnector
-import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, StatusData, StatusQuery}
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, GetState, StatusData}
 import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext}
 import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
 import org.apache.openwhisk.core.entity._
@@ -100,7 +100,8 @@ class QueueManagerTests
     ControllerInstanceId("0"),
     blocking = false,
     content = None)
-  val statusData = StatusData(testInvocationNamespace, testFQN.asString, 0, "Running", "RunningData")
+  val statusData =
+    StatusData(testInvocationNamespace, testFQN.asString, List.empty[ActivationId], "Running", "RunningData")
 
   // update start time for activation to ensure it's not stale
   def newActivation(start: Instant = Instant.now()): ActivationMessage = {
@@ -132,7 +133,7 @@ class QueueManagerTests
         override def receive: Receive = {
           case GetActivation(_, _, _, _, _, _) =>
             sender ! ActivationResponse(Right(newActivation()))
-          case StatusQuery =>
+          case GetState =>
             sender ! statusData
         }
       }))
@@ -796,7 +797,7 @@ class QueueManagerTests
 
     (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1
 
-    (queueManager ? StatusQuery).mapTo[Future[Iterable[StatusData]]].futureValue.futureValue shouldBe List(statusData)
+    (queueManager ? GetState).mapTo[Future[List[StatusData]]].flatten.futureValue shouldBe List(statusData)
   }
 
   it should "drop the activation message that has not been scheduled for a long time" in {