Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/11/01 02:07:17 UTC
[openwhisk] branch master updated: Add zero downtime deployment (#5338)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 651a2e957 Add zero downtime deployment (#5338)
651a2e957 is described below
commit 651a2e95726f69fb8403c49ce909371521e8986f
Author: Dominic Kim <st...@apache.org>
AuthorDate: Tue Nov 1 11:07:12 2022 +0900
Add zero downtime deployment (#5338)
* Deploy controllers without downtime
* Deploy invokers without downtime
* Deploy schedulers without downtime
* Fix typo
* Fix typo
* Add a disable API to controllers
* Remove unnecessary steps
* Add more logs for container liveness
* Change Set to a thread-safe one
* Use the transaction ID of the activation
* Gracefully shut down activation client proxy
* Update core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
Apply suggestion
Co-authored-by: Brendan Doyle <bd...@gmail.com>
* Update core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
Apply suggestion
Co-authored-by: Brendan Doyle <bd...@gmail.com>
* Update core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
Co-authored-by: Brendan Doyle <bd...@gmail.com>
* Apply https://github.com/apache/openwhisk/pull/5334
* Remove akka-http dependency from the invoker reactive
* Exclude the prewarm containers count from the /pool/count route
* Add missing import
* Make it compatible with scala-2.13
In Scala 2.13, mapValues returns a lazy MapView, which cannot be used as a Map without an explicit .toMap (see the sketch below).
* Fix test cases
* Add container id to the logs of ActivationClientProxy
Co-authored-by: Brendan Doyle <bd...@gmail.com>
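A quick illustration of the Scala 2.13 note above (a standalone sketch, not code from this commit):

    // On Scala 2.12, Map#mapValues returned a (lazily evaluated) Map; on 2.13 it
    // returns a MapView, so call sites that need a strict Map must add .toMap.
    val byKind: Map[String, Int] = Map("nodejs:14" -> 2, "python:3" -> 1)
    val doubled: Map[String, Int] = byKind.mapValues(_ * 2).toMap // .toMap required on 2.13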
---
ansible/group_vars/all | 13 +++-
ansible/roles/controller/tasks/deploy.yml | 83 ++++++++++++++++++++++
ansible/roles/invoker/tasks/clean.yml | 33 +++++++++
ansible/roles/invoker/tasks/deploy.yml | 6 ++
ansible/roles/schedulers/tasks/deploy.yml | 21 ++----
ansible/templates/whisk.properties.j2 | 4 ++
.../org/apache/openwhisk/core/WhiskConfig.scala | 3 +
.../apache/openwhisk/core/connector/Message.scala | 8 ++-
.../openwhisk/core/service/WatcherService.scala | 10 ++-
.../controller/src/main/resources/application.conf | 4 ++
.../openwhisk/core/controller/Controller.scala | 46 +++++++++++-
.../core/loadBalancer/CommonLoadBalancer.scala | 5 +-
.../core/loadBalancer/FPCPoolBalancer.scala | 5 +-
.../openwhisk/core/loadBalancer/LoadBalancer.scala | 2 +-
.../containerpool/v2/ActivationClientProxy.scala | 61 ++++++++--------
.../v2/FunctionPullingContainerPool.scala | 36 +++++++++-
.../v2/FunctionPullingContainerProxy.scala | 33 ++++++---
.../containerpool/v2/InvokerHealthManager.scala | 2 +-
.../core/invoker/DefaultInvokerServer.scala | 6 +-
.../core/invoker/FPCInvokerReactive.scala | 27 ++++---
.../openwhisk/core/invoker/FPCInvokerServer.scala | 28 +++++++-
.../apache/openwhisk/core/invoker/Invoker.scala | 11 +--
.../openwhisk/core/invoker/InvokerReactive.scala | 31 ++++----
.../core/scheduler/FPCSchedulerServer.scala | 28 +++++---
.../openwhisk/core/scheduler/Scheduler.scala | 4 +-
.../scheduler/grpc/ActivationServiceImpl.scala | 2 +
.../core/scheduler/queue/MemoryQueue.scala | 52 +++++++++-----
.../core/scheduler/queue/QueueManager.scala | 10 +--
.../v2/test/ActivationClientProxyTests.scala | 10 +--
.../test/FunctionPullingContainerPoolTests.scala | 49 ++++++++++---
.../test/FunctionPullingContainerProxyTests.scala | 4 +-
.../controller/test/ControllerTestCommon.scala | 2 +-
.../invoker/test/DefaultInvokerServerTests.scala | 22 ++++--
.../core/invoker/test/FPCInvokerServerTests.scala | 22 ++++--
.../core/scheduler/FPCSchedulerServerTests.scala | 15 ++--
.../queue/test/MemoryQueueFlowTests.scala | 83 +++++++++++++++-------
.../scheduler/queue/test/MemoryQueueTests.scala | 50 +++++++------
.../scheduler/queue/test/QueueManagerTests.scala | 9 +--
38 files changed, 607 insertions(+), 233 deletions(-)
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index bac6e2372..072e4aebb 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -112,6 +112,8 @@ controller:
authentication:
spi: "{{ controller_authentication_spi | default('') }}"
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
+ username: "{{ controller_username | default('controller.user') }}"
+ password: "{{ controller_password | default('controller.pass') }}"
entitlement:
spi: "{{ controller_entitlement_spi | default('') }}"
protocol: "{{ controller_protocol | default('https') }}"
@@ -126,6 +128,10 @@ controller:
password: "openwhisk"
name: "{{ __controller_ssl_keyPrefix }}openwhisk-keystore.p12"
extraEnv: "{{ controller_extraEnv | default({}) }}"
+ deployment:
+ ignore_error: "{{ controller_deployment_ignore_error | default('False') }}"
+ retries: "{{ controller_deployment_retries | default(180) }}"
+ delay: "{{ controller_deployment_delay | default(5) }}"
jmx:
basePortController: 15000
@@ -234,6 +240,10 @@ invoker:
creationMaxPeek: "{{ container_creation_max_peek | default(500) }}"
reactiveSpi: "{{ invokerReactive_spi | default('') }}"
serverSpi: "{{ invokerServer_spi | default('') }}"
+ deployment:
+ ignore_error: "{{ invoker_deployment_ignore_error | default('False') }}"
+ retries: "{{ invoker_deployment_retries | default(180) }}"
+ delay: "{{ invoker_deployment_delay | default(5) }}"
userLogs:
spi: "{{ userLogs_spi | default('org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}"
@@ -450,8 +460,7 @@ metrics:
user_events: "{{ user_events_enabled | default(false) | lower }}"
zeroDowntimeDeployment:
- enabled: "{{ zerodowntime_deployment_switch | default(true) }}"
- solution: "{{ zerodowntime_deployment_solution | default('apicall') }}"
+ enabled: "{{ zerodowntime_deployment_switch | default(false) }}"
etcd:
version: "{{ etcd_version | default('v3.4.0') }}"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 26d241ce6..5c46392a3 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -155,6 +155,8 @@
"CONFIG_whisk_info_date": "{{ whisk.version.date }}"
"CONFIG_whisk_info_buildNo": "{{ docker.image.tag }}"
"CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}"
+ "CONFIG_whisk_controller_username": "{{ controller.username }}"
+ "CONFIG_whisk_controller_password": "{{ controller.password }}"
"KAFKA_HOSTS": "{{ kafka_connect_string }}"
"CONFIG_whisk_kafka_replicationFactor":
@@ -363,6 +365,53 @@
include_tasks: "lean.yml"
when: lean
+# Before redeploying a controller, remove that controller instance from nginx
+- name: remove the controller from nginx's upstream configuration
+ shell:
+ docker exec -t nginx sh -c "sed -i \"s/ server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/ \#server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/g\" /etc/nginx/nginx.conf && nginx -s reload"
+ delegate_to: "{{ item }}"
+ with_items: "{{ groups['edge'] }}"
+ when: zeroDowntimeDeployment.enabled == true
+
+- name: wait some time for controllers to fire all existing triggers
+ shell: sleep 5s
+ when: zeroDowntimeDeployment.enabled == true
+
+- name: wait until {{ controller_name }} finishes all existing activations
+ uri:
+ url: "{{ controller.protocol }}://{{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/activation/count"
+ validate_certs: no
+ client_key: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.key }}"
+ client_cert: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.cert }}"
+ return_content: yes
+ user: "{{ controller.username }}"
+ password: "{{ controller.password }}"
+ force_basic_auth: yes
+ register: result
+ until: result.content == '0'
+ retries: "{{ controller.deployment.retries }}"
+ delay: "{{ controller.deployment.delay }}"
+ when: zeroDowntimeDeployment.enabled == true
+ ignore_errors: "{{ controller.deployment.ignore_error }}"
+
+- name: Disable {{ controller_name }} before removing the controller
+ uri:
+ url: "{{ controller.protocol }}://{{ ansible_host }}:{{ controller.basePort + groups['controllers'].index(inventory_hostname) }}/disable"
+ validate_certs: no
+ client_key: "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.key }}"
+ client_cert: "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.cert }}"
+ method: POST
+ status_code: 200
+ user: "{{ controller.username }}"
+ password: "{{ controller.password }}"
+ force_basic_auth: yes
+ ignore_errors: "{{ controller.deployment.ignore_error }}"
+ when: zeroDowntimeDeployment.enabled == true
+
+- name: wait some time for the controller to gracefully shut down the consumer for activation acks
+ shell: sleep 5s
+ when: zeroDowntimeDeployment.enabled == true
+
- name: (re)start controller
docker_container:
name: "{{ controller_name }}"
@@ -397,3 +446,37 @@
until: result.status == 200
retries: 12
delay: 10
+
+- name: warm up activation path
+ uri:
+ url:
+ "{{controller.protocol}}://{{ lookup('file', '{{ catalog_auth_key }}')}}@{{ansible_host}}:{{controller_port}}/api/v1/namespaces/_/actions/invokerHealthTestAction{{controller_index}}?blocking=false&result=false"
+ validate_certs: "no"
+ client_key:
+ "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.key }}"
+ client_cert:
+ "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.cert }}"
+ method: POST
+ ignore_errors: True
+
+- name: wait for all invokers in {{ controller_name }} to become healthy
+ uri:
+ url: "{{ controller.protocol }}://{{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/invokers"
+ validate_certs: no
+ client_key: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.key }}"
+ client_cert: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.cert }}"
+ return_content: yes
+ register: invokerStatus
+ until: invokerStatus.json|length >= 1 and "unhealthy" not in invokerStatus.content
+ retries: 14
+ delay: 5
+ when: zeroDowntimeDeployment.enabled == true
+
+# Once all invokers report their status to the controller, add the controller instance back to nginx if at least one invoker is up
+- name: Add the controller back to nginx's upstream configuration when there is at least one healthy invoker
+ shell:
+ docker exec -t nginx sh -c "sed -i \"s/ \#server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/ server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/g\" /etc/nginx/nginx.conf && nginx -s reload"
+ delegate_to: "{{ item }}"
+ with_items: "{{ groups['edge'] }}"
+ ignore_errors: True
+ when: zeroDowntimeDeployment.enabled == true and "up" in invokerStatus.content
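For readers who want to drive these new controller routes outside of Ansible, here is a minimal Scala sketch of the same drain-then-disable sequence. It assumes akka-http on the classpath; the host, port, and credentials are made up, and the TLS client-certificate handling done by the playbook is omitted:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.model.headers.BasicHttpCredentials
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    object DrainControllerSketch extends App {
      implicit val system: ActorSystem = ActorSystem("drain")
      import system.dispatcher

      // Assumed values, for illustration only.
      val base  = "https://controller0.example.com:10001"
      val creds = BasicHttpCredentials("controller.user", "controller.pass")

      def get(path: String): Future[String] =
        Http()
          .singleRequest(HttpRequest(uri = base + path).addCredentials(creds))
          .flatMap(res => Unmarshal(res.entity).to[String])

      // 1. Poll the new /activation/count route until no activations are in flight.
      while (Await.result(get("/activation/count"), 10.seconds) != "0")
        Thread.sleep(5000)

      // 2. POST to the new /disable route so the controller shuts down its
      //    activation-ack consumer before the container is replaced.
      Await.result(
        Http().singleRequest(
          HttpRequest(HttpMethods.POST, uri = base + "/disable").addCredentials(creds)),
        10.seconds)
      system.terminate()
    }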
diff --git a/ansible/roles/invoker/tasks/clean.yml b/ansible/roles/invoker/tasks/clean.yml
index 8d7ebaca2..b8f0b2f34 100644
--- a/ansible/roles/invoker/tasks/clean.yml
+++ b/ansible/roles/invoker/tasks/clean.yml
@@ -22,6 +22,37 @@
invoker_name: "{{ name_prefix ~ ((invoker_index_base | int) + host_group.index(inventory_hostname)) }}"
invoker_index: "{{ (invoker_index_base | int) + host_group.index(inventory_hostname) }}"
+- name: disable invoker{{ groups['invokers'].index(inventory_hostname) }}
+ uri:
+ url: "{{ invoker.protocol }}://{{ ansible_host }}:{{ invoker.port + groups['invokers'].index(inventory_hostname) }}/disable"
+ validate_certs: no
+ client_key: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.key }}"
+ client_cert: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.cert }}"
+ method: POST
+ status_code: 200
+ user: "{{ invoker.username }}"
+ password: "{{ invoker.password }}"
+ force_basic_auth: yes
+ ignore_errors: "{{ invoker.deployment.ignore_error }}"
+ when: zeroDowntimeDeployment.enabled == true and enable_scheduler
+
+- name: wait for invoker{{ groups['invokers'].index(inventory_hostname) }} to clean up all existing containers
+ uri:
+ url: "{{ invoker.protocol }}://{{ ansible_host }}:{{ invoker.port + groups['invokers'].index(inventory_hostname) }}/pool/count"
+ validate_certs: no
+ client_key: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.key }}"
+ client_cert: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.cert }}"
+ user: "{{ invoker.username }}"
+ password: "{{ invoker.password }}"
+ force_basic_auth: yes
+ return_content: yes
+ register: result
+ until: result.content == '0'
+ retries: "{{ invoker.deployment.retries }}"
+ delay: "{{ invoker.deployment.delay }}"
+ when: zeroDowntimeDeployment.enabled == true and enable_scheduler
+ ignore_errors: "{{ invoker.deployment.ignore_error }}"
+
- name: remove invoker
docker_container:
name: "{{ invoker_name }}"
@@ -59,12 +90,14 @@
path: "{{ whisk_logs_dir }}/{{ invoker_name }}"
state: absent
become: "{{ logs.dir.become }}"
+ when: mode == "clean"
- name: remove invoker conf directory
file:
path: "{{ invoker.confdir }}/{{ invoker_name }}"
state: absent
become: "{{ invoker.dir.become }}"
+ when: mode == "clean"
# Workaround for orphaned ifstate.veth* files on Ubuntu 14.04
# See https://github.com/moby/moby/issues/22513
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 0223df97d..62e21fb34 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -17,6 +17,12 @@
---
# This role installs invokers.
+###
+# When zero-downtime deployment is enabled, clean.yml is used to gracefully shut down the invoker.
+#
+- import_tasks: clean.yml
+ when: zeroDowntimeDeployment.enabled == true and enable_scheduler
+
- import_tasks: docker_login.yml
- name: get invoker name and index
diff --git a/ansible/roles/schedulers/tasks/deploy.yml b/ansible/roles/schedulers/tasks/deploy.yml
index d1ec6e487..280ea68b7 100644
--- a/ansible/roles/schedulers/tasks/deploy.yml
+++ b/ansible/roles/schedulers/tasks/deploy.yml
@@ -280,11 +280,6 @@
include_tasks: "{{ item }}.yml"
with_items: "{{ scheduler_plugins | default([]) }}"
-- name: Judge current scheduler whether deployed
- shell: echo $(docker ps | grep {{ scheduler_name }} | wc -l)
- register: schedulerDeployed
- when: zeroDowntimeDeployment.enabled == true
-
- name: disable scheduler{{ groups['schedulers'].index(inventory_hostname) }} before redeploy scheduler
uri:
url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/disable"
@@ -295,27 +290,23 @@
password: "{{ scheduler.password }}"
force_basic_auth: yes
ignore_errors: "{{ scheduler.deployment_ignore_error }}"
- when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0"
+ when: zeroDowntimeDeployment.enabled == true
-- name: wait until all queue and create queue task is finished before redeploy scheduler when using apicall solution or half solution
+- name: wait until all activations are finished before redeploying the scheduler
uri:
- url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/queue/total"
+ url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/activation/count"
validate_certs: no
return_content: yes
user: "{{ scheduler.username }}"
password: "{{ scheduler.password }}"
force_basic_auth: yes
- register: totalQueue
- until: totalQueue.content == "0"
+ register: result
+ until: result.content == "0"
retries: 180
delay: 5
- when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0"
+ when: zeroDowntimeDeployment.enabled == true
ignore_errors: "{{ scheduler.deployment_ignore_error }}"
-- name: wait until all queue and create queue task is finished before redeploy scheduler using sleep solution
- shell: sleep 120s
- when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0" and zeroDowntimeDeployment.solution == 'sleep'
-
- name: (re)start scheduler
docker_container:
name: "{{ scheduler_name }}"
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index 03ade2b2d..e9dc2c2dd 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -53,11 +53,15 @@ edge.host.apiport=443
kafkaras.host.port={{ kafka.ras.port }}
redis.host.port={{ redis.port }}
invoker.hosts.basePort={{ invoker.port }}
+invoker.username={{ invoker.username }}
+invoker.password={{ invoker.password }}
controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
controller.host.basePort={{ controller.basePort }}
controller.instances={{ controller.instances }}
controller.protocol={{ controller.protocol }}
+controller.username={{ controller.username }}
+controller.password={{ controller.password }}
invoker.container.network=bridge
invoker.container.policy={{ invoker_container_policy_name | default()}}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index c029edb9d..f9039e947 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -312,6 +312,9 @@ object ConfigKeys {
val dataManagementServiceRetryInterval = "whisk.scheduler.data-management-service.retry-interval"
+ val whiskControllerUsername = "whisk.controller.username"
+ val whiskControllerPassword = "whisk.controller.password"
+
val whiskSchedulerUsername = "whisk.scheduler.username"
val whiskSchedulerPassword = "whisk.scheduler.password"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index b65b11496..da0c4d102 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -532,9 +532,13 @@ object InvokerResourceMessage extends DefaultJsonProtocol {
* ...
* ]
*/
-object StatusQuery
+object GetState
-case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
+case class StatusData(invocationNamespace: String,
+ fqn: String,
+ waitingActivation: List[ActivationId],
+ status: String,
+ data: String)
extends Message {
override def serialize: String = StatusData.serdes.write(this).compactPrint
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
index 2c544185e..e5f3397da 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
@@ -20,9 +20,10 @@ package org.apache.openwhisk.core.service
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.ibm.etcd.api.Event.EventType
import com.ibm.etcd.client.kv.WatchUpdate
-import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.common.{GracefulShutdown, Logging}
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdType._
+
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
@@ -141,6 +142,13 @@ class WatcherService(etcdClient: EtcdClient)(implicit logging: Logging, actorSys
// always send WatcherClosed back to sender if it need a feedback
if (request.needFeedback)
sender ! WatcherClosed(request.watchKey, request.isPrefix)
+
+ case GracefulShutdown =>
+ watcher.close()
+ putWatchers.clear()
+ deleteWatchers.clear()
+ prefixPutWatchers.clear()
+ prefixDeleteWatchers.clear()
}
}
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index e7bf1a956..7467b2df1 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -122,4 +122,8 @@ whisk{
file-system : true
dir-path : "/swagger-ui/"
}
+ controller {
+ username: "controller.user"
+ password: "controller.pass"
+ }
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index 0e662faa4..2ff1ecb56 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -22,7 +22,8 @@ import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes._
-import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.http.scaladsl.server.Route
import kamon.Kamon
import org.apache.openwhisk.common.Https.HttpsConfig
@@ -36,6 +37,7 @@ import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity.ExecManifest.Runtimes
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.loadBalancer.LoadBalancerProvider
+import org.apache.openwhisk.http.ErrorResponse.terminate
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
import org.apache.openwhisk.spi.SpiLoader
import pureconfig._
@@ -96,7 +98,7 @@ class Controller(val instance: ControllerInstanceId,
(pathEndOrSingleSlash & get) {
complete(info)
}
- } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
+ } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ activationStatus ~ disable
}
// initialize datastores
@@ -167,6 +169,22 @@ class Controller(val instance: ControllerInstanceId,
}
}
+ /**
+ * Handles the GET /activation URI.
+ *
+ * @return the in-flight activations
+ */
+ protected[controller] val activationStatus = {
+ implicit val executionContext = actorSystem.dispatcher
+ (pathPrefix("activation") & get) {
+ pathEndOrSingleSlash {
+ complete(loadBalancer.activeActivationsByController.map(_.toJson))
+ } ~ path("count") {
+ complete(loadBalancer.activeActivationsByController(controllerInstance.asString).map(_.toJson))
+ }
+ }
+ }
+
// controller top level info
private val info = Controller.info(
whiskConfig,
@@ -175,6 +193,30 @@ class Controller(val instance: ControllerInstanceId,
LogLimit.config,
runtimes,
List(apiV1.basepath()))
+
+ private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername)
+ private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword)
+
+ /**
+ * Disables the controller.
+ */
+ private def disable(implicit transid: TransactionId) = {
+ implicit val executionContext = actorSystem.dispatcher
+ implicit val jsonPrettyResponsePrinter = PrettyPrinter
+ (path("disable") & post) {
+ extractCredentials {
+ case Some(BasicHttpCredentials(username, password)) =>
+ if (username == controllerUsername && password == controllerPassword) {
+ loadBalancer.close
+ logging.warn(this, "controller is disabled")
+ complete("controller is disabled")
+ } else {
+ terminate(StatusCodes.Unauthorized, "username or password is incorrect.")
+ }
+ case _ => terminate(StatusCodes.Unauthorized)
+ }
+ }
+ }
}
/**
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 045828d66..1a204cf42 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -86,8 +86,9 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue)
override def activeActivationsByController(controller: String): Future[Int] =
Future.successful(activationsPerController.get(ControllerInstanceId(controller)).map(_.intValue()).getOrElse(0))
- override def activeActivationsByController: Future[List[ActivationId]] =
- Future.successful(activationSlots.keySet.toList)
+ override def activeActivationsByController: Future[List[(String, String)]] =
+ Future.successful(
+ activationSlots.values.map(entry => (entry.id.asString, entry.fullyQualifiedEntityName.toString)).toList)
override def activeActivationsByInvoker(invoker: String): Future[Int] =
Future.successful(
activationsPerInvoker.get(InvokerInstanceId(invoker.toInt, userMemory = 0.MB)).map(_.intValue()).getOrElse(0))
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index 3c247e0dd..bf04d1d2a 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -663,8 +663,9 @@ class FPCPoolBalancer(config: WhiskConfig,
Future.successful(activationsPerController.get(ControllerInstanceId(controller)).map(_.intValue()).getOrElse(0))
/** Gets the in-flight activations */
- override def activeActivationsByController: Future[List[ActivationId]] =
- Future.successful(activationSlots.keySet.toList)
+ override def activeActivationsByController: Future[List[(String, String)]] =
+ Future.successful(
+ activationSlots.values.map(entry => (entry.id.asString, entry.fullyQualifiedEntityName.toString)).toList)
/** Gets the number of in-flight activations for a specific invoker. */
override def activeActivationsByInvoker(invoker: String): Future[Int] = Future.successful(0)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index a4a0038bc..d9a38c6d2 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -59,7 +59,7 @@ trait LoadBalancer {
def activeActivationsByController(controller: String): Future[Int]
/** Gets the in-flight activations */
- def activeActivationsByController: Future[List[ActivationId]]
+ def activeActivationsByController: Future[List[(String, String)]]
/** Gets the number of in-flight activations for a specific invoker. */
def activeActivationsByInvoker(invoker: String): Future[Int]
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
index 2c2392feb..f525f88de 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
@@ -22,7 +22,7 @@ import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
import akka.grpc.internal.ClientClosedException
import akka.pattern.pipe
import io.grpc.StatusRuntimeException
-import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.containerpool.ContainerId
import org.apache.openwhisk.core.entity._
@@ -124,13 +124,13 @@ class ActivationClientProxy(
stay()
case Event(e: RescheduleActivation, client: Client) =>
- logging.info(this, s"got a reschedule message ${e.msg.activationId} for action: ${e.msg.action}")
+ logging.info(this, s"[${containerId.asString}] got a reschedule message ${e.msg.activationId} for action: ${e.msg.action}")
client.activationClient
.rescheduleActivation(
RescheduleRequest(e.invocationNamespace, e.fqn.serialize, e.rev.serialize, e.msg.serialize))
.recover {
case t =>
- logging.error(this, s"Failed to reschedule activation (error: $t)")
+ logging.error(this, s"[${containerId.asString}] Failed to reschedule activation (error: $t)")
RescheduleResponse()
}
.foreach(res => {
@@ -139,7 +139,7 @@ class ActivationClientProxy(
stay()
case Event(msg: ActivationMessage, _: Client) =>
- logging.debug(this, s"got a message ${msg.activationId} for action: ${msg.action}")
+ logging.debug(this, s"[${containerId.asString}] got a message ${msg.activationId} for action: ${msg.action}")
context.parent ! msg
stay()
@@ -152,7 +152,7 @@ class ActivationClientProxy(
case _: NoMemoryQueue =>
logging.error(
this,
- s"The queue of action ${action} under invocationNamespace ${invocationNamespace} does not exist. Check for queues in other schedulers.")
+ s"[${containerId.asString}] The queue of action ${action} under invocationNamespace ${invocationNamespace} does not exist. Check for queues in other schedulers.")
c.activationClient
.close()
.flatMap(_ =>
@@ -162,7 +162,7 @@ class ActivationClientProxy(
stay()
case _: ActionMismatch =>
- logging.error(this, s"action version does not match: $action")
+ logging.error(this, s"[${containerId.asString}] action version does not match: $action")
c.activationClient.close().andThen {
case _ => self ! ClientClosed
}
@@ -170,7 +170,7 @@ class ActivationClientProxy(
goto(ClientProxyRemoving)
case _: NoActivationMessage => // retry
- logging.debug(this, s"no activation message exist: $action")
+ logging.debug(this, s"[${containerId.asString}] no activation message exist: $action")
context.parent ! RetryRequestActivation
stay()
@@ -182,7 +182,7 @@ class ActivationClientProxy(
case Event(f: FailureMessage, c: Client) =>
f.cause match {
case t: ParsingException =>
- logging.error(this, s"failed to parse activation message: $t")
+ logging.error(this, s"[${containerId.asString}] failed to parse activation message: $t")
context.parent ! RetryRequestActivation
stay()
@@ -191,13 +191,13 @@ class ActivationClientProxy(
// In such situation, it is better to stop the activationClientProxy, otherwise, in short time,
// it would print huge log due to create another grpcClient to fetch activation again.
case t: StatusRuntimeException if t.getMessage.contains(ActivationClientProxy.hostResolveError) =>
- logging.error(this, s"akka grpc server connection failed: $t")
+ logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t")
self ! ClientClosed
goto(ClientProxyRemoving)
case t: StatusRuntimeException =>
- logging.error(this, s"akka grpc server connection failed: $t")
+ logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t")
c.activationClient
.close()
.flatMap(_ =>
@@ -207,13 +207,13 @@ class ActivationClientProxy(
stay()
case _: ClientClosedException =>
- logging.error(this, s"grpc client is already closed for $action")
+ logging.error(this, s"[${containerId.asString}] grpc client is already closed for $action")
self ! ClientClosed
goto(ClientProxyRemoving)
case t: Throwable =>
- logging.error(this, s"get activation from remote server error: $t")
+ logging.error(this, s"[${containerId.asString}] get activation from remote server error: $t")
safelyCloseClient(c)
goto(ClientProxyRemoving)
}
@@ -227,22 +227,12 @@ class ActivationClientProxy(
}
when(ClientProxyRemoving) {
- case Event(request: RequestActivation, client: Client) =>
- request.newScheduler match {
- // if scheduler is changed, client needs to be recreated
- case Some(scheduler) if scheduler.host != client.rpcHost || scheduler.rpcPort != client.rpcPort =>
- val newHost = request.newScheduler.get.host
- val newPort = request.newScheduler.get.rpcPort
- client.activationClient
- .close()
- .flatMap(_ =>
- createActivationClient(invocationNamespace, action, newHost, newPort, tryOtherScheduler = false))
- .pipeTo(self)
- case _ =>
- requestActivationMessage(invocationNamespace, action, rev, client.activationClient, request.lastDuration)
- .pipeTo(self)
- }
+ // This is the case where the last activation message was sent to the container proxy and the container proxy
+ // requested another activation, but the activation client is being shut down and no longer fetches any requests.
+ case Event(_: RequestActivation, c: Client) =>
+ safelyCloseClient(c)
+
stay()
case Event(msg: ActivationMessage, _: Client) =>
@@ -250,13 +240,14 @@ class ActivationClientProxy(
stay()
- case Event(_: MemoryQueueError, _: Client) =>
+ case Event(_: MemoryQueueError, c: Client) =>
+ safelyCloseClient(c)
self ! ClientClosed
stay()
case Event(f: FailureMessage, c: Client) =>
- logging.error(this, s"some error happened for action: ${action} in state: $stateName, caused by: $f")
+ logging.error(this, s"[${containerId.asString}] some error happened for action: ${action} in state: $stateName, caused by: $f")
safelyCloseClient(c)
stay()
@@ -279,16 +270,24 @@ class ActivationClientProxy(
warmed = true
stay
- case Event(CloseClientProxy, c: Client) =>
- safelyCloseClient(c)
+ // When disabling an invoker, there could still be activations in the queue.
+ // The activation client keeps fetching data and will forward it to the container(parent).
+ // Once it receives `NoActivationMessage` from the queue, it will close the activation client and send `ClientClosed`
+ // to the container(parent), rather than sending `RetryRequestActivation`.
+ // When a container proxy(parent) receives `ClientClosed`, it will finally shut down.
+ case Event(GracefulShutdown, _: Client) =>
+ logging.info(this, s"[${containerId.asString}] safely close client proxy and go to the ClientProxyRemoving state")
+
goto(ClientProxyRemoving)
case Event(ClientClosed, _) =>
+ logging.info(this, s"[${containerId.asString}] the underlying client is closed, stopping the activation client proxy")
context.parent ! ClientClosed
stop()
case Event(StopClientProxy, c: Client) =>
+ logging.info(this, s"[${containerId.asString}] stop close client proxy and go to the ClientProxyRemoving state")
safelyCloseClient(c)
stay()
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index 5b0c283c3..2a89630a5 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -18,14 +18,15 @@
package org.apache.openwhisk.core.containerpool.v2
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector.ContainerCreationError._
import org.apache.openwhisk.core.connector.{
ContainerCreationAckMessage,
ContainerCreationMessage,
ContainerDeletionMessage,
+ GetState,
ResultMetadata
}
import org.apache.openwhisk.core.containerpool.{
@@ -41,6 +42,7 @@ import org.apache.openwhisk.core.containerpool.{
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
+import spray.json.DefaultJsonProtocol
import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap
@@ -50,6 +52,27 @@ import scala.concurrent.duration._
import scala.util.{Random, Try}
import scala.collection.immutable.Queue
+object TotalContainerPoolState extends DefaultJsonProtocol {
+ implicit val prewarmedPoolSerdes = jsonFormat2(PrewarmedContainerPoolState.apply)
+ implicit val warmPoolSerdes = jsonFormat2(WarmContainerPoolState.apply)
+ implicit val totalPoolSerdes = jsonFormat5(TotalContainerPoolState.apply)
+}
+
+case class PrewarmedContainerPoolState(total: Int, countsByKind: Map[String, Int])
+case class WarmContainerPoolState(total: Int, containers: List[BasicContainerInfo])
+case class TotalContainerPoolState(totalContainers: Int,
+ inProgressCount: Int,
+ prewarmedPool: PrewarmedContainerPoolState,
+ busyPool: WarmContainerPoolState,
+ pausedPool: WarmContainerPoolState) {
+
+ def serialize(): String = TotalContainerPoolState.totalPoolSerdes.write(this).compactPrint
+}
+
+case class NotSupportedPoolState() {
+ def serialize(): String = "not supported"
+}
+
case class CreationContainer(creationMessage: ContainerCreationMessage, action: WhiskAction)
case class DeletionContainer(deletionMessage: ContainerDeletionMessage)
case object Remove
@@ -88,7 +111,7 @@ class FunctionPullingContainerPool(
implicit val ec = context.system.dispatcher
- protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, Data]
+ protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, ContainerAvailableData]
protected[containerpool] var inProgressPool = immutable.Map.empty[ActorRef, Data]
protected[containerpool] var warmedPool = immutable.Map.empty[ActorRef, WarmData]
protected[containerpool] var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
@@ -413,6 +436,15 @@ class FunctionPullingContainerPool(
// Reset the prewarmCreateCount value when do expiration check and backfill prewarm if possible
prewarmCreateFailedCount.set(0)
adjustPrewarmedContainer(false, true)
+
+ case GetState =>
+ val totalContainers = busyPool.size + inProgressPool.size + warmedPool.size + prewarmedPool.size
+ val prewarmedState =
+ PrewarmedContainerPoolState(prewarmedPool.size, prewarmedPool.groupBy(_._2.kind).mapValues(_.size).toMap)
+ val busyState = WarmContainerPoolState(busyPool.size, busyPool.values.map(_.basicContainerInfo).toList)
+ val pausedState = WarmContainerPoolState(warmedPool.size, warmedPool.values.map(_.basicContainerInfo).toList)
+ sender() ! TotalContainerPoolState(totalContainers, inProgressPool.size, prewarmedState, busyState, pausedState)
+
}
/** Install prewarm containers up to the configured requirements for each kind/memory combination or specified kind/memory */
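To make the shape of the new GetState reply concrete, here is a small sketch that builds a TotalContainerPoolState from the case classes added above (BasicContainerInfo comes from the FunctionPullingContainerProxy.scala change below) and prints its serialized form; all values are invented:

    val state = TotalContainerPoolState(
      totalContainers = 3,
      inProgressCount = 1,
      prewarmedPool = PrewarmedContainerPoolState(1, Map("nodejs:14" -> 1)),
      busyPool = WarmContainerPoolState(1, List(BasicContainerInfo("c1", "guest", "hello", "nodejs:14"))),
      pausedPool = WarmContainerPoolState(1, List(BasicContainerInfo("c2", "guest", "echo", "nodejs:14"))))
    println(state.serialize())
    // e.g. {"totalContainers":3,"inProgressCount":1,"prewarmedPool":{"total":1,...},...}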
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index 2049dc2d1..28d3e1e1c 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -127,16 +127,30 @@ case class PreWarmData(container: Container,
def isExpired(): Boolean = expires.exists(_.isOverdue())
}
-case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
+object BasicContainerInfo extends DefaultJsonProtocol {
+ implicit val prewarmedPoolSerdes = jsonFormat4(BasicContainerInfo.apply)
+}
+
+sealed case class BasicContainerInfo(containerId: String, namespace: String, action: String, kind: String)
+
+sealed abstract class ContainerAvailableData(container: Container,
+ invocationNamespace: String,
+ action: ExecutableWhiskAction)
extends Data(action.limits.memory.megabytes.MB) {
override def getContainer = Some(container)
+
+ val basicContainerInfo =
+ BasicContainerInfo(container.containerId.asString, invocationNamespace, action.name.asString, action.exec.kind)
}
+case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
+ extends ContainerAvailableData(container, invocationNamespace, action)
+
case class InitializedData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction,
override val clientProxy: ActorRef)
- extends Data(action.limits.memory.megabytes.MB)
+ extends ContainerAvailableData(container, invocationNamespace, action)
with WithClient {
override def getContainer = Some(container)
def toReschedulingData(resumeRun: RunActivation) =
@@ -149,7 +163,7 @@ case class WarmData(container: Container,
revision: DocRevision,
lastUsed: Instant,
override val clientProxy: ActorRef)
- extends Data(action.limits.memory.megabytes.MB)
+ extends ContainerAvailableData(container, invocationNamespace, action)
with WithClient {
override def getContainer = Some(container)
def toReschedulingData(resumeRun: RunActivation) =
@@ -159,12 +173,10 @@ case class WarmData(container: Container,
case class ReschedulingData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction,
- override val clientProxy: ActorRef,
+ clientProxy: ActorRef,
resumeRun: RunActivation)
- extends Data(action.limits.memory.megabytes.MB)
- with WithClient {
- override def getContainer = Some(container)
-}
+ extends ContainerAvailableData(container, invocationNamespace, action)
+ with WithClient
class FunctionPullingContainerProxy(
factory: (TransactionId,
@@ -628,8 +640,8 @@ class FunctionPullingContainerProxy(
Some(instance),
Some(data.container.containerId)))
- // Just send CloseClientProxy to ActivationClientProxy, make ActivationClientProxy throw ClientClosedException when fetchActivation next time.
- data.clientProxy ! CloseClientProxy
+ // Send GracefulShutdown to ActivationClientProxy; it makes ActivationClientProxy throw a ClientClosedException the next time fetchActivation is called.
+ data.clientProxy ! GracefulShutdown
stay
case x: Event if x.event != PingCache => delay
@@ -774,7 +786,6 @@ class FunctionPullingContainerProxy(
case Event(Remove | GracefulShutdown, _) =>
stay()
-
case Event(DetermineKeepContainer(_), _) =>
stay()
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
index 6665e6373..b0a4ad80e 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -262,7 +262,7 @@ case class HealthActivationServiceClient() extends Actor {
case _ => // do nothing
}
- case CloseClientProxy =>
+ case GracefulShutdown =>
closed = true
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
index f3503b55e..1321d8add 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
@@ -46,11 +46,11 @@ class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, sys
super.routes ~ extractCredentials {
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
(path("enable") & post) {
- invoker.enable()
+ complete(invoker.enable())
} ~ (path("disable") & post) {
- invoker.disable()
+ complete(invoker.disable())
} ~ (path("isEnabled") & get) {
- invoker.isEnabled()
+ complete(invoker.isEnabled())
}
case _ => terminate(StatusCodes.Unauthorized)
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 67660e8a6..dd6198a34 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -20,8 +20,8 @@ package org.apache.openwhisk.core.invoker
import akka.Done
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.grpc.GrpcClientSettings
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.Route
+import akka.pattern.ask
+import akka.util.Timeout
import com.ibm.etcd.api.Event.EventType
import com.ibm.etcd.client.kv.KvClient.Watch
import com.ibm.etcd.client.kv.WatchUpdate
@@ -31,7 +31,7 @@ import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.containerpool.v2._
-import org.apache.openwhisk.core.database.{UserContext, _}
+import org.apache.openwhisk.core.database._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
@@ -373,28 +373,33 @@ class FPCInvokerReactive(config: WhiskConfig,
maxPeek,
sendAckToScheduler))
- override def enable(): Route = {
+ override def enable(): String = {
invokerHealthManager ! Enable
pool ! Enable
warmUp()
- complete("Success enable invoker")
+ s"${instance.toString} is now enabled."
}
- override def disable(): Route = {
+ override def disable(): String = {
invokerHealthManager ! GracefulShutdown
pool ! GracefulShutdown
warmUpWatcher.foreach(_.close())
warmUpWatcher = None
- complete("Successfully disabled invoker")
+ s"${instance.toString} is now disabled."
}
- override def isEnabled(): Route = {
- complete(InvokerEnabled(warmUpWatcher.nonEmpty).serialize())
+ override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
+ implicit val timeout: Timeout = 5.seconds
+ (pool ? GetState).mapTo[TotalContainerPoolState].map(Right(_))
}
- override def backfillPrewarm(): Route = {
+ override def isEnabled(): String = {
+ InvokerEnabled(warmUpWatcher.nonEmpty).serialize()
+ }
+
+ override def backfillPrewarm(): String = {
pool ! AdjustPrewarmedContainer
- complete("backfilling prewarm container")
+ "backfilling prewarm container"
}
private val warmUpFetchRequest = FetchRequest(
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
index 61d194f4b..88c8b4076 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
@@ -27,6 +27,8 @@ import org.apache.openwhisk.http.BasicRasService
import org.apache.openwhisk.http.ErrorResponse.terminate
import pureconfig.loadConfigOrThrow
import spray.json.PrettyPrinter
+import spray.json.DefaultJsonProtocol._
+import spray.json._
import scala.concurrent.ExecutionContext
@@ -46,11 +48,31 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP
super.routes ~ extractCredentials {
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
(path("enable") & post) {
- invoker.enable()
+ complete(invoker.enable())
} ~ (path("disable") & post) {
- invoker.disable()
+ complete(invoker.disable())
} ~ (path("isEnabled") & get) {
- invoker.isEnabled()
+ complete(invoker.isEnabled())
+ } ~ (pathPrefix("pool") & get) {
+ pathEndOrSingleSlash {
+ complete {
+ invoker.getPoolState().map {
+ case Right(poolState) =>
+ poolState.serialize()
+ case Left(value) =>
+ value.serialize()
+ }
+ }
+ } ~ (path("count") & get) {
+ complete {
+ invoker.getPoolState().map {
+ case Right(poolState) =>
+ (poolState.busyPool.total + poolState.pausedPool.total + poolState.inProgressCount).toJson.compactPrint
+ case Left(value) =>
+ value.serialize()
+ }
+ }
+ }
}
case _ => terminate(StatusCodes.Unauthorized)
}
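Note that, per the commit message, the count route deliberately excludes prewarmed containers; a one-line restatement of the arithmetic used by the /pool/count handler above:

    // What /pool/count returns: busy + paused + in-progress, prewarm excluded.
    def poolCount(s: TotalContainerPoolState): Int =
      s.busyPool.total + s.pausedPool.total + s.inProgressCount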
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 997825865..8319c511b 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -19,13 +19,13 @@ package org.apache.openwhisk.core.invoker
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
-import akka.http.scaladsl.server.Route
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider}
+import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
import org.apache.openwhisk.core.containerpool.{Container, ContainerPoolConfig}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
@@ -246,10 +246,11 @@ trait InvokerProvider extends Spi {
// this trait can be used to add common implementation
trait InvokerCore {
- def enable(): Route
- def disable(): Route
- def isEnabled(): Route
- def backfillPrewarm(): Route
+ def enable(): String
+ def disable(): String
+ def isEnabled(): String
+ def backfillPrewarm(): String
+ def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]]
}
/**
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 31d1f2293..d7aae4a3c 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -19,20 +19,19 @@ package org.apache.openwhisk.core.invoker
import java.nio.charset.StandardCharsets
import java.time.Instant
+
import akka.Done
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.event.Logging.InfoLevel
-import akka.http.scaladsl.server.Directives.complete
-import akka.http.scaladsl.server.Route
import org.apache.openwhisk.common._
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
-import org.apache.openwhisk.core.database.{UserContext, _}
+import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
+import org.apache.openwhisk.core.database._
import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages
@@ -304,32 +303,36 @@ class InvokerReactive(
private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler)
- override def enable(): Route = {
+ override def enable(): String = {
if (healthScheduler.isEmpty) {
healthScheduler = Some(getHealthScheduler)
- complete(s"${instance.toString} is now enabled.")
+ s"${instance.toString} is now enabled."
} else {
- complete(s"${instance.toString} is already enabled.")
+ s"${instance.toString} is already enabled."
}
}
- override def disable(): Route = {
+ override def disable(): String = {
pingController(isEnabled = false)
if (healthScheduler.nonEmpty) {
actorSystem.stop(healthScheduler.get)
healthScheduler = None
- complete(s"${instance.toString} is now disabled.")
+ s"${instance.toString} is now disabled."
} else {
- complete(s"${instance.toString} is already disabled.")
+ s"${instance.toString} is already disabled."
}
}
- override def isEnabled(): Route = {
- complete(InvokerEnabled(healthScheduler.nonEmpty).serialize())
+ override def isEnabled(): String = {
+ InvokerEnabled(healthScheduler.nonEmpty).serialize()
+ }
+
+ override def backfillPrewarm(): String = {
+ "not supported"
}
- override def backfillPrewarm(): Route = {
- complete("not supported")
+ override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
+ Future.successful(Left(NotSupportedPoolState()))
}
}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
index 4fd2f9b69..aec923508 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
@@ -49,23 +49,29 @@ class FPCSchedulerServer(scheduler: SchedulerCore, systemUsername: String, syste
complete {
scheduler.getState.map {
case (list, creationCount) =>
- (list
- .map(scheduler => scheduler._1.asString -> scheduler._2.toString)
- .toMap
- ++ Map("creationCount" -> creationCount.toString)).toJson.asJsObject
+ val sum = list.map(tuple => tuple._2).sum
+ (Map("queue" -> sum.toString) ++ Map("creationCount" -> creationCount.toString)).toJson
}
}
} ~ (path("disable") & post) {
logger.warn(this, "Scheduler is disabled")
scheduler.disable()
complete("scheduler disabled")
- } ~ (path(FPCSchedulerServer.queuePathPrefix / "total") & get) {
- complete {
- scheduler.getQueueSize.map(_.toString)
+ } ~ (pathPrefix(FPCSchedulerServer.queuePathPrefix) & get) {
+ pathEndOrSingleSlash {
+ complete(scheduler.getQueueStatusData.map(s => s.toJson))
+ } ~ (path("count") & get) {
+ complete(scheduler.getQueueSize.map(s => s.toJson))
}
- } ~ (path(FPCSchedulerServer.queuePathPrefix / "status") & get) {
- complete {
- scheduler.getQueueStatusData.map(s => s.toJson)
+ } ~ (path("activation" / "count") & get) {
+ pathEndOrSingleSlash {
+ complete(
+ scheduler.getQueueStatusData
+ .map { s =>
+ s.map(_.waitingActivation.size)
+ }
+ .map(a => a.sum)
+ .map(_.toJson))
}
}
case _ =>
@@ -79,7 +85,7 @@ object FPCSchedulerServer {
private val schedulerUsername = loadConfigOrThrow[String](ConfigKeys.whiskSchedulerUsername)
private val schedulerPassword = loadConfigOrThrow[String](ConfigKeys.whiskSchedulerPassword)
- private val queuePathPrefix = "queue"
+ private val queuePathPrefix = "queues"
def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext,
actorSystem: ActorSystem,
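The scheduler's new /activation/count route aggregates over the per-queue StatusData (whose waitingActivation field is now a List[ActivationId], per the Message.scala change above); a sketch of the same aggregation:

    // Sum of the activation ids waiting in every queue's StatusData.
    def schedulerActivationCount(statuses: List[StatusData]): Int =
      statuses.map(_.waitingActivation.size).sum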
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index 2038fc1c3..e3ed70d8a 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -127,11 +127,11 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
}
override def getQueueSize: Future[Int] = {
- queueManager.ask(QueueSize)(Timeout(5.seconds)).mapTo[Int]
+ queueManager.ask(QueueSize)(Timeout(1.minute)).mapTo[Int]
}
override def getQueueStatusData: Future[List[StatusData]] = {
- queueManager.ask(StatusQuery)(Timeout(5.seconds)).mapTo[Future[List[StatusData]]].flatten
+ queueManager.ask(GetState)(Timeout(1.minute)).mapTo[Future[List[StatusData]]].flatten
}
override def disable(): Unit = {
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
index acf311e9f..8a3db898a 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
@@ -72,6 +72,8 @@ class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Loggin
QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
case Some(queueValue) =>
implicit val transid = TransactionId.serdes.read(request.transactionId.parseJson)
+ if (!request.alive) logging.info(this, s"the container(${request.containerId}) is not alive")
+
(queueValue.queue ? GetActivation(
transid,
fqn,
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 7f174f9aa..28ab22210 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -46,6 +46,7 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu
import pureconfig.loadConfigOrThrow
import spray.json._
import pureconfig.generic.auto._
+import scala.collection.JavaConverters._
import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicInteger
@@ -69,17 +70,30 @@ case object NamespaceThrottled extends MemoryQueueState
// Data
sealed abstract class MemoryQueueData()
-case class NoData() extends MemoryQueueData()
-case class NoActors() extends MemoryQueueData()
-case class RunningData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData()
-case class ThrottledData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData()
+case class NoData() extends MemoryQueueData() {
+ override def toString = "NoData"
+}
+case class NoActors() extends MemoryQueueData() {
+ override def toString = "NoActors"
+}
+case class RunningData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData() {
+ override def toString = "RunningData"
+}
+case class ThrottledData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData() {
+ override def toString = "ThrottledData"
+}
case class FlushingData(schedulerActor: ActorRef,
droppingActor: ActorRef,
error: ContainerCreationError,
reason: String,
activeDuringFlush: Boolean = false)
- extends MemoryQueueData()
-case class RemovingData(schedulerActor: ActorRef, droppingActor: ActorRef, outdated: Boolean) extends MemoryQueueData()
+ extends MemoryQueueData() {
+ override def toString = s"ThrottledData(error: $error, reason: $reason, activeDuringFlush: $activeDuringFlush)"
+}
+case class RemovingData(schedulerActor: ActorRef, droppingActor: ActorRef, outdated: Boolean)
+ extends MemoryQueueData() {
+ override def toString = s"RemovingData(outdated: $outdated)"
+}
// Events sent by the actor
case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey: Option[String])
@@ -154,8 +168,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private val staleQueueRemovedMsg = QueueRemoved(invocationNamespace, action.toDocId.asDocInfo(revision), None)
private val actionRetentionTimeout = MemoryQueue.getRetentionTimeout(actionMetaData, queueConfig)
- private[queue] var containers = Set.empty[String]
- private[queue] var creationIds = Set.empty[String]
+ private[queue] var containers = java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala
+ private[queue] var creationIds = java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala
private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
private[queue] var in = new AtomicInteger(0)
@@ -565,8 +579,13 @@ class MemoryQueue(private val etcdClient: EtcdClient,
stay
// common case for all statuses
- case Event(StatusQuery, _) =>
- sender ! StatusData(invocationNamespace, action.asString, queue.size, stateName.toString, stateData.toString)
+ case Event(GetState, _) =>
+ sender ! StatusData(
+ invocationNamespace,
+ action.asString,
+ queue.toList.map(_.msg.activationId),
+ stateName.toString,
+ stateData.toString)
stay
// Common case for all cases
@@ -575,9 +594,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// delete relative data, e.g leaderKey, namespaceThrottlingKey, actionThrottlingKey
cleanUpData()
- // let queue manager knows this queue is going to stop and let it forward incoming activations to a new queue
- context.parent ! queueRemovedMsg
-
goto(Removing) using getRemovingData(data, outdated = false)
// the version is updated. it's a shared case for all states
@@ -662,7 +678,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private def cleanUpDataAndGotoRemoved() = {
cleanUpWatcher()
cleanUpData()
- context.parent ! queueRemovedMsg
goto(Removed) using NoData()
}
@@ -671,8 +686,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
cleanUpActors(data)
cleanUpData()
- context.parent ! queueRemovedMsg
-
goto(Removed) using NoData()
}
@@ -688,7 +701,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// let the container manager know this version of containers are outdated.
containerManager ! ContainerDeletion(invocationNamespace, action, revision, actionMetaData)
}
- self ! QueueRemovedCompleted
goto(Removed) using NoData()
} else {
@@ -1017,7 +1029,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
queue = newQueue
logging.info(
this,
- s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")
+ s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")(
+ msg.transid)
val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
@@ -1051,7 +1064,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
case Right(msg) =>
logging.info(
this,
- s"[$invocationNamespace:$action:$stateName] Send msg ${msg.activationId} to waiting request ${request.containerId}")
+ s"[$invocationNamespace:$action:$stateName] Send msg ${msg.activationId} to waiting request ${request.containerId}")(
+ msg.transid)
cancelPoll.cancel()
case Left(_) => // do nothing
}
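The switch from an immutable Set to a ConcurrentHashMap-backed key set matches the commit note "Change Set to thread-safe one": the members can now be mutated safely even off the actor's message-handling thread. A minimal sketch of the construction used above:

import scala.collection.JavaConverters._
import scala.collection.mutable

// A mutable Scala Set view over a thread-safe Java set.
val containers: mutable.Set[String] =
  java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala

containers += "container-a" // additions and removals are thread-safe
containers -= "container-a"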
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index d87338dd3..ad7b17103 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -261,11 +261,11 @@ class QueueManager(
case QueueSize =>
sender ! QueuePool.size
- case StatusQuery =>
- val poolStatus = Future.sequence {
- QueuePool.values.map(_.queue.ask(StatusQuery).mapTo[StatusData])
- }
- sender ! poolStatus
+ case GetState =>
+ val result =
+ Future.sequence(QueuePool.values.map(_.queue.ask(GetState)(Timeout(5.seconds)).mapTo[StatusData]).toList)
+
+ sender ! result
case msg =>
logging.error(this, s"failed to elect a leader for ${msg}")
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
index 77cba01e7..9d33a508b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
@@ -24,7 +24,7 @@ import akka.grpc.internal.ClientClosedException
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import common.StreamLogging
import io.grpc.StatusRuntimeException
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.containerpool.ContainerId
import org.apache.openwhisk.core.containerpool.v2._
@@ -37,9 +37,9 @@ import org.apache.openwhisk.grpc
import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest, RescheduleRequest, RescheduleResponse}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
-import org.scalatest.concurrent.ScalaFutures
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
@@ -349,7 +349,7 @@ class ActivationClientProxyTests
probe expectTerminated machine
}
- it should "be closed when it receives a CloseClientProxy message for a normal timeout case" in within(timeout) {
+ it should "be closed when it receives a GracefulShutdown message for a normal timeout case" in within(timeout) {
val fetch = (_: FetchRequest) => Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
val activationClient = MockActivationServiceClient(fetch)
val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, _: Boolean) => Future(activationClient)
@@ -362,14 +362,14 @@ class ActivationClientProxyTests
registerCallback(machine, probe)
ready(machine, probe)
- machine ! CloseClientProxy
- awaitAssert(activationClient.isClosed shouldBe true)
+ machine ! GracefulShutdown
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
machine ! RequestActivation()
probe expectMsg ClientClosed
+ awaitAssert(activationClient.isClosed shouldBe true)
probe expectTerminated machine
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 14cf432b5..e1da77e63 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -33,11 +33,11 @@ import org.apache.openwhisk.core.connector.{
MessageProducer,
ResultMetadata
}
-import org.apache.openwhisk.core.containerpool.docker.DockerContainer
import org.apache.openwhisk.core.containerpool.v2._
import org.apache.openwhisk.core.containerpool.{
Container,
ContainerAddress,
+ ContainerId,
ContainerPoolConfig,
ContainerRemoved,
PrewarmContainerCreationConfig,
@@ -137,9 +137,15 @@ class FunctionPullingContainerPoolTests
private val schedulerInstanceId = SchedulerInstanceId("0")
private val producer = stub[MessageProducer]
private val prewarmedData = PreWarmData(mock[MockableV2Container], actionKind, memoryLimit)
+ private val mockContainer = mock[MockableV2Container]
+ (mockContainer.containerId _: () => ContainerId)
+ .expects()
+ .returning(ContainerId("test-container-id"))
+ .anyNumberOfTimes()
+
private val initializedData =
InitializedData(
- mock[MockableV2Container],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
TestProbe().ref)
@@ -315,6 +321,11 @@ class FunctionPullingContainerPoolTests
private def retry[T](fn: => T) = org.apache.openwhisk.utils.retry(fn, 10, Some(1.second))
it should "stop containers gradually when shut down" in within(timeout * 20) {
+ (mockContainer.containerId _: () => ContainerId)
+ .expects()
+ .returning(ContainerId("test-container-id"))
+ .anyNumberOfTimes()
+
val (containers, factory) = testContainers(10)
val disablingContainers = ListBuffer[ActorRef]()
@@ -356,7 +367,7 @@ class FunctionPullingContainerPoolTests
pool,
ContainerIsPaused(
WarmData(
- stub[DockerContainer],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
@@ -636,6 +647,10 @@ class FunctionPullingContainerPoolTests
}
it should "use a warmed container when invocationNamespace, action and revision matched" in within(timeout) {
+ (mockContainer.containerId _: () => ContainerId)
+ .expects()
+ .returning(ContainerId("test-container-id"))
+ .anyNumberOfTimes()
val (containers, factory) = testContainers(3)
val doc = put(entityStore, whiskAction)
@@ -654,7 +669,7 @@ class FunctionPullingContainerPoolTests
pool.tell(
ContainerIsPaused(
WarmData(
- stub[DockerContainer],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
@@ -687,6 +702,10 @@ class FunctionPullingContainerPoolTests
}
it should "retry when chosen warmed container is failed to resume" in within(timeout) {
+ (mockContainer.containerId _: () => ContainerId)
+ .expects()
+ .returning(ContainerId("test-container-id"))
+ .anyNumberOfTimes()
val (containers, factory) = testContainers(2)
val doc = put(entityStore, whiskAction)
@@ -706,7 +725,7 @@ class FunctionPullingContainerPoolTests
pool.tell(
ContainerIsPaused(
WarmData(
- stub[DockerContainer],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
@@ -724,7 +743,7 @@ class FunctionPullingContainerPoolTests
pool.tell(
ResumeFailed(
WarmData(
- stub[DockerContainer],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
@@ -739,6 +758,10 @@ class FunctionPullingContainerPoolTests
}
it should "remove oldest previously used container to make space for the job passed to run" in within(timeout) {
+ (mockContainer.containerId _: () => ContainerId)
+ .expects()
+ .returning(ContainerId("test-container-id"))
+ .anyNumberOfTimes()
val (containers, factory) = testContainers(2)
val doc = put(entityStore, whiskAction)
@@ -757,7 +780,7 @@ class FunctionPullingContainerPoolTests
pool.tell(
ContainerIsPaused(
WarmData(
- stub[DockerContainer],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
@@ -769,7 +792,7 @@ class FunctionPullingContainerPoolTests
pool.tell(
ContainerIsPaused(
WarmData(
- stub[DockerContainer],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
@@ -781,7 +804,7 @@ class FunctionPullingContainerPoolTests
pool.tell(
ContainerIsPaused(
WarmData(
- stub[DockerContainer],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
@@ -854,6 +877,10 @@ class FunctionPullingContainerPoolTests
}
it should "send ack(success) to scheduler when chosen warmed container is resumed" in within(timeout) {
+ (mockContainer.containerId _: () => ContainerId)
+ .expects()
+ .returning(ContainerId("test-container-id"))
+ .anyNumberOfTimes()
val (containers, factory) = testContainers(1)
val doc = put(entityStore, whiskAction)
// Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
@@ -878,7 +905,7 @@ class FunctionPullingContainerPoolTests
pool.tell(
ContainerIsPaused(
WarmData(
- stub[DockerContainer],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
@@ -894,7 +921,7 @@ class FunctionPullingContainerPoolTests
pool.tell(
Resumed(
WarmData(
- stub[DockerContainer],
+ mockContainer,
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 2355356bf..04516d4a8 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -44,9 +44,9 @@ import org.apache.openwhisk.core.containerpool.{
}
import org.apache.openwhisk.core.database.{ArtifactStore, StaleParameter, UserContext}
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.types.AuthStore
-import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
import org.apache.openwhisk.core.etcd.EtcdType._
@@ -1607,7 +1607,7 @@ class FunctionPullingContainerProxyTests
UnregisterData(ContainerKeys
.existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))))
- client.expectMsg(CloseClientProxy)
+ client.expectMsg(GracefulShutdown)
client.send(machine, ClientClosed)
probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
index 13f957211..a9a72f460 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
@@ -320,7 +320,7 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC
override def totalActiveActivations = Future.successful(0)
override def activeActivationsFor(namespace: UUID) = Future.successful(0)
override def activeActivationsByController(controller: String): Future[Int] = Future.successful(0)
- override def activeActivationsByController: Future[List[ActivationId]] = Future.successful(List(ActivationId("id")))
+ override def activeActivationsByController: Future[List[(String, String)]] = Future.successful(List(("", "")))
override def activeActivationsByInvoker(invoker: String): Future[Int] = Future.successful(0)
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
index 76f1410a7..ea5e73b50 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -24,6 +24,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.http.scaladsl.unmarshalling.Unmarshal
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
import org.apache.openwhisk.http.BasicHttpService
@@ -32,6 +33,8 @@ import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner
+import scala.concurrent.Future
+
/**
* Tests InvokerServer API.
*/
@@ -135,22 +138,27 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService {
var enableCount = 0
var disableCount = 0
- override def enable(): Route = {
+ override def enable(): String = {
enableCount += 1
- complete("")
+ s""
}
- override def disable(): Route = {
+ override def disable(): String = {
disableCount += 1
- complete("")
+ s""
}
- override def isEnabled(): Route = {
+ override def isEnabled(): String = {
complete(InvokerEnabled(true).serialize())
+ s""
+ }
+
+ override def backfillPrewarm(): String = {
+ ""
}
- override def backfillPrewarm(): Route = {
- complete("")
+ override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
+ Future.successful(Left(NotSupportedPoolState()))
}
def reset(): Unit = {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
index 2393b8775..cf298fb5f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -24,6 +24,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.http.scaladsl.unmarshalling.Unmarshal
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore}
import org.apache.openwhisk.http.BasicHttpService
@@ -32,6 +33,8 @@ import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner
+import scala.concurrent.Future
+
/**
* Tests InvokerServerV2 API.
*/
@@ -134,22 +137,27 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
var enableCount = 0
var disableCount = 0
- override def enable(): Route = {
+ override def enable(): String = {
enableCount += 1
- complete("")
+ ""
}
- override def disable(): Route = {
+ override def disable(): String = {
disableCount += 1
- complete("")
+ ""
}
- override def isEnabled(): Route = {
+ override def isEnabled(): String = {
complete(InvokerEnabled(true).serialize())
+ ""
+ }
+
+ override def backfillPrewarm(): String = {
+ ""
}
- override def backfillPrewarm(): Route = {
- complete("")
+ override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
+ Future.successful(Left(NotSupportedPoolState()))
}
def reset(): Unit = {
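Both test doubles reflect the same interface change: InvokerCore's enable/disable/isEnabled/backfillPrewarm now return plain Strings (and getPoolState a Future) instead of completing a Route themselves, leaving HTTP completion to the server layer. A minimal sketch of wrapping such a core in a route, with illustrative names only:

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object InvokerAdminSketch {
  trait Core {
    def enable(): String
    def disable(): String
  }

  // The route layer owns complete(); the core just reports what happened.
  def adminRoutes(core: Core): Route =
    (path("enable") & post) {
      complete(core.enable())
    } ~ (path("disable") & post) {
      complete(core.disable())
    }
}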
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
index 0dab4f4ce..26dc33869 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
@@ -25,7 +25,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.connector.StatusData
-import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.apache.openwhisk.core.entity.{ActivationId, SchedulerInstanceId}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.junit.JUnitRunner
@@ -56,9 +56,10 @@ class FPCSchedulerServerTests
val queues = List((SchedulerInstanceId("0"), 2), (SchedulerInstanceId("1"), 3))
val creationCount = 1
val testQueueSize = 2
+ val activationIds = (1 to 10).map(_ => ActivationId.generate()).toList
val statusDatas = List(
- StatusData("testns1", "testaction1", 10, "Running", "RunningData"),
- StatusData("testns2", "testaction2", 5, "Running", "RunningData"))
+ StatusData("testns1", "testaction1", activationIds, "Running", "RunningData"),
+ StatusData("testns2", "testaction2", activationIds.take(5), "Running", "RunningData"))
// Create scheduler
val scheduler = new TestScheduler(queues, creationCount, testQueueSize, statusDatas)
@@ -85,8 +86,8 @@ class FPCSchedulerServerTests
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
Get(s"/state") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
- responseAs[JsObject] shouldBe (queues.map(s => s._1.asString -> s._2.toString).toMap ++ Map(
- "creationCount" -> creationCount.toString)).toJson
+ responseAs[JsObject] shouldBe (Map("creationCount" -> creationCount.toString) ++ Map(
+ "queue" -> queues.map(_._2).sum.toString)).toJson
}
}
@@ -94,7 +95,7 @@ class FPCSchedulerServerTests
it should "get total queue" in {
implicit val tid = transid()
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
- Get(s"/queue/total") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ Get(s"/queues/count") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
responseAs[String] shouldBe testQueueSize.toString
}
@@ -104,7 +105,7 @@ class FPCSchedulerServerTests
it should "get all queue status" in {
implicit val tid = transid()
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
- Get(s"/queue/status") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ Get(s"/queues") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
responseAs[List[JsObject]] shouldBe statusDatas.map(_.toJson)
}
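The driver for these test changes is that StatusData's third field is now the list of waiting activation ids rather than a bare queue size; the new /activation/count route simply sums waitingActivation.size across all queues. A small construction sketch mirroring the tests above:

import org.apache.openwhisk.core.connector.StatusData
import org.apache.openwhisk.core.entity.ActivationId

val ids = (1 to 3).map(_ => ActivationId.generate()).toList
val status = StatusData("testns", "testaction", ids, "Running", "RunningData")
// status.waitingActivation.size == 3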
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 1cb2a5cda..a3cd73e1a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -47,6 +47,7 @@ import spray.json.{JsObject, JsString}
import java.time.Instant
import scala.collection.immutable.Queue
+import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
import scala.language.postfixOps
@@ -172,9 +173,12 @@ class MemoryQueueFlowTests
expectDataCleanUp(watcher, dataMgmtService)
- parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+
+ parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
probe.expectTerminated(fsm, 10.seconds)
@@ -284,9 +288,11 @@ class MemoryQueueFlowTests
fsm ! StateTimeout
- parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+
fsm ! QueueRemovedCompleted
expectDataCleanUp(watcher, dataMgmtService)
@@ -369,15 +375,19 @@ class MemoryQueueFlowTests
fsm ! GracefulShutdown
- parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, NamespaceThrottled, Removing))
- fsm ! QueueRemovedCompleted
+ // the queue is timed out in the Removing state
+ fsm ! StateTimeout
expectDataCleanUp(watcher, dataMgmtService)
probe.expectMsg(Transition(fsm, Removing, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+ fsm ! QueueRemovedCompleted
+
probe.expectTerminated(fsm, 10.seconds)
}
@@ -494,9 +504,12 @@ class MemoryQueueFlowTests
fsm ! StateTimeout
- parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+
+ parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
expectDataCleanUp(watcher, dataMgmtService)
@@ -644,9 +657,12 @@ class MemoryQueueFlowTests
probe.expectMsg(Transition(fsm, Running, Idle))
fsm ! StateTimeout
- parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
+
fsm ! QueueRemovedCompleted
expectDataCleanUp(watcher, dataMgmtService)
@@ -756,9 +772,11 @@ class MemoryQueueFlowTests
fsm ! StateTimeout
- parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
expectDataCleanUp(watcher, dataMgmtService)
@@ -838,6 +856,8 @@ class MemoryQueueFlowTests
expectDataCleanUp(watcher, dataMgmtService)
probe.expectMsg(Transition(fsm, Flushing, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
@@ -935,9 +955,11 @@ class MemoryQueueFlowTests
clock.plusSeconds(flushGrace.toSeconds * 2)
fsm ! StateTimeout
- parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Flushing, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
expectDataCleanUp(watcher, dataMgmtService)
@@ -1032,15 +1054,17 @@ class MemoryQueueFlowTests
container.send(fsm, getActivation(false))
container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
- fsm.underlyingActor.creationIds = Set.empty[String]
+ fsm.underlyingActor.creationIds = mutable.Set.empty[String]
fsm ! StateTimeout
probe.expectMsg(Transition(fsm, Running, Idle))
fsm ! StateTimeout
- parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
expectDataCleanUp(watcher, dataMgmtService)
@@ -1147,9 +1171,11 @@ class MemoryQueueFlowTests
fsm ! StateTimeout
expectDataCleanUp(watcher, dataMgmtService)
- parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Flushing, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
probe.expectTerminated(fsm, 10.seconds)
@@ -1229,8 +1255,6 @@ class MemoryQueueFlowTests
UnregisterData(namespaceThrottlingKey),
UnregisterData(actionThrottlingKey))
- parent.expectMsg(queueRemovedMsg)
-
probe.expectMsg(Transition(fsm, Running, Removing))
// a newly arrived message should be properly handled
@@ -1240,8 +1264,6 @@ class MemoryQueueFlowTests
fsm ! messages(2)
- fsm ! QueueRemovedCompleted
-
// if there is a message, it should not terminate
fsm ! StateTimeout
@@ -1259,6 +1281,11 @@ class MemoryQueueFlowTests
probe.expectMsg(Transition(fsm, Removing, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
+ fsm ! QueueRemovedCompleted
+
probe.expectTerminated(fsm, 10.seconds)
}
@@ -1328,7 +1355,6 @@ class MemoryQueueFlowTests
// another queue is already running
fsm ! InitialDataStorageResults(`leaderKey`, Left(AlreadyExist()))
-
parent.expectMsg(queueRemovedMsg)
parent.expectMsg(message)
@@ -1341,6 +1367,9 @@ class MemoryQueueFlowTests
// move to the Deprecated state
probe.expectMsg(Transition(fsm, state, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
probe.expectTerminated(fsm, 10.seconds)
@@ -1431,6 +1460,7 @@ class MemoryQueueFlowTests
// the queue is supposed to send queueRemovedMsg once again and stops itself.
parent.expectMsg(queueRemovedMsg)
+ fsm ! QueueRemovedCompleted
probe.expectTerminated(fsm, 10.seconds)
}
}
@@ -1501,7 +1531,7 @@ class MemoryQueueFlowTests
fsm.setState(state, FlushingData(schedulingActors.ref, schedulingActors.ref, WhiskError, "whisk error"))
case Removing =>
- fsm.underlyingActor.containers = Set(testContainerId)
+ fsm.underlyingActor.containers = mutable.Set(testContainerId)
fsm ! message
fsm.setState(state, RemovingData(schedulingActors.ref, schedulingActors.ref, outdated = true))
@@ -1537,7 +1567,7 @@ class MemoryQueueFlowTests
container.send(fsm, getActivation())
container.expectMsg(ActivationResponse(Right(message)))
// has no old containers for old queue, so send the message to queueManager
- fsm.underlyingActor.containers = Set.empty[String]
+ fsm.underlyingActor.containers = mutable.Set.empty[String]
fsm.underlyingActor.queue =
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(Instant.now.toEpochMilli + 1000), message))
fsm ! StopSchedulingAsOutdated
@@ -1742,20 +1772,20 @@ class MemoryQueueFlowTests
UnwatchEndpoint(existingContainerKey, isPrefix = true, watcherName),
UnwatchEndpoint(leaderKey, isPrefix = false, watcherName))
- // queue is stale and will be removed
- parent.expectMsg(queueRemovedMsg)
-
probe.expectMsg(Transition(fsm, state, Removed))
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+
+ // queue is stale and will be removed
+ parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
probe.expectTerminated(fsm, 10.seconds)
case _ =>
// queue is stale and will be removed
- parent.expectMsg(queueRemovedMsg)
- probe.expectMsg(Transition(fsm, state, Removing))
- fsm ! QueueRemovedCompleted
+ probe.expectMsg(Transition(fsm, state, Removing))
// queue should not be terminated as there is an activation
fsm ! StateTimeout
@@ -1771,6 +1801,11 @@ class MemoryQueueFlowTests
UnwatchEndpoint(leaderKey, isPrefix = false, watcherName))
probe.expectMsg(Transition(fsm, Removing, Removed))
+
+ // the queue is timed out again in the Removed state
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
+ fsm ! QueueRemovedCompleted
probe.expectTerminated(fsm, 10.seconds)
}
}
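All of the "fsm ! StateTimeout ... parent.expectMsg(queueRemovedMsg)" additions above exercise one new behavior: the queue no longer notifies its parent while cleaning up, but instead re-sends QueueRemoved on every timeout in the Removed state until the manager acknowledges with QueueRemovedCompleted, and only then terminates. A self-contained toy FSM with that retry shape (illustrative only, not the production MemoryQueue):

import akka.actor.{ActorRef, FSM}
import scala.concurrent.duration._

sealed trait QState
case object Removed extends QState
case object QueueRemoved
case object QueueRemovedCompleted

class ToyQueue(manager: ActorRef) extends FSM[QState, Unit] {
  startWith(Removed, ())

  when(Removed, stateTimeout = 100.millis) {
    case Event(StateTimeout, _) =>
      manager ! QueueRemoved   // retried on every timeout until acknowledged
      stay() forMax 100.millis // re-arm the state timer explicitly
    case Event(QueueRemovedCompleted, _) =>
      stop()                   // terminate only after the ack
  }

  initialize()
}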
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 711847269..d7ec5afd1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -57,6 +57,7 @@ import org.scalatest.junit.JUnitRunner
import spray.json.{JsObject, JsString}
import scala.collection.immutable.Queue
+import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.{higherKinds, postfixOps}
@@ -478,7 +479,7 @@ class MemoryQueueTests
probe2 watch fsm
// do not remove itself when there are still existing containers
- fsm.underlyingActor.containers = Set("1")
+ fsm.underlyingActor.containers = mutable.Set("1")
fsm.setState(Running, RunningData(schedulerActor, droppingActor))
expectMsg(Transition(fsm, Uninitialized, Running))
fsm ! StateTimeout
@@ -488,7 +489,7 @@ class MemoryQueueTests
dataManagementService.expectNoMessage()
// change the existing containers count to 0, the StateTimeout should work
- fsm.underlyingActor.containers = Set.empty[String]
+ fsm.underlyingActor.containers = mutable.Set.empty[String]
fsm ! StateTimeout
probe.expectTerminated(schedulerActor)
probe.expectTerminated(droppingActor)
@@ -496,12 +497,15 @@ class MemoryQueueTests
expectMsg(Transition(fsm, Running, Idle))
fsm ! StateTimeout
- parent.expectMsg(QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(revision), Some(leaderKey)))
expectMsg(Transition(fsm, Idle, Removed))
- fsm ! QueueRemovedCompleted
dataManagementService.expectMsg(UnregisterData(leaderKey))
dataManagementService.expectMsg(UnregisterData(namespaceThrottlingKey))
dataManagementService.expectMsg(UnregisterData(actionThrottlingKey))
+
+ // queue is timed out again in the Removed state.
+ fsm ! StateTimeout
+ parent.expectMsg(QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(revision), Some(leaderKey)))
+ fsm ! QueueRemovedCompleted
probe2.expectTerminated(fsm)
fsm.stop()
@@ -562,7 +566,7 @@ class MemoryQueueTests
fsm ! Start
expectMsg(Transition(fsm, Uninitialized, Running))
- fsm.underlyingActor.creationIds = Set.empty[String]
+ fsm.underlyingActor.creationIds = mutable.Set.empty[String]
fsm ! StateTimeout
expectMsg(Transition(fsm, Running, Idle))
@@ -576,8 +580,8 @@ class MemoryQueueTests
.futureValue shouldBe GetActivationResponse(Right(message))
queueRef.queue.length shouldBe 0
- fsm.underlyingActor.containers = Set.empty[String]
- fsm.underlyingActor.creationIds = Set.empty[String]
+ fsm.underlyingActor.containers = mutable.Set.empty[String]
+ fsm.underlyingActor.creationIds = mutable.Set.empty[String]
fsm ! StateTimeout
expectMsg(Transition(fsm, Running, Idle))
(fsm ? GetActivation(tid, fqn, testContainerId, false, None))
@@ -643,7 +647,7 @@ class MemoryQueueTests
fsm ! Start
expectMsg(Transition(fsm, Uninitialized, Running))
- fsm.underlyingActor.creationIds = Set.empty[String]
+ fsm.underlyingActor.creationIds = mutable.Set.empty[String]
fsm ! StateTimeout
expectMsg(Transition(fsm, Running, Idle))
@@ -651,9 +655,13 @@ class MemoryQueueTests
expectMsg(Transition(fsm, Idle, Removed))
queueRef.queue.length shouldBe 0
fsm ! message
- parent.expectMsg(queueRemovedMsg)
+
+ // queue is timed out again in the Removed state.
parent.expectMsg(message)
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
+
expectNoMessage()
fsm.stop()
@@ -1095,10 +1103,13 @@ class MemoryQueueTests
fsm ! message
probe.expectMsg(ActivationResponse.developerError("nonExecutbleAction error"))
- parent.expectMsg(
+ parent.expectMsgAnyOf(
2 * queueConfig.flushGrace + 5.seconds,
- QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)))
- parent.expectMsg(Transition(fsm, Flushing, Removed))
+ QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)),
+ Transition(fsm, Flushing, Removed))
+
+ fsm ! StateTimeout
+ parent.expectMsg(queueRemovedMsg)
fsm ! QueueRemovedCompleted
parent.expectTerminated(fsm)
@@ -1206,10 +1217,9 @@ class MemoryQueueTests
lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString("resource not enough")))
}, 5.seconds)
- parent.expectMsg(queueRemovedMsg)
-
// should goto Removed
- parent.expectMsg(Transition(fsm, Flushing, Removed))
+ parent.expectMsgAnyOf(queueRemovedMsg, Transition(fsm, Flushing, Removed))
+
fsm ! QueueRemovedCompleted
fsm.stop()
@@ -1250,7 +1260,7 @@ class MemoryQueueTests
val now = Instant.now
fsm.underlyingActor.queue =
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message))
- fsm.underlyingActor.containers = Set.empty[String]
+ fsm.underlyingActor.containers = mutable.Set.empty[String]
fsm.setState(Running, RunningData(probe.ref, probe.ref))
fsm ! StopSchedulingAsOutdated // update action
queueManager.expectMsg(staleQueueRemovedMsg)
@@ -1294,7 +1304,7 @@ class MemoryQueueTests
val now = Instant.now
fsm.underlyingActor.queue =
Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message))
- fsm.underlyingActor.containers = Set(testContainerId)
+ fsm.underlyingActor.containers = mutable.Set(testContainerId)
fsm.setState(Running, RunningData(probe.ref, probe.ref))
fsm ! StopSchedulingAsOutdated // update action
queueManager.expectMsg(staleQueueRemovedMsg)
@@ -1399,10 +1409,10 @@ class MemoryQueueTests
val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs, MILLISECONDS) + queueConfig.flushGrace
probe.expectMsg(duration, ActivationResponse.whiskError("no available invokers"))
- parent.expectMsg(
+ parent.expectMsgAnyOf(
duration,
- QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)))
- parent.expectMsg(Transition(fsm, Flushing, Removed))
+ QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)),
+ Transition(fsm, Flushing, Removed))
fsm ! QueueRemovedCompleted
parent.expectTerminated(fsm)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index b60472e81..6d8d80f72 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -29,7 +29,7 @@ import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
import org.apache.openwhisk.core.WarmUp.warmUpAction
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.test.TestConnector
-import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, StatusData, StatusQuery}
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, GetState, StatusData}
import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext}
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import org.apache.openwhisk.core.entity._
@@ -100,7 +100,8 @@ class QueueManagerTests
ControllerInstanceId("0"),
blocking = false,
content = None)
- val statusData = StatusData(testInvocationNamespace, testFQN.asString, 0, "Running", "RunningData")
+ val statusData =
+ StatusData(testInvocationNamespace, testFQN.asString, List.empty[ActivationId], "Running", "RunningData")
// update start time for activation to ensure it's not stale
def newActivation(start: Instant = Instant.now()): ActivationMessage = {
@@ -132,7 +133,7 @@ class QueueManagerTests
override def receive: Receive = {
case GetActivation(_, _, _, _, _, _) =>
sender ! ActivationResponse(Right(newActivation()))
- case StatusQuery =>
+ case GetState =>
sender ! statusData
}
}))
@@ -796,7 +797,7 @@ class QueueManagerTests
(queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1
- (queueManager ? StatusQuery).mapTo[Future[Iterable[StatusData]]].futureValue.futureValue shouldBe List(statusData)
+ (queueManager ? GetState).mapTo[Future[List[StatusData]]].flatten.futureValue shouldBe List(statusData)
}
it should "drop the activation message that has not been scheduled for a long time" in {