You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2023/01/25 10:23:05 UTC
[openwhisk] branch master updated: Provide action limit configuration for each namespace (#5229)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 61ca4c8fe Provide action limit configuration for each namespace (#5229)
61ca4c8fe is described below
commit 61ca4c8fe39f2b47b84c20a8114f261cd12820d7
Author: Seonghyun Oh <oh...@navercorp.com>
AuthorDate: Wed Jan 25 19:22:58 2023 +0900
Provide action limit configuration for each namespace (#5229)
* Provide action limit config for namespace
* Check namespace default limit
* Validate system limits and namespace default limits
* Fix test code
* Add system limit test
* Refactor code
* Reject message in invoker reactive
* Change config key
* Refactor code
* Add ansible config
* Update annotation for maxConcurrent
* Add test case for limit api
* Rename limit config key
* Update swagger
* Update document
* Add parameter size limit
* Refactor code
* Update limits API
* Rename allowedDuration -> allowedActionDuration
* Add test case for parameter limit
* Refactor code
* Add request payload limit for namespace
* Check activation result size with namespace payload limit
* Provide truncation size option for namespace
* Support scheduler
* Supports backwards compatibility for new limit config
* Update wskadmin
* Add parameter annotation for truncation
* Fix test code for KubernetesContainerTests
* Fix test code for DockerContainerTests
* Fix test cases
* Fix build error
* Fix build error
---
.../jenkins/group_vars/openwhisk-vm1-he-de | 2 +
.../jenkins/group_vars/openwhisk-vm2-he-de | 2 +
.../jenkins/group_vars/openwhisk-vm3-he-de | 2 +
ansible/environments/local/group_vars/all | 1 +
ansible/roles/controller/tasks/deploy.yml | 9 +
ansible/roles/invoker/tasks/deploy.yml | 6 +
ansible/roles/schedulers/tasks/deploy.yml | 9 +
common/scala/src/main/resources/application.conf | 31 ++
.../org/apache/openwhisk/core/WhiskConfig.scala | 9 +
.../apache/openwhisk/core/connector/Message.scala | 3 +
.../core/containerpool/AkkaContainerClient.scala | 44 ++-
.../ApacheBlockingContainerClient.scala | 45 ++-
.../openwhisk/core/containerpool/Container.scala | 32 +-
.../core/containerpool/ContainerClient.scala | 9 +-
.../core/entity/ActivationEntityLimit.scala | 12 +
.../openwhisk/core/entity/ConcurrencyLimit.scala | 44 ++-
.../apache/openwhisk/core/entity/Identity.scala | 97 ++++-
.../org/apache/openwhisk/core/entity/Limits.scala | 10 +-
.../LimitsExceptions.scala} | 20 +-
.../apache/openwhisk/core/entity/LogLimit.scala | 42 ++-
.../apache/openwhisk/core/entity/MemoryLimit.scala | 49 ++-
.../apache/openwhisk/core/entity/Parameter.scala | 19 +-
.../apache/openwhisk/core/entity/TimeLimit.scala | 47 ++-
.../apache/openwhisk/core/entity/WhiskEntity.scala | 13 +-
.../org/apache/openwhisk/http/ErrorResponse.scala | 23 +-
.../src/main/resources/apiv1swagger.json | 36 ++
.../apache/openwhisk/core/controller/Actions.scala | 27 +-
.../openwhisk/core/controller/Controller.scala | 6 +
.../openwhisk/core/controller/Entities.scala | 15 +-
.../apache/openwhisk/core/controller/Limits.scala | 13 +-
.../openwhisk/core/controller/WebActions.scala | 25 +-
.../core/containerpool/ContainerProxy.scala | 2 +
.../containerpool/docker/DockerContainer.scala | 21 +-
.../v2/FunctionPullingContainerProxy.scala | 22 +-
.../core/invoker/ContainerMessageConsumer.scala | 90 +++--
.../openwhisk/core/invoker/InvokerReactive.scala | 5 +-
.../core/scheduler/queue/MemoryQueue.scala | 30 +-
docs/rest_api.md | 2 +-
tests/src/test/resources/application.conf.j2 | 4 +
.../docker/test/AkkaContainerClientTests.scala | 34 +-
.../test/ApacheBlockingContainerClientTests.scala | 25 +-
.../docker/test/DockerContainerTests.scala | 11 +-
.../kubernetes/test/KubernetesContainerTests.scala | 10 +-
.../containerpool/test/ContainerProxyTests.scala | 14 +-
.../test/FunctionPullingContainerProxyTests.scala | 10 +-
.../core/controller/test/ActionsApiTests.scala | 419 ++++++++++++++++++++-
.../core/controller/test/ControllerApiTests.scala | 6 +
.../core/controller/test/LimitsApiTests.scala | 44 ++-
.../core/controller/test/TriggersApiTests.scala | 6 +-
.../core/controller/test/WebActionsApiTests.scala | 64 +++-
.../openwhisk/core/entity/test/SchemaTests.scala | 48 +--
.../test/ContainerMessageConsumerTests.scala | 11 +
tools/admin/wskadmin | 49 ++-
53 files changed, 1331 insertions(+), 298 deletions(-)
diff --git a/ansible/environments/jenkins/group_vars/openwhisk-vm1-he-de b/ansible/environments/jenkins/group_vars/openwhisk-vm1-he-de
index 07c52737f..5b4716f87 100644
--- a/ansible/environments/jenkins/group_vars/openwhisk-vm1-he-de
+++ b/ansible/environments/jenkins/group_vars/openwhisk-vm1-he-de
@@ -49,5 +49,7 @@ container_pool_akka_client: true
runtimes_enable_concurrency: true
limit_action_concurrency_max: 500
+namespace_default_limit_action_concurrency_max: 500
+
invoker1_machine: openwhisk-vm3-he-de
invoker_use_runc: false
diff --git a/ansible/environments/jenkins/group_vars/openwhisk-vm2-he-de b/ansible/environments/jenkins/group_vars/openwhisk-vm2-he-de
index c8c6b7e12..7395d1322 100644
--- a/ansible/environments/jenkins/group_vars/openwhisk-vm2-he-de
+++ b/ansible/environments/jenkins/group_vars/openwhisk-vm2-he-de
@@ -50,5 +50,7 @@ runtimes_enable_concurrency: true
limit_action_concurrency_max: 500
limit_invocations_per_minute: 120
+namespace_default_limit_action_concurrency_max: 500
+
invoker1_machine: openwhisk-vm1-he-de
invoker_use_runc: false
diff --git a/ansible/environments/jenkins/group_vars/openwhisk-vm3-he-de b/ansible/environments/jenkins/group_vars/openwhisk-vm3-he-de
index 9558d5b48..9576ca941 100644
--- a/ansible/environments/jenkins/group_vars/openwhisk-vm3-he-de
+++ b/ansible/environments/jenkins/group_vars/openwhisk-vm3-he-de
@@ -49,5 +49,7 @@ container_pool_akka_client: true
runtimes_enable_concurrency: true
limit_action_concurrency_max: 500
+namespace_default_limit_action_concurrency_max: 500
+
invoker1_machine: openwhisk-vm2-he-de
invoker_use_runc: false
diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all
index b8e390cd7..185c3b33d 100644
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -48,3 +48,4 @@ env_hosts_dir: "{{ playbook_dir }}/environments/local"
container_pool_akka_client: true
runtimes_enable_concurrency: true
limit_action_concurrency_max: 500
+namespace_default_limit_action_concurrency_max: 500
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 5c46392a3..0b604bc45 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -227,6 +227,15 @@
"CONFIG_whisk_concurrencyLimit_max": "{{ limit_action_concurrency_max | default() }}"
"CONFIG_whisk_concurrencyLimit_std": "{{ limit_action_concurrency_std | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_memory_min": "{{ namespace_default_limit_action_memory_min | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_memory_max": "{{ namespace_default_limit_action_memory_max | default() }}"
+
+ "CONFIG_whisk_namespaceDefaultLimit_timeLimit_min": "{{ namespace_default_limit_action_time_min | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_timeLimit_max": "{{ namespace_default_limit_action_time_max | default() }}"
+
+ "CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_min": "{{ namespace_default_limit_action_concurrency_min | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_max": "{{ namespace_default_limit_action_concurrency_max | default() }}"
+
"CONFIG_whisk_featureFlags_requireApiKeyAnnotation": "{{ whisk.feature_flags.require_api_key_annotation | default(true) | lower }}"
"CONFIG_whisk_featureFlags_requireResponsePayload": "{{ whisk.feature_flags.require_response_payload | default(true) | lower }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 62e21fb34..a2089052c 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -279,6 +279,12 @@
"CONFIG_whisk_concurrencyLimit_min": "{{ limit_action_concurrency_min | default() }}"
"CONFIG_whisk_concurrencyLimit_max": "{{ limit_action_concurrency_max | default() }}"
"CONFIG_whisk_concurrencyLimit_std": "{{ limit_action_concurrency_std | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_memory_min": "{{ namespace_default_limit_action_memory_min | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_memory_max": "{{ namespace_default_limit_action_memory_max | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_timeLimit_min": "{{ namespace_default_limit_action_time_min | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_timeLimit_max": "{{ namespace_default_limit_action_time_max | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_min": "{{ namespace_default_limit_action_concurrency_min | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_max": "{{ namespace_default_limit_action_concurrency_max | default() }}"
"CONFIG_whisk_activation_payload_max": "{{ limit_activation_payload | default() }}"
"CONFIG_whisk_transactions_header": "{{ transactions.header }}"
"CONFIG_whisk_containerPool_akkaClient": "{{ container_pool_akka_client | default('false') | lower }}"
diff --git a/ansible/roles/schedulers/tasks/deploy.yml b/ansible/roles/schedulers/tasks/deploy.yml
index 280ea68b7..6b45e3594 100644
--- a/ansible/roles/schedulers/tasks/deploy.yml
+++ b/ansible/roles/schedulers/tasks/deploy.yml
@@ -191,6 +191,15 @@
"CONFIG_whisk_concurrencyLimit_max": "{{ limit_action_concurrency_max | default() }}"
"CONFIG_whisk_concurrencyLimit_std": "{{ limit_action_concurrency_std | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_memory_min": "{{ namespace_default_limit_action_memory_min | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_memory_max": "{{ namespace_default_limit_action_memory_max | default() }}"
+
+ "CONFIG_whisk_namespaceDefaultLimit_timeLimit_min": "{{ namespace_default_limit_action_time_min | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_timeLimit_max": "{{ namespace_default_limit_action_time_max | default() }}"
+
+ "CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_min": "{{ namespace_default_limit_action_concurrency_min | default() }}"
+ "CONFIG_whisk_namespaceDefaultLimit_concurrencyLimit_max": "{{ namespace_default_limit_action_concurrency_max | default() }}"
+
"RUNTIMES_MANIFEST": "{{ runtimesManifest | to_json }}"
"CONFIG_whisk_runtimes_defaultImagePrefix":
"{{ runtimes_default_image_prefix | default() }}"
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index b139daee7..140d4c4b0 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -509,6 +509,9 @@ whisk {
std = 1
}
+ # maximum size of the action parameter
+ parameter-size-limit = 1 m
+
# maximum size of the action code
exec-size-limit = 48 m
@@ -517,6 +520,34 @@ whisk {
default-list-limit = 30 # default limit on number of entities returned from a collection on a list operation
}
+ # default namespace limit settings
+ # Disabled for backwards compatibility. If you want to use it, either uncomment it or add the setting at deployment time.
+ # namespace-default-limit {
+ # memory {
+ # min = 128 m
+ # max = 512 m
+ # }
+ # time-limit {
+ # min = 100 ms
+ # max = 5 m
+ # }
+ # log-limit {
+ # min = 0 m
+ # max = 10 m
+ # }
+ # concurrency-limit {
+ # min = 1
+ # max = 1
+ # }
+ # parameter-size-limit = 1 m
+ # activation {
+ # payload {
+ # max = 1 m
+ # truncation = 1 m
+ # }
+ # }
+ # }
+
yarn {
master-url="http://localhost:8088" //YARN Resource Manager endpoint to be accessed from the invoker
yarn-link-log-message=true //If true, display a link to YARN in the static log message, otherwise do not include a link to YARN.
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index f9039e947..6a16886fc 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -222,6 +222,15 @@ object ConfigKeys {
val timeLimit = "whisk.time-limit"
val logLimit = "whisk.log-limit"
val concurrencyLimit = "whisk.concurrency-limit"
+ val parameterSizeLimit = "whisk.parameter-size-limit"
+
+ val namespaceMemoryLimit = "whisk.namespace-default-limit.memory"
+ val namespaceTimeLimit = "whisk.namespace-default-limit.time-limit"
+ val namespaceLogLimit = "whisk.namespace-default-limit.log-limit"
+ val namespaceConcurrencyLimit = "whisk.namespace-default-limit.concurrency-limit"
+ val namespaceParameterSizeLimit = "whisk.namespace-default-limit.parameter-size-limit"
+ val namespaceActivationPayloadLimit = "whisk.namespace-default-limit.activation.payload"
+
val activation = "whisk.activation"
val userEvents = "whisk.user-events"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index da0c4d102..1be6ece8d 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -682,6 +682,8 @@ object ContainerCreationError extends Enumeration {
case object TooManyConcurrentRequests extends ContainerCreationError
+ case object InvalidActionLimitError extends ContainerCreationError
+
val whiskErrors: Set[ContainerCreationError] =
Set(
NoAvailableInvokersError,
@@ -705,6 +707,7 @@ object ContainerCreationError extends Enumeration {
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
case "UNKNOWNERROR" => UnknownError
+ case "INVALIDACTIONLIMITERROR" => InvalidActionLimitError
}
implicit val serds = new RootJsonFormat[ContainerCreationError] {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
index ea5978bb7..8dbc41f8a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
@@ -46,9 +46,10 @@ import org.apache.openwhisk.common.MetricEmitter
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.entity.ActivationResponse.ContainerHttpError
import org.apache.openwhisk.core.entity.ActivationResponse._
-import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
import org.apache.openwhisk.core.entity.size.SizeLong
import org.apache.openwhisk.http.PoolingRestClient
+
import java.time.Instant
/**
@@ -61,7 +62,6 @@ import java.time.Instant
* @param hostname the host name
* @param port the port
* @param timeout the timeout in msecs to wait for a response
- * @param maxResponse the maximum size in bytes the connection will accept
* @param queueSize once all connections are used, how big of queue to allow for additional requests
* @param retryInterval duration between retries for TCP connection errors
*/
@@ -69,8 +69,6 @@ protected class AkkaContainerClient(
hostname: String,
port: Int,
timeout: FiniteDuration,
- maxResponse: ByteSize,
- truncation: ByteSize,
queueSize: Int,
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
@@ -87,12 +85,19 @@ protected class AkkaContainerClient(
*
* @param endpoint the path the api call relative to hostname
* @param body the JSON value to post (this is usually a JSON objecT)
+ * @param maxResponse the maximum size in bytes the connection will accept
+ * @param truncation the truncation size in bytes
* @param retry whether or not to retry on connection failure
* @param reschedule whether or not to throw ContainerHealthError (triggers reschedule) on connection failure
* @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
*/
- def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean = false)(
- implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
+ def post(
+ endpoint: String,
+ body: JsValue,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
+ retry: Boolean,
+ reschedule: Boolean = false)(implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
//create the request
val req = Marshal(body).to[MessageEntity].map { b =>
@@ -115,7 +120,7 @@ protected class AkkaContainerClient(
Right(ContainerResponse(response.status.intValue, o, None))
}
} else {
- truncated(response.entity.dataBytes).map { s =>
+ truncated(truncation, response.entity.dataBytes).map { s =>
Right(ContainerResponse(response.status.intValue, s, Some(contentLength.B, maxResponse)))
}
}
@@ -167,7 +172,8 @@ protected class AkkaContainerClient(
}
}
- private def truncated(responseBytes: Source[ByteString, _],
+ private def truncated(truncation: ByteSize,
+ responseBytes: Source[ByteString, _],
previouslyCaptured: ByteString = ByteString.empty): Future[String] = {
responseBytes.prefixAndTail(1).runWith(Sink.head).flatMap {
case (Nil, tail) =>
@@ -176,7 +182,7 @@ protected class AkkaContainerClient(
case (Seq(prefix), tail) =>
val truncatedResponse = previouslyCaptured ++ prefix
if (truncatedResponse.size < truncation.toBytes) {
- truncated(tail, truncatedResponse)
+ truncated(truncation, tail, truncatedResponse)
} else {
//ignore the tail (MUST CONSUME ENTIRE ENTITY!)
//captured string MAY be larger than the truncation size, so take only truncation bytes to get the exact length
@@ -194,7 +200,7 @@ object AkkaContainerClient {
as: ActorSystem,
ec: ExecutionContext,
tid: TransactionId): (Int, Option[JsObject]) = {
- val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
+ val connection = new AkkaContainerClient(host, port, timeout, 1)
val response = executeRequest(connection, endPoint, content)
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
connection.close()
@@ -207,7 +213,7 @@ object AkkaContainerClient {
as: ActorSystem,
ec: ExecutionContext,
tid: TransactionId): (Int, Option[JsArray]) = {
- val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
+ val connection = new AkkaContainerClient(host, port, timeout, 1)
val response = executeRequestForJsArray(connection, endPoint, content)
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
connection.close()
@@ -220,7 +226,7 @@ object AkkaContainerClient {
tid: TransactionId,
as: ActorSystem,
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
- val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
+ val connection = new AkkaContainerClient(host, port, timeout, 1)
val futureResults = contents.map { executeRequest(connection, endPoint, _) }
val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures
connection.close()
@@ -234,7 +240,12 @@ object AkkaContainerClient {
tid: TransactionId): Future[(Int, Option[JsObject])] = {
val res = connection
- .post(endpoint, content, true)
+ .post(
+ endpoint,
+ content,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
+ true)
.map({
case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
@@ -254,7 +265,12 @@ object AkkaContainerClient {
tid: TransactionId): Future[(Int, Option[JsArray])] = {
val res = connection
- .post(endpoint, content, true)
+ .post(
+ endpoint,
+ content,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
+ retry = true)
.map({
case Right(r) => (r.statusCode, Try(r.entity.parseJson.convertTo[JsArray]).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
index 1ba7732be..b366d28b1 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
@@ -36,7 +36,7 @@ import spray.json._
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.entity.ActivationResponse._
-import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
import org.apache.openwhisk.core.entity.size.SizeLong
import pureconfig._
import pureconfig.generic.auto._
@@ -61,14 +61,11 @@ protected[containerpool] case class RetryableConnectionError(t: Throwable) exten
*
* @param hostname the host name
* @param timeout the timeout in msecs to wait for a response
- * @param maxResponse the maximum size in bytes the connection will accept
* @param maxConcurrent the maximum number of concurrent requests allowed (Default is 1)
*/
-protected class ApacheBlockingContainerClient(hostname: String,
- timeout: FiniteDuration,
- maxResponse: ByteSize,
- truncation: ByteSize,
- maxConcurrent: Int = 1)(implicit logging: Logging, ec: ExecutionContext)
+protected class ApacheBlockingContainerClient(hostname: String, timeout: FiniteDuration, maxConcurrent: Int = 1)(
+ implicit logging: Logging,
+ ec: ExecutionContext)
extends ContainerClient {
/**
@@ -88,11 +85,17 @@ protected class ApacheBlockingContainerClient(hostname: String,
*
* @param endpoint the path the api call relative to hostname
* @param body the JSON value to post (this is usually a JSON objecT)
+ * @param maxResponse the maximum size in bytes the connection will accept
* @param retry whether or not to retry on connection failure
* @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
*/
- def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean = false)(
- implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
+ def post(
+ endpoint: String,
+ body: JsValue,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
+ retry: Boolean,
+ reschedule: Boolean = false)(implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
entity.setContentType("application/json")
@@ -102,7 +105,7 @@ protected class ApacheBlockingContainerClient(hostname: String,
Future {
blocking {
- execute(request, timeout, maxConcurrent, retry, reschedule)
+ execute(request, timeout, maxConcurrent, maxResponse, truncation, retry, reschedule)
}
}
}
@@ -112,6 +115,8 @@ protected class ApacheBlockingContainerClient(hostname: String,
request: HttpRequestBase,
timeout: FiniteDuration,
maxConcurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
retry: Boolean,
reschedule: Boolean = false)(implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
val start = Instant.now
@@ -124,13 +129,13 @@ protected class ApacheBlockingContainerClient(hostname: String,
// Negative contentLength means unknown or overflow. We don't want to consume in either case.
if (contentLength >= 0) {
- if (contentLength <= maxResponseBytes) {
+ if (contentLength <= maxResponse.toBytes) {
// optimized route to consume the entire stream into a string
val str = EntityUtils.toString(entity, StandardCharsets.UTF_8) // consumes and closes the whole stream
Right(ContainerResponse(statusCode, str, None))
} else {
// only consume a bounded number of bytes according to the system limits
- val str = new String(IOUtils.toByteArray(entity.getContent, truncationBytes), StandardCharsets.UTF_8)
+ val str = new String(IOUtils.toByteArray(entity.getContent, truncation.toBytes), StandardCharsets.UTF_8)
EntityUtils.consumeQuietly(entity) // consume the rest of the stream to free the connection
Right(ContainerResponse(statusCode, str, Some(contentLength.B, maxResponse)))
}
@@ -171,7 +176,7 @@ protected class ApacheBlockingContainerClient(hostname: String,
if (timeout > Duration.Zero) {
Thread.sleep(50) // Sleep for 50 milliseconds
val newTimeout = timeout - (Instant.now.toEpochMilli - start.toEpochMilli).milliseconds
- execute(request, newTimeout, maxConcurrent, retry = true)
+ execute(request, newTimeout, maxConcurrent, maxResponse, truncation, retry = true)
} else {
logging.warn(this, s"POST failed with $t - no retry because timeout exceeded.")
Left(Timeout(t))
@@ -180,9 +185,6 @@ protected class ApacheBlockingContainerClient(hostname: String,
}
}
- private val maxResponseBytes = maxResponse.toBytes
- private val truncationBytes = truncation.toBytes
-
private val baseUri = new URIBuilder()
.setScheme("http")
.setHost(hostname)
@@ -229,7 +231,7 @@ object ApacheBlockingContainerClient {
tid: TransactionId,
ec: ExecutionContext): (Int, Option[JsObject]) = {
val timeout = 90.seconds
- val connection = new ApacheBlockingContainerClient(s"$host:$port", timeout, 1.MB, 1.MB)
+ val connection = new ApacheBlockingContainerClient(s"$host:$port", timeout)
val response = executeRequest(connection, endPoint, content)
val result = Await.result(response, timeout)
connection.close()
@@ -241,7 +243,7 @@ object ApacheBlockingContainerClient {
implicit logging: Logging,
tid: TransactionId,
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
- val connection = new ApacheBlockingContainerClient(s"$host:$port", 90.seconds, 1.MB, 1.MB, contents.size)
+ val connection = new ApacheBlockingContainerClient(s"$host:$port", 90.seconds, contents.size)
val futureResults = contents.map { content =>
executeRequest(connection, endPoint, content)
}
@@ -254,7 +256,12 @@ object ApacheBlockingContainerClient {
implicit logging: Logging,
tid: TransactionId,
ec: ExecutionContext): Future[(Int, Option[JsObject])] = {
- connection.post(endpoint, content, retry = true) map {
+ connection.post(
+ endpoint,
+ content,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
+ retry = true) map {
case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException()
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index 5290f3e4a..1df29e055 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -122,7 +122,14 @@ trait Container {
containerHttpMaxConcurrent = maxConcurrent
containerHttpTimeout = timeout
val body = JsObject("value" -> initializer)
- callContainer("/init", body, timeout, maxConcurrent, retry = true)
+ callContainer(
+ "/init",
+ body,
+ timeout,
+ maxConcurrent,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
+ retry = true)
.andThen { // never fails
case Success(r: RunResult) =>
transid.finished(
@@ -163,6 +170,8 @@ trait Container {
environment: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
val start =
@@ -174,7 +183,7 @@ trait Container {
val parameterWrapper = JsObject("value" -> parameters)
val body = JsObject(parameterWrapper.fields ++ environment.fields)
- callContainer("/run", body, timeout, maxConcurrent, retry = false, reschedule)
+ callContainer("/run", body, timeout, maxConcurrent, maxResponse, truncation, retry = false, reschedule)
.andThen { // never fails
case Success(r: RunResult) =>
transid.finished(
@@ -219,6 +228,8 @@ trait Container {
body: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
retry: Boolean = false,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
val started = Instant.now()
@@ -228,7 +239,7 @@ trait Container {
conn
}
http
- .post(path, body, retry, reschedule)
+ .post(path, body, maxResponse, truncation, retry, reschedule)
.map { response =>
val finished = Instant.now()
RunResult(Interval(started, finished), response)
@@ -236,20 +247,9 @@ trait Container {
}
private def openConnections(timeout: FiniteDuration, maxConcurrent: Int) = {
if (Container.config.akkaClient) {
- new AkkaContainerClient(
- addr.host,
- addr.port,
- timeout,
- ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
- ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
- 1024)
+ new AkkaContainerClient(addr.host, addr.port, timeout, 1024)
} else {
- new ApacheBlockingContainerClient(
- s"${addr.host}:${addr.port}",
- timeout,
- ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
- ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
- maxConcurrent)
+ new ApacheBlockingContainerClient(s"${addr.host}:${addr.port}", timeout, maxConcurrent)
}
}
private def closeConnections(toClose: Option[ContainerClient]): Future[Unit] = {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
index 597376cad..512ae6a10 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
@@ -22,9 +22,14 @@ import spray.json._
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.entity.ActivationResponse.ContainerHttpError
import org.apache.openwhisk.core.entity.ActivationResponse._
+import org.apache.openwhisk.core.entity.ByteSize
trait ContainerClient {
- def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean)(
- implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]]
+ def post(endpoint: String,
+ body: JsValue,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
+ retry: Boolean,
+ reschedule: Boolean)(implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]]
def close(): Future[Unit]
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
index 45685e348..2bbe8355f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
@@ -32,8 +32,20 @@ case class ActivationEntityLimitConf(serdesOverhead: ByteSize, payload: Activati
*/
protected[core] object ActivationEntityLimit {
private val config = loadConfigOrThrow[ActivationEntityLimitConf](ConfigKeys.activation)
+ private val namespacePayloadLimitConfig = try {
+ loadConfigOrThrow[ActivationEntityPayload](ConfigKeys.namespaceActivationPayloadLimit)
+ } catch {
+ case _: Throwable =>
+ // Supports backwards compatibility for openwhisk that do not use the namespace default limit
+ ActivationEntityPayload(config.payload.max, config.payload.truncation)
+ }
+ // system limit
protected[core] val MAX_ACTIVATION_ENTITY_LIMIT: ByteSize = config.payload.max
protected[core] val MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT: ByteSize = config.payload.truncation
protected[core] val MAX_ACTIVATION_LIMIT: ByteSize = config.payload.max + config.serdesOverhead
+
+ // namespace default limit
+ protected[core] val MAX_ACTIVATION_ENTITY_LIMIT_DEFAULT: ByteSize = namespacePayloadLimitConfig.max
+ protected[core] val MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT_DEFAULT: ByteSize = namespacePayloadLimitConfig.truncation
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ConcurrencyLimit.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ConcurrencyLimit.scala
index c9032671c..a6ebddfe3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ConcurrencyLimit.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ConcurrencyLimit.scala
@@ -19,13 +19,16 @@ package org.apache.openwhisk.core.entity
import com.typesafe.config.ConfigFactory
import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.http.Messages
import pureconfig._
import pureconfig.generic.auto._
+
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import spray.json._
+case class NamespaceConcurrencyLimitConfig(min: Int, max: Int)
case class ConcurrencyLimitConfig(min: Int, max: Int, std: Int)
/**
@@ -39,19 +42,54 @@ case class ConcurrencyLimitConfig(min: Int, max: Int, std: Int)
*
* @param maxConcurrent the max number of concurrent activations in a single container
*/
-protected[entity] class ConcurrencyLimit private (val maxConcurrent: Int) extends AnyVal
+protected[entity] class ConcurrencyLimit private (val maxConcurrent: Int) extends AnyVal {
+
+ /** It checks the namespace memory limit setting value */
+ @throws[ActionConcurrencyLimitException]
+ protected[core] def checkNamespaceLimit(user: Identity): Unit = {
+ val concurrencyMax = user.limits.allowedMaxActionConcurrency
+ val concurrencyMin = user.limits.allowedMinActionConcurrency
+ try {
+ require(
+ maxConcurrent <= concurrencyMax,
+ Messages.concurrencyExceedsAllowedThreshold(maxConcurrent, concurrencyMax))
+ require(maxConcurrent >= concurrencyMin, Messages.concurrencyBelowAllowedThreshold(maxConcurrent, concurrencyMin))
+ } catch {
+ case e: IllegalArgumentException => throw ActionConcurrencyLimitException(e.getMessage)
+ }
+ }
+}
protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit] {
//since tests require override to the default config, load the "test" config, with fallbacks to default
val config = ConfigFactory.load().getConfig("test")
private val concurrencyConfig =
loadConfigWithFallbackOrThrow[ConcurrencyLimitConfig](config, ConfigKeys.concurrencyLimit)
+ private val namespaceConcurrencyDefaultConfig = try {
+ loadConfigWithFallbackOrThrow[NamespaceConcurrencyLimitConfig](config, ConfigKeys.namespaceConcurrencyLimit)
+ } catch {
+ case _: Throwable =>
+ // Supports backwards compatibility for openwhisk that do not use the namespace default limit
+ NamespaceConcurrencyLimitConfig(concurrencyConfig.min, concurrencyConfig.max)
+ }
- /** These values are set once at the beginning. Dynamic configuration updates are not supported at the moment. */
+ /**
+ * These system limits and namespace default limits are set once at the beginning.
+ * Dynamic configuration updates are not supported at the moment.
+ */
protected[core] val MIN_CONCURRENT: Int = concurrencyConfig.min
protected[core] val MAX_CONCURRENT: Int = concurrencyConfig.max
protected[core] val STD_CONCURRENT: Int = concurrencyConfig.std
+ /** Default namespace limit used if there is no namespace-specific limit */
+ protected[core] val MIN_CONCURRENT_DEFAULT: Int = namespaceConcurrencyDefaultConfig.min
+ protected[core] val MAX_CONCURRENT_DEFAULT: Int = namespaceConcurrencyDefaultConfig.max
+
+ require(
+ MAX_CONCURRENT >= MAX_CONCURRENT_DEFAULT,
+ "The system max limit must be greater than the namespace max limit.")
+ require(MIN_CONCURRENT <= MIN_CONCURRENT_DEFAULT, "The system min limit must be less than the namespace min limit.")
+
/** A singleton ConcurrencyLimit with default value */
protected[core] val standardConcurrencyLimit = ConcurrencyLimit(STD_CONCURRENT)
@@ -67,8 +105,6 @@ protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit]
*/
@throws[IllegalArgumentException]
protected[core] def apply(concurrency: Int): ConcurrencyLimit = {
- require(concurrency >= MIN_CONCURRENT, s"concurrency $concurrency below allowed threshold of $MIN_CONCURRENT")
- require(concurrency <= MAX_CONCURRENT, s"concurrency $concurrency exceeds allowed threshold of $MAX_CONCURRENT")
new ConcurrencyLimit(concurrency)
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
index b05ae7021..653d3b1a0 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
@@ -29,6 +29,7 @@ import org.apache.openwhisk.core.entity.types.AuthStore
import spray.json._
import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
import scala.util.Try
case class UserLimits(invocationsPerMinute: Option[Int] = None,
@@ -36,13 +37,103 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
firesPerMinute: Option[Int] = None,
allowedKinds: Option[Set[String]] = None,
storeActivations: Option[Boolean] = None,
+ minActionMemory: Option[MemoryLimit] = None,
+ maxActionMemory: Option[MemoryLimit] = None,
+ minActionLogs: Option[LogLimit] = None,
+ maxActionLogs: Option[LogLimit] = None,
+ minActionTimeout: Option[TimeLimit] = None,
+ maxActionTimeout: Option[TimeLimit] = None,
+ minActionConcurrency: Option[ConcurrencyLimit] = None,
+ maxActionConcurrency: Option[ConcurrencyLimit] = None,
+ maxParameterSize: Option[ByteSize] = None,
+ maxPayloadSize: Option[ByteSize] = None,
+ truncationSize: Option[ByteSize] = None,
warmedContainerKeepingCount: Option[Int] = None,
- warmedContainerKeepingTimeout: Option[String] = None)
+ warmedContainerKeepingTimeout: Option[String] = None) {
+
+ def allowedMaxParameterSize: ByteSize = {
+ val namespaceLimit = maxParameterSize getOrElse (Parameters.MAX_SIZE_DEFAULT)
+ if (namespaceLimit > Parameters.MAX_SIZE) {
+ Parameters.MAX_SIZE
+ } else namespaceLimit
+ }
+
+ def allowedMaxPayloadSize: ByteSize = {
+ val namespaceLimit = maxPayloadSize getOrElse (ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT_DEFAULT)
+ if (namespaceLimit > ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT) {
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT
+ } else namespaceLimit
+ }
+
+ def allowedTruncationSize: ByteSize = {
+ val namespaceLimit = truncationSize getOrElse (ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT_DEFAULT)
+ if (namespaceLimit > ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT) {
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT
+ } else namespaceLimit
+ }
+
+ def allowedMaxActionConcurrency: Int = {
+ val namespaceLimit = maxActionConcurrency.map(_.maxConcurrent) getOrElse (ConcurrencyLimit.MAX_CONCURRENT_DEFAULT)
+ if (namespaceLimit > ConcurrencyLimit.MAX_CONCURRENT) {
+ ConcurrencyLimit.MAX_CONCURRENT
+ } else namespaceLimit
+ }
+
+ def allowedMinActionConcurrency: Int = {
+ val namespaceLimit = minActionConcurrency.map(_.maxConcurrent) getOrElse (ConcurrencyLimit.MIN_CONCURRENT_DEFAULT)
+ if (namespaceLimit < ConcurrencyLimit.MIN_CONCURRENT) {
+ ConcurrencyLimit.MIN_CONCURRENT
+ } else namespaceLimit
+ }
+
+ def allowedMaxActionMemory: ByteSize = {
+ val namespaceLimit = maxActionMemory.map(_.toByteSize) getOrElse (MemoryLimit.MAX_MEMORY_DEFAULT)
+ if (namespaceLimit > MemoryLimit.MAX_MEMORY) {
+ MemoryLimit.MAX_MEMORY
+ } else namespaceLimit
+ }
+
+ def allowedMinActionMemory: ByteSize = {
+ val namespaceLimit = minActionMemory.map(_.toByteSize) getOrElse (MemoryLimit.MIN_MEMORY_DEFAULT)
+ if (namespaceLimit < MemoryLimit.MIN_MEMORY) {
+ MemoryLimit.MIN_MEMORY
+ } else namespaceLimit
+ }
+
+ def allowedMaxActionLogs: ByteSize = {
+ val namespaceLogsMax = maxActionLogs.map(_.toByteSize) getOrElse (LogLimit.MAX_LOGSIZE_DEFAULT)
+ if (namespaceLogsMax > LogLimit.MAX_LOGSIZE) {
+ LogLimit.MAX_LOGSIZE
+ } else namespaceLogsMax
+ }
+
+ def allowedMinActionLogs: ByteSize = {
+ val namespaceLimit = minActionLogs.map(_.toByteSize) getOrElse (LogLimit.MIN_LOGSIZE_DEFAULT)
+ if (namespaceLimit < LogLimit.MIN_LOGSIZE) {
+ LogLimit.MIN_LOGSIZE
+ } else namespaceLimit
+ }
+
+ def allowedMaxActionTimeout: FiniteDuration = {
+ val namespaceLimit = maxActionTimeout.map(_.duration) getOrElse (TimeLimit.MAX_DURATION_DEFAULT)
+ if (namespaceLimit > TimeLimit.MAX_DURATION) {
+ TimeLimit.MAX_DURATION
+ } else namespaceLimit
+ }
+
+ def allowedMinActionTimeout: FiniteDuration = {
+ val namespaceLimit = minActionTimeout.map(_.duration) getOrElse (TimeLimit.MIN_DURATION_DEFAULT)
+ if (namespaceLimit < TimeLimit.MIN_DURATION) {
+ TimeLimit.MIN_DURATION
+ } else namespaceLimit
+ }
+
+}
object UserLimits extends DefaultJsonProtocol {
val standardUserLimits = UserLimits()
-
- implicit val serdes = jsonFormat7(UserLimits.apply)
+ private implicit val byteSizeSerdes = size.serdes
+ implicit val serdes = jsonFormat18(UserLimits.apply)
}
protected[core] case class Namespace(name: EntityName, uuid: UUID)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Limits.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Limits.scala
index 0992580ab..8d3b932c6 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Limits.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Limits.scala
@@ -37,7 +37,7 @@ protected[entity] abstract class Limits {
* Limits on a specific action. Includes the following properties
* {
* timeout: maximum duration in msecs an action is allowed to consume in [100 msecs, 5 minutes],
- * memory: maximum memory in megabytes an action is allowed to consume within system limit, default [128 MB, 512 MB],
+ * memory: maximum memory in megabytes an action is allowed to consume within namespace limit, default [128 MB, 512 MB],
* logs: maximum logs line in megabytes an action is allowed to generate [10 MB],
* concurrency: maximum number of concurrently processed activations per container [1, 200]
* }
@@ -53,6 +53,14 @@ protected[core] case class ActionLimits(timeout: TimeLimit = TimeLimit(),
concurrency: ConcurrencyLimit = ConcurrencyLimit())
extends Limits {
override protected[entity] def toJson = ActionLimits.serdes.write(this)
+
+ @throws[ActionLimitsException]
+ def checkLimits(user: Identity): Unit = {
+ timeout.checkNamespaceLimit(user)
+ memory.checkNamespaceLimit(user)
+ concurrency.checkNamespaceLimit(user)
+ logs.checkNamespaceLimit(user)
+ }
}
/**
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/LimitsExceptions.scala
similarity index 60%
copy from common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
copy to common/scala/src/main/scala/org/apache/openwhisk/core/entity/LimitsExceptions.scala
index 597376cad..10be9497b 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/LimitsExceptions.scala
@@ -15,16 +15,14 @@
* limitations under the License.
*/
-package org.apache.openwhisk.core.containerpool
+package org.apache.openwhisk.core.entity
-import scala.concurrent.Future
-import spray.json._
-import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.entity.ActivationResponse.ContainerHttpError
-import org.apache.openwhisk.core.entity.ActivationResponse._
+sealed abstract class ActionLimitsException(message: String) extends IllegalArgumentException(message)
-trait ContainerClient {
- def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean)(
- implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]]
- def close(): Future[Unit]
-}
+case class ActionTimeLimitException(message: String) extends ActionLimitsException(message)
+
+case class ActionMemoryLimitException(message: String) extends ActionLimitsException(message)
+
+case class ActionLogLimitException(message: String) extends ActionLimitsException(message)
+
+case class ActionConcurrencyLimitException(message: String) extends ActionLimitsException(message)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/LogLimit.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/LogLimit.scala
index 9197b60d3..10e7672b3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/LogLimit.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/LogLimit.scala
@@ -27,6 +27,7 @@ import scala.util.Try
import spray.json._
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.http.Messages
case class LogLimitConfig(min: ByteSize, max: ByteSize, std: ByteSize)
@@ -44,16 +45,53 @@ case class LogLimitConfig(min: ByteSize, max: ByteSize, std: ByteSize)
*/
protected[core] class LogLimit private (val megabytes: Int) extends AnyVal {
protected[core] def asMegaBytes: ByteSize = megabytes.megabytes
+
+ def toByteSize: ByteSize = ByteSize(megabytes, SizeUnits.MB)
+
+ /** It checks the namespace memory limit setting value */
+ @throws[ActionLogLimitException]
+ protected[core] def checkNamespaceLimit(user: Identity): Unit = {
+ val logMax = user.limits.allowedMaxActionLogs
+ val logMin = user.limits.allowedMinActionLogs
+ try {
+ require(
+ megabytes <= logMax.toMB,
+ Messages.sizeExceedsAllowedThreshold(LogLimit.logLimitFieldName, megabytes, logMax.toMB.toInt))
+ require(
+ megabytes >= logMin.toMB,
+ Messages.sizeBelowAllowedThreshold(LogLimit.logLimitFieldName, megabytes, logMin.toMB.toInt))
+ } catch {
+ case e: IllegalArgumentException => throw ActionLogLimitException(e.getMessage)
+ }
+ }
}
protected[core] object LogLimit extends ArgNormalizer[LogLimit] {
val config = loadConfigOrThrow[MemoryLimitConfig](ConfigKeys.logLimit)
+ val namespaceDefaultConfig = try {
+ loadConfigOrThrow[NamespaceMemoryLimitConfig](ConfigKeys.namespaceLogLimit)
+ } catch {
+ case _: Throwable =>
+ // Supports backwards compatibility for openwhisk that do not use the namespace default limit
+ NamespaceMemoryLimitConfig(config.min, config.max)
+ }
+ val logLimitFieldName = "log"
- /** These values are set once at the beginning. Dynamic configuration updates are not supported at the moment. */
+ /**
+ * These system limits and namespace default limits are set once at the beginning.
+ * Dynamic configuration updates are not supported at the moment.
+ */
protected[core] val MIN_LOGSIZE: ByteSize = config.min
protected[core] val MAX_LOGSIZE: ByteSize = config.max
protected[core] val STD_LOGSIZE: ByteSize = config.std
+ /** Default log limit used if there is no namespace-specific limit */
+ protected[core] val MIN_LOGSIZE_DEFAULT: ByteSize = namespaceDefaultConfig.min
+ protected[core] val MAX_LOGSIZE_DEFAULT: ByteSize = namespaceDefaultConfig.max
+
+ require(MAX_LOGSIZE >= MAX_LOGSIZE_DEFAULT, "The system max limit must be greater than the namespace max limit.")
+ require(MIN_LOGSIZE <= MIN_LOGSIZE_DEFAULT, "The system min limit must be less than the namespace min limit.")
+
/** A singleton LogLimit with default value */
protected[core] val standardLogLimit = LogLimit(STD_LOGSIZE)
@@ -69,8 +107,6 @@ protected[core] object LogLimit extends ArgNormalizer[LogLimit] {
*/
@throws[IllegalArgumentException]
protected[core] def apply(megabytes: ByteSize): LogLimit = {
- require(megabytes >= MIN_LOGSIZE, s"log size $megabytes below allowed threshold of $MIN_LOGSIZE")
- require(megabytes <= MAX_LOGSIZE, s"log size $megabytes exceeds allowed threshold of $MAX_LOGSIZE")
new LogLimit(megabytes.toMB.toInt)
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/MemoryLimit.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/MemoryLimit.scala
index 6cef6413e..4f3d8d64f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/MemoryLimit.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/MemoryLimit.scala
@@ -21,14 +21,15 @@ import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
import spray.json._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.http.Messages
import pureconfig._
import pureconfig.generic.auto._
case class MemoryLimitConfig(min: ByteSize, max: ByteSize, std: ByteSize)
+case class NamespaceMemoryLimitConfig(min: ByteSize, max: ByteSize)
/**
* MemoryLimit encapsulates allowed memory for an action. The limit must be within a
@@ -40,15 +41,50 @@ case class MemoryLimitConfig(min: ByteSize, max: ByteSize, std: ByteSize)
*
* @param megabytes the memory limit in megabytes for the action
*/
-protected[entity] class MemoryLimit private (val megabytes: Int) extends AnyVal
+protected[entity] class MemoryLimit private (val megabytes: Int) extends AnyVal {
+
+ def toByteSize: ByteSize = ByteSize(megabytes, SizeUnits.MB)
+
+ /** It checks the namespace memory limit setting value */
+ @throws[ActionMemoryLimitException]
+ protected[core] def checkNamespaceLimit(user: Identity): Unit = {
+ val memoryMax = user.limits.allowedMaxActionMemory
+ val memoryMin = user.limits.allowedMinActionMemory
+ try {
+ require(
+ megabytes <= memoryMax.toMB,
+ Messages.sizeExceedsAllowedThreshold(MemoryLimit.memoryLimitFieldName, megabytes, memoryMax.toMB.toInt))
+ require(
+ megabytes >= memoryMin.toMB,
+ Messages.sizeBelowAllowedThreshold(MemoryLimit.memoryLimitFieldName, megabytes, memoryMin.toMB.toInt))
+ } catch {
+ case e: IllegalArgumentException => throw ActionMemoryLimitException(e.getMessage)
+ }
+ }
+}
protected[core] object MemoryLimit extends ArgNormalizer[MemoryLimit] {
val config = loadConfigOrThrow[MemoryLimitConfig](ConfigKeys.memory)
+ val namespaceDefaultConfig = try {
+ loadConfigOrThrow[NamespaceMemoryLimitConfig](ConfigKeys.namespaceMemoryLimit)
+ } catch {
+ case _: Throwable =>
+ // Supports backwards compatibility for openwhisk that do not use the namespace default limit
+ NamespaceMemoryLimitConfig(config.min, config.max)
+ }
+ val memoryLimitFieldName = "memory"
- /** These values are set once at the beginning. Dynamic configuration updates are not supported at the moment. */
+ /**
+ * These system limits and namespace default limits are set once at the beginning.
+ * Dynamic configuration updates are not supported at the moment.
+ */
+ protected[core] val STD_MEMORY: ByteSize = config.std
protected[core] val MIN_MEMORY: ByteSize = config.min
protected[core] val MAX_MEMORY: ByteSize = config.max
- protected[core] val STD_MEMORY: ByteSize = config.std
+
+ /** Default namespace limit used if there is no namespace-specific limit */
+ protected[core] val MIN_MEMORY_DEFAULT: ByteSize = namespaceDefaultConfig.min
+ protected[core] val MAX_MEMORY_DEFAULT: ByteSize = namespaceDefaultConfig.max
/** A singleton MemoryLimit with default value */
protected[core] val standardMemoryLimit = MemoryLimit(STD_MEMORY)
@@ -56,6 +92,9 @@ protected[core] object MemoryLimit extends ArgNormalizer[MemoryLimit] {
/** Gets MemoryLimit with default value */
protected[core] def apply(): MemoryLimit = standardMemoryLimit
+ require(MAX_MEMORY >= MAX_MEMORY_DEFAULT, "The system max limit must be greater than the namespace max limit.")
+ require(MIN_MEMORY <= MIN_MEMORY_DEFAULT, "The system min limit must be less than the namespace min limit.")
+
/**
* Creates MemoryLimit for limit, iff limit is within permissible range.
*
@@ -65,8 +104,6 @@ protected[core] object MemoryLimit extends ArgNormalizer[MemoryLimit] {
*/
@throws[IllegalArgumentException]
protected[core] def apply(megabytes: ByteSize): MemoryLimit = {
- require(megabytes >= MIN_MEMORY, s"memory $megabytes below allowed threshold of $MIN_MEMORY")
- require(megabytes <= MAX_MEMORY, s"memory $megabytes exceeds allowed threshold of $MAX_MEMORY")
new MemoryLimit(megabytes.toMB.toInt)
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Parameter.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Parameter.scala
index 4a6686790..ec71ed59e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Parameter.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Parameter.scala
@@ -17,13 +17,15 @@
package org.apache.openwhisk.core.entity
+import org.apache.openwhisk.core.ConfigKeys
+
import scala.util.{Failure, Success, Try}
import spray.json.DefaultJsonProtocol._
import spray.json._
import scala.language.postfixOps
-import org.apache.openwhisk.core.entity.size.SizeInt
-import org.apache.openwhisk.core.entity.size.SizeString
+import org.apache.openwhisk.core.entity.size._
+import pureconfig.loadConfigOrThrow
/**
* Parameters is a key-value map from parameter names to parameter values. The value of a
@@ -214,9 +216,20 @@ protected[entity] case class ParameterValue protected[entity] (private val v: Js
protected[core] object Parameters extends ArgNormalizer[Parameters] {
+ protected[core] val MAX_SIZE = loadConfigOrThrow[ByteSize](ConfigKeys.parameterSizeLimit) // system limit
+ protected[core] val MAX_SIZE_DEFAULT = try {
+ loadConfigOrThrow[ByteSize](ConfigKeys.namespaceParameterSizeLimit)
+ } catch {
+ case _: Throwable =>
+ // Supports backwards compatibility for openwhisk that do not use the namespace default limit
+ MAX_SIZE
+ }
+
+ require(MAX_SIZE >= MAX_SIZE_DEFAULT, "The system limit must be greater than the namespace limit.")
+
/** Name of parameter that indicates if action is a feed. */
protected[core] val Feed = "feed"
- protected[core] val sizeLimit = 1 MB
+ protected[core] val sizeLimit = MAX_SIZE
protected[core] def apply(): Parameters = new Parameters(Map.empty)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/TimeLimit.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/TimeLimit.scala
index d8d8c222a..1b6bf545b 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/TimeLimit.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/TimeLimit.scala
@@ -29,6 +29,7 @@ import spray.json.JsValue
import spray.json.RootJsonFormat
import spray.json.deserializationError
import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.http.Messages
/**
* TimeLimit encapsulates a duration for an action. The duration must be within a
@@ -43,17 +44,53 @@ import org.apache.openwhisk.core.ConfigKeys
protected[entity] class TimeLimit private (val duration: FiniteDuration) extends AnyVal {
protected[core] def millis = duration.toMillis.toInt
override def toString = duration.toString
+
+ /** It checks the namespace duration limit setting value */
+ @throws[ActionTimeLimitException]
+ protected[core] def checkNamespaceLimit(user: Identity): Unit = {
+ val durationMax = user.limits.allowedMaxActionTimeout
+ val durationMix = user.limits.allowedMinActionTimeout
+ try {
+ require(
+ duration <= durationMax,
+ Messages.durationExceedsAllowedThreshold(TimeLimit.timeLimitFieldName, duration, durationMax))
+ require(
+ duration >= durationMix,
+ Messages.durationBelowAllowedThreshold(TimeLimit.timeLimitFieldName, duration, durationMix))
+ } catch {
+ case e: IllegalArgumentException => throw ActionTimeLimitException(e.getMessage)
+ }
+ }
}
+case class NamespaceTimeLimitConfig(max: FiniteDuration, min: FiniteDuration)
case class TimeLimitConfig(max: FiniteDuration, min: FiniteDuration, std: FiniteDuration)
protected[core] object TimeLimit extends ArgNormalizer[TimeLimit] {
val config = loadConfigOrThrow[TimeLimitConfig](ConfigKeys.timeLimit)
+ val namespaceDefaultConfig = try {
+ loadConfigOrThrow[NamespaceTimeLimitConfig](ConfigKeys.namespaceTimeLimit)
+ } catch {
+ case _: Throwable =>
+ // Supports backwards compatibility for openwhisk that do not use the namespace default limit
+ NamespaceTimeLimitConfig(config.max, config.min)
+ }
+ val timeLimitFieldName = "duration"
- /** These values are set once at the beginning. Dynamic configuration updates are not supported at the moment. */
+ /**
+ * These system limits and namespace default limits are set once at the beginning.
+ * Dynamic configuration updates are not supported at the moment.
+ */
+ protected[core] val STD_DURATION: FiniteDuration = config.std
protected[core] val MIN_DURATION: FiniteDuration = config.min
protected[core] val MAX_DURATION: FiniteDuration = config.max
- protected[core] val STD_DURATION: FiniteDuration = config.std
+
+ /** Default namespace limit used if there is no namespace-specific limit */
+ protected[core] val MIN_DURATION_DEFAULT: FiniteDuration = namespaceDefaultConfig.min
+ protected[core] val MAX_DURATION_DEFAULT: FiniteDuration = namespaceDefaultConfig.max
+
+ require(MAX_DURATION >= MAX_DURATION_DEFAULT, "The system max limit must be greater than the namespace max limit.")
+ require(MIN_DURATION <= MIN_DURATION_DEFAULT, "The system min limit must be less than the namespace min limit.")
/** A singleton TimeLimit with default value */
protected[core] val standardTimeLimit = TimeLimit(STD_DURATION)
@@ -71,12 +108,6 @@ protected[core] object TimeLimit extends ArgNormalizer[TimeLimit] {
@throws[IllegalArgumentException]
protected[core] def apply(duration: FiniteDuration): TimeLimit = {
require(duration != null, s"duration undefined")
- require(
- duration >= MIN_DURATION,
- s"duration ${duration.toMillis} milliseconds below allowed threshold of ${MIN_DURATION.toMillis} milliseconds")
- require(
- duration <= MAX_DURATION,
- s"duration ${duration.toMillis} milliseconds exceeds allowed threshold of ${MAX_DURATION.toMillis} milliseconds")
new TimeLimit(duration)
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskEntity.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskEntity.scala
index 0fd81d38e..a3630a5a3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskEntity.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskEntity.scala
@@ -199,7 +199,10 @@ case class LimitedWhiskEntityPut(exec: Option[Exec] = None,
parameters: Option[Parameters] = None,
annotations: Option[Parameters] = None) {
- def isWithinSizeLimits: Option[SizeError] = {
+ def isWithinSizeLimits(userLimits: UserLimits): Option[SizeError] = {
+
+ val parameterSizeLimit = userLimits.allowedMaxParameterSize
+
exec.flatMap { e =>
val is = e.size
if (is <= Exec.sizeLimit) None
@@ -209,17 +212,17 @@ case class LimitedWhiskEntityPut(exec: Option[Exec] = None,
}
} orElse parameters.flatMap { p =>
val is = p.size
- if (is <= Parameters.sizeLimit) None
+ if (is <= parameterSizeLimit) None
else
Some {
- SizeError(WhiskEntity.paramsFieldName, is, Parameters.sizeLimit)
+ SizeError(WhiskEntity.paramsFieldName, is, parameterSizeLimit)
}
} orElse annotations.flatMap { a =>
val is = a.size
- if (is <= Parameters.sizeLimit) None
+ if (is <= parameterSizeLimit) None
else
Some {
- SizeError(WhiskEntity.annotationsFieldName, is, Parameters.sizeLimit)
+ SizeError(WhiskEntity.annotationsFieldName, is, parameterSizeLimit)
}
}
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
index 07818418e..5e0a838a5 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
@@ -20,7 +20,6 @@ package org.apache.openwhisk.http
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
-
import akka.http.scaladsl.model.StatusCode
import akka.http.scaladsl.model.StatusCodes.Forbidden
import akka.http.scaladsl.model.StatusCodes.NotFound
@@ -28,9 +27,7 @@ import akka.http.scaladsl.model.MediaType
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
import akka.http.scaladsl.server.StandardRoute
-
import spray.json._
-
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.entity.SizeError
import org.apache.openwhisk.core.entity.ByteSize
@@ -160,6 +157,25 @@ object Messages {
s"${error.field} larger than allowed: ${error.is.toBytes} > ${error.allowed.toBytes} bytes."
}
+ def sizeExceedsAllowedThreshold(field: String, is: Int, allowed: Int) = {
+ s"${field} size ${is} MB exceeds allowed threshold of ${allowed} MB"
+ }
+ def sizeBelowAllowedThreshold(field: String, is: Int, allowed: Int) = {
+ s"${field} size ${is} MB below allowed threshold of ${allowed} MB"
+ }
+ def durationBelowAllowedThreshold(field: String, is: FiniteDuration, allowed: FiniteDuration) = {
+ s"${field} ${is.toMillis} milliseconds below allowed threshold of ${allowed.toMillis} milliseconds"
+ }
+ def durationExceedsAllowedThreshold(field: String, is: FiniteDuration, allowed: FiniteDuration) = {
+ s"${field} ${is.toMillis} milliseconds exceeds allowed threshold of ${allowed.toMillis} milliseconds"
+ }
+ def concurrencyExceedsAllowedThreshold(is: Int, allowed: Int) = {
+ s"concurrency $is exceeds allowed threshold of $allowed"
+ }
+ def concurrencyBelowAllowedThreshold(is: Int, allowed: Int) = {
+ s"concurrency $is below allowed threshold of $allowed"
+ }
+
def listLimitOutOfRange(collection: String, value: Int, max: Int) = {
s"The value '$value' is not in the range of 0 to $max for $collection."
}
@@ -231,6 +247,7 @@ object Messages {
val actionRemovedWhileInvoking = "Action could not be found or may have been deleted."
val actionMismatchWhileInvoking = "Action version is not compatible and cannot be invoked."
val actionFetchErrorWhileInvoking = "Action could not be fetched."
+ val actionLimitExceededSystemLimit = "Action limit exceeded the system limit."
/** Indicates that the image could not be pulled. */
def imagePullError(image: String) = s"Failed to pull container image '$image'."
diff --git a/core/controller/src/main/resources/apiv1swagger.json b/core/controller/src/main/resources/apiv1swagger.json
index 7344209bd..bf2cf35e0 100644
--- a/core/controller/src/main/resources/apiv1swagger.json
+++ b/core/controller/src/main/resources/apiv1swagger.json
@@ -2837,6 +2837,42 @@
"storeActivations": {
"type": "boolean",
"description": "Whether storing activation is turned on for namespace (default is true)"
+ },
+ "maxParameterSize": {
+ "type": "string",
+ "description": "Max parameter size for namespace"
+ },
+ "minActionMemory": {
+ "type": "integer",
+ "description": "Min allowed action memory size in megabytes for namespace"
+ },
+ "maxActionMemory": {
+ "type": "integer",
+ "description": "Max allowed action memory size in megabytes for namespace"
+ },
+ "minActionTimeout": {
+ "type": "integer",
+ "description": "Min allowed action timeout in milliseconds for namespace"
+ },
+ "maxActionTimeout": {
+ "type": "integer",
+ "description": "Max allowed action timeout in milliseconds for namespace"
+ },
+ "minActionLogs": {
+ "type": "integer",
+ "description": "Min allowed action log size in megabytes for namespace"
+ },
+ "maxActionLogs": {
+ "type": "integer",
+ "description": "Max allowed action log size in megabytes for namespace"
+ },
+ "minActionConcurrency": {
+ "type": "integer",
+ "description": "Min number of concurrent activations allowed"
+ },
+ "maxActionConcurrency": {
+ "type": "integer",
+ "description": "Max number of concurrent activations allowed"
}
}
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
index 7820dfa63..cc1238802 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
@@ -212,11 +212,13 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
parameter('overwrite ? false) { overwrite =>
entity(as[WhiskActionPut]) { content =>
val request = content.resolve(user.namespace)
- val checkAdditionalPrivileges = entitleReferencedEntities(user, Privilege.READ, request.exec).flatMap {
- case _ => entitlementProvider.check(user, content.exec)
- }
+ val check = for {
+ checkLimits <- checkActionLimits(user, content)
+ checkAdditionalPrivileges <- entitleReferencedEntities(user, Privilege.READ, request.exec).flatMap(_ =>
+ entitlementProvider.check(user, content.exec))
+ } yield (checkAdditionalPrivileges, checkLimits)
- onComplete(checkAdditionalPrivileges) {
+ onComplete(check) {
case Success(_) =>
putEntity(WhiskAction, entityStore, entityName.toDocId, overwrite, update(user, request) _, () => {
make(user, entityName, request)
@@ -628,6 +630,23 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
})
}
+ private def checkActionLimits(user: Identity, content: WhiskActionPut)(
+ implicit transid: TransactionId): Future[Unit] = {
+ logging.debug(this, "checking the namespace and system limit for action")
+ try {
+ // check namespace limits
+ content.limits foreach { limit =>
+ limit.memory foreach (_.checkNamespaceLimit(user))
+ limit.timeout foreach (_.checkNamespaceLimit(user))
+ limit.logs foreach (_.checkNamespaceLimit(user))
+ limit.concurrency foreach (_.checkNamespaceLimit(user))
+ }
+ Future.successful(())
+ } catch {
+ case e: ActionLimitsException => Future failed RejectRequest(BadRequest, e.getMessage)
+ }
+ }
+
/**
* Checks that the sequence is not cyclic and that the number of atomic actions in the "inlined" sequence is lower than max allowed.
*
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index 62659fcea..0d39e3485 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -267,6 +267,12 @@ object Controller {
"triggers_per_minute" -> config.triggerFirePerMinuteLimit.toInt.toJson,
"concurrent_actions" -> config.actionInvokeConcurrentLimit.toInt.toJson,
"sequence_length" -> config.actionSequenceLimit.toInt.toJson,
+ "default_min_action_duration" -> TimeLimit.namespaceDefaultConfig.min.toMillis.toJson,
+ "default_max_action_duration" -> TimeLimit.namespaceDefaultConfig.max.toMillis.toJson,
+ "default_min_action_memory" -> MemoryLimit.namespaceDefaultConfig.min.toBytes.toJson,
+ "default_max_action_memory" -> MemoryLimit.namespaceDefaultConfig.max.toBytes.toJson,
+ "default_min_action_logs" -> LogLimit.namespaceDefaultConfig.min.toBytes.toJson,
+ "default_max_action_logs" -> LogLimit.namespaceDefaultConfig.max.toBytes.toJson,
"min_action_duration" -> timeLimit.min.toMillis.toJson,
"max_action_duration" -> timeLimit.max.toMillis.toJson,
"min_action_memory" -> memLimit.min.toBytes.toJson,
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Entities.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Entities.scala
index 5e70f7142..102e54916 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Entities.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Entities.scala
@@ -35,7 +35,6 @@ import org.apache.openwhisk.core.entitlement.Privilege
import org.apache.openwhisk.core.entitlement.Privilege.READ
import org.apache.openwhisk.core.entitlement.Resource
import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.entity.ActivationEntityLimit
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.ErrorResponse.terminate
import org.apache.openwhisk.http.Messages
@@ -52,16 +51,14 @@ protected[controller] trait ValidateRequestSize extends Directives {
}
/** Checks if request entity is within allowed length range. */
- protected def isWhithinRange(length: Long) = {
- if (length <= allowedActivationEntitySize) {
+ protected def isWhithinRange(userLimits: UserLimits, length: Long) = {
+ if (length <= userLimits.allowedMaxPayloadSize.toBytes) {
None
} else
Some {
- SizeError(fieldDescriptionForSizeError, length.B, allowedActivationEntitySize.B)
+ SizeError(fieldDescriptionForSizeError, length.B, userLimits.allowedMaxPayloadSize.toBytes.B)
}
}
-
- protected val allowedActivationEntitySize: Long = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes
protected val fieldDescriptionForSizeError = "Request"
}
@@ -115,13 +112,15 @@ trait WhiskCollectionAPI
case READ => fetch(user, FullyQualifiedEntityName(resource.namespace, name), resource.env)
case PUT =>
entity(as[LimitedWhiskEntityPut]) { e =>
- validateSize(e.isWithinSizeLimits)(transid, RestApiCommons.jsonDefaultResponsePrinter) {
+ validateSize(e.isWithinSizeLimits(user.limits))(transid, RestApiCommons.jsonDefaultResponsePrinter) {
create(user, FullyQualifiedEntityName(resource.namespace, name))
}
}
case ACTIVATE =>
extract(_.request.entity.contentLengthOption) { length =>
- validateSize(isWhithinRange(length.getOrElse(0)))(transid, RestApiCommons.jsonDefaultResponsePrinter) {
+ validateSize(isWhithinRange(user.limits, length.getOrElse(0)))(
+ transid,
+ RestApiCommons.jsonDefaultResponsePrinter) {
activate(user, FullyQualifiedEntityName(resource.namespace, name), resource.env)
}
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Limits.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Limits.scala
index 12120903b..e5fa97553 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Limits.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Limits.scala
@@ -24,7 +24,7 @@ import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.entitlement.{Collection, Privilege, Resource}
import org.apache.openwhisk.core.entitlement.Privilege.READ
-import org.apache.openwhisk.core.entity.Identity
+import org.apache.openwhisk.core.entity.{ConcurrencyLimit, Identity, LogLimit, MemoryLimit, TimeLimit}
trait WhiskLimitsApi extends Directives with AuthenticatedRouteProvider with AuthorizedRouteProvider {
@@ -55,7 +55,16 @@ trait WhiskLimitsApi extends Directives with AuthenticatedRouteProvider with Aut
val limits = user.limits.copy(
Some(user.limits.invocationsPerMinute.getOrElse(invocationsPerMinuteSystemDefault)),
Some(user.limits.concurrentInvocations.getOrElse(concurrentInvocationsSystemDefault)),
- Some(user.limits.firesPerMinute.getOrElse(firePerMinuteSystemDefault)))
+ Some(user.limits.firesPerMinute.getOrElse(firePerMinuteSystemDefault)),
+ maxActionMemory = Some(MemoryLimit(user.limits.allowedMaxActionMemory)),
+ minActionMemory = Some(MemoryLimit(user.limits.allowedMinActionMemory)),
+ maxActionLogs = Some(LogLimit(user.limits.allowedMaxActionLogs)),
+ minActionLogs = Some(LogLimit(user.limits.allowedMinActionLogs)),
+ maxActionTimeout = Some(TimeLimit(user.limits.allowedMaxActionTimeout)),
+ minActionTimeout = Some(TimeLimit(user.limits.allowedMinActionTimeout)),
+ maxActionConcurrency = Some(ConcurrencyLimit(user.limits.allowedMaxActionConcurrency)),
+ minActionConcurrency = Some(ConcurrencyLimit(user.limits.allowedMinActionConcurrency)),
+ maxParameterSize = Some(user.limits.allowedMaxParameterSize))
pathEndOrSingleSlash { complete(OK, limits) }
case _ => reject //should never get here
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/WebActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/WebActions.scala
index 93e31ac3d..d4eaf8911 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/WebActions.scala
@@ -491,11 +491,14 @@ trait WhiskWebActionsApi
// as the context body which may be the incoming request when the content type is JSON or formdata, or
// the raw body as __ow_body (and query parameters as __ow_query) otherwise
extract(_.request.entity) { e =>
- validateSize(isWhithinRange(e.contentLengthOption.getOrElse(0)))(transid, jsonPrettyPrinter) {
- requestMethodParamsAndPath { context =>
- provide(fullyQualifiedActionName(actionName)) { fullActionName =>
- onComplete(verifyWebAction(fullActionName)) {
- case Success((actionOwnerIdentity, action)) =>
+ requestMethodParamsAndPath { context =>
+ provide(fullyQualifiedActionName(actionName)) { fullActionName =>
+ onComplete(verifyWebAction(fullActionName)) {
+ case Success((actionOwnerIdentity, action)) =>
+ validateSize(isWhithinRange(actionOwnerIdentity.limits, e.contentLengthOption.getOrElse(0)))(
+ transid,
+ jsonPrettyPrinter) {
+
val actionDelegatesCors =
!action.annotations.getAs[Boolean](Annotations.WebCustomOptionsAnnotationName).getOrElse(false)
@@ -532,14 +535,14 @@ trait WhiskWebActionsApi
context,
e)
}
+ }
- case Failure(t: RejectRequest) =>
- terminate(t.code, t.message)
+ case Failure(t: RejectRequest) =>
+ terminate(t.code, t.message)
- case Failure(t) =>
- logging.error(this, s"exception in handleMatch: $t")
- terminate(InternalServerError)
- }
+ case Failure(t) =>
+ logging.error(this, s"exception in handleMatch: $t")
+ terminate(InternalServerError)
}
}
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index d5ae131b1..b5548b3b6 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -830,6 +830,8 @@ class ContainerProxy(factory: (TransactionId,
env.toJson.asJsObject,
actionTimeout,
job.action.limits.concurrency.maxConcurrent,
+ job.msg.user.limits.allowedMaxPayloadSize,
+ job.msg.user.limits.allowedTruncationSize,
reschedule)(job.msg.transid)
.map {
case (runInterval, response) =>
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index 3e834be61..d624ba30a 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -30,7 +30,7 @@ import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.entity.ActivationResponse.{ConnectionError, MemoryExhausted}
-import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
+import org.apache.openwhisk.core.entity.ByteSize
import org.apache.openwhisk.core.entity.size._
import akka.stream.scaladsl.{Framing, Source}
import akka.stream.stage._
@@ -218,32 +218,23 @@ class DockerContainer(protected val id: ContainerId,
body: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
retry: Boolean = false,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
val started = Instant.now()
val http = httpConnection.getOrElse {
val conn = if (Container.config.akkaClient) {
- new AkkaContainerClient(
- addr.host,
- addr.port,
- timeout,
- ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
- ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
- 1024)
+ new AkkaContainerClient(addr.host, addr.port, timeout, 1024)
} else {
- new ApacheBlockingContainerClient(
- s"${addr.host}:${addr.port}",
- timeout,
- ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
- ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
- maxConcurrent)
+ new ApacheBlockingContainerClient(s"${addr.host}:${addr.port}", timeout, maxConcurrent)
}
httpConnection = Some(conn)
conn
}
http
- .post(path, body, retry, reschedule)
+ .post(path, body, maxResponse, truncation, retry, reschedule)
.flatMap { response =>
val finished = Instant.now()
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index 896455c04..9bbbb598a 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -915,13 +915,17 @@ class FunctionPullingContainerProxy(
get(entityStore, actionid.id, actionid.rev, actionid.rev != DocRevision.empty, false)
.flatMap { action =>
- action.toExecutableWhiskAction match {
- case Some(executable) =>
- Future.successful(RunActivation(executable, msg))
- case None =>
- logging
- .error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
- Future.failed(new IllegalStateException("non-executable action reached the invoker"))
+ {
+ // action that exceed the limit cannot be executed
+ action.limits.checkLimits(msg.user)
+ action.toExecutableWhiskAction match {
+ case Some(executable) =>
+ Future.successful(RunActivation(executable, msg))
+ case None =>
+ logging
+ .error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
+ Future.failed(new IllegalStateException("non-executable action reached the invoker"))
+ }
}
}
.recoverWith {
@@ -939,6 +943,8 @@ class FunctionPullingContainerProxy(
val response = t match {
case _: NoDocumentException =>
ExecutionResponse.applicationError(Messages.actionRemovedWhileInvoking)
+ case e: ActionLimitsException =>
+ ExecutionResponse.applicationError(e.getMessage) // return generated failed message
case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
ExecutionResponse.whiskError(Messages.actionMismatchWhileInvoking)
case e: Throwable =>
@@ -1093,6 +1099,8 @@ class FunctionPullingContainerProxy(
env.toJson.asJsObject,
actionTimeout,
action.limits.concurrency.maxConcurrent,
+ msg.user.limits.allowedMaxPayloadSize,
+ msg.user.limits.allowedTruncationSize,
resumeRun.isDefined)(msg.transid)
.map {
case (runInterval, response) =>
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala
index 64dd46bac..416ce50f4 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala
@@ -21,7 +21,7 @@ import akka.actor.{ActorRef, ActorSystem, Props}
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
import org.apache.openwhisk.core.WarmUp.isWarmUpAction
import org.apache.openwhisk.core.WhiskConfig
-import org.apache.openwhisk.core.connector.ContainerCreationError.DBFetchError
+import org.apache.openwhisk.core.connector.ContainerCreationError.{DBFetchError, InvalidActionLimitError}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.v2.{CreationContainer, DeletionContainer}
import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, NoDocumentException}
@@ -50,6 +50,8 @@ class ContainerMessageConsumer(
private val consumer =
msgProvider.getConsumer(config, topic, topic, maxPeek, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+ private val authStore = WhiskAuthStore.datastore()
+
private def handler(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
ContainerMessage.parse(raw) match {
@@ -63,45 +65,55 @@ class ContainerMessageConsumer(
implicit val transid: TransactionId = creation.transid
logging
.info(this, s"container creation message for ${creation.invocationNamespace}/${creation.action} is received")
- WhiskAction
- .get(entityStore, creation.action.toDocId, creation.revision, fromCache = true)
- .map { action =>
- containerPool ! CreationContainer(creation, action)
- feed ! MessageFeed.Processed
- }
- .recover {
- case t =>
- val message = t match {
- case _: NoDocumentException =>
- Messages.actionRemovedWhileInvoking
- case _: DocumentRevisionMismatchException =>
- Messages.actionMismatchWhileInvoking
- case e: Throwable =>
- logging.error(
- this,
- s"An unknown DB error occurred while fetching action ${creation.invocationNamespace}/${creation.action} for creation ${creation.creationId}, error: $e.")
- Messages.actionFetchErrorWhileInvoking
- }
- logging.error(
- this,
- s"failed to fetch action ${creation.invocationNamespace}/${creation.action}, error: $message (creationId: ${creation.creationId})")
- val ack = ContainerCreationAckMessage(
- creation.transid,
- creation.creationId,
- creation.invocationNamespace,
- creation.action,
- creation.revision,
- creation.whiskActionMetaData,
- invokerInstanceId,
- creation.schedulerHost,
- creation.rpcPort,
- creation.retryCount,
- Some(DBFetchError),
- Some(message))
- sendAckToScheduler(creation.rootSchedulerIndex, ack)
- feed ! MessageFeed.Processed
- }
+ val createContainer = for {
+ identity <- Identity.get(authStore, EntityName(creation.invocationNamespace))
+ action <- WhiskAction
+ .get(entityStore, creation.action.toDocId, creation.revision, fromCache = true)
+ } yield {
+ // check action limits before creating container
+ action.limits.checkLimits(identity)
+ containerPool ! CreationContainer(creation, action)
+ feed ! MessageFeed.Processed
+ }
+ createContainer.recover {
+ case t =>
+ val creationError = t match {
+ case _: ActionLimitsException => InvalidActionLimitError
+ case _ => DBFetchError
+ }
+ val message = t match {
+ case _: ActionLimitsException => t.getMessage // return generated failed message
+ case _: NoDocumentException =>
+ Messages.actionRemovedWhileInvoking
+ case _: DocumentRevisionMismatchException =>
+ Messages.actionMismatchWhileInvoking
+ case e: Throwable =>
+ logging.error(
+ this,
+ s"An unknown DB error occurred while fetching action ${creation.invocationNamespace}/${creation.action} for creation ${creation.creationId}, error: $e.")
+ Messages.actionFetchErrorWhileInvoking
+ }
+ logging.error(
+ this,
+ s"failed to create a container ${creation.invocationNamespace}/${creation.action}, error: $message (creationId: ${creation.creationId})")
+
+ val ack = ContainerCreationAckMessage(
+ creation.transid,
+ creation.creationId,
+ creation.invocationNamespace,
+ creation.action,
+ creation.revision,
+ creation.whiskActionMetaData,
+ invokerInstanceId,
+ creation.schedulerHost,
+ creation.rpcPort,
+ creation.retryCount,
+ Some(creationError),
+ Some(message))
+ sendAckToScheduler(creation.rootSchedulerIndex, ack)
+ feed ! MessageFeed.Processed
+ }
case Success(deletion: ContainerDeletionMessage) =>
implicit val transid: TransactionId = deletion.transid
logging.info(this, s"deletion message for ${deletion.invocationNamespace}/${deletion.action} is received")
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index c9e32db4d..8d821e422 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -52,7 +52,6 @@ object InvokerReactive extends InvokerProvider {
poolConfig: ContainerPoolConfig,
limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore =
new InvokerReactive(config, instance, producer, poolConfig, limitsConfig)
-
}
class InvokerReactive(
@@ -178,6 +177,8 @@ class InvokerReactive(
WhiskAction
.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty)
.flatMap(action => {
+ // action that exceed the limit cannot be executed.
+ action.limits.checkLimits(msg.user)
action.toExecutableWhiskAction match {
case Some(executable) =>
pool ! Run(executable, msg)
@@ -196,6 +197,8 @@ class InvokerReactive(
val response = t match {
case _: NoDocumentException =>
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
+ case e: ActionLimitsException =>
+ ActivationResponse.applicationError(e.getMessage) // return generated failed message
case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
case _ =>
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 28ab22210..a5b80ffd1 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -24,7 +24,7 @@ import org.apache.openwhisk.common._
import org.apache.openwhisk.common.time.{Clock, SystemClock}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.ack.ActiveAck
-import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit
+import org.apache.openwhisk.core.connector.ContainerCreationError.{InvalidActionLimitError, ZeroNamespaceLimit}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.Interval
import org.apache.openwhisk.core.database.{NoDocumentException, UserContext}
@@ -342,15 +342,26 @@ class MemoryQueue(private val etcdClient: EtcdClient,
goto(Running) using RunningData(schedulerActor, droppingActor)
- // log the failed information
- case Event(FailedCreationJob(creationId, _, _, _, error, message), data: FlushingData) =>
- creationIds -= creationId.asString
- logging.info(
- this,
- s"[$invocationNamespace:$action:$stateName][$creationId] Failed to create a container due to $message")
+ case Event(FailedCreationJob(creationId, _, _, _, e, message), data: FlushingData) =>
+ e match {
+ // delete queue when container creation fails with action limit invalid error
+ case InvalidActionLimitError =>
+ logging.info(
+ this,
+ s"[$invocationNamespace:$action:$stateName][$creationId] Clean up because the action limit is invalid")
+ completeAllActivations(data.reason, ContainerCreationError.whiskErrors.contains(data.error))
+ cleanUpActorsAndGotoRemoved(data)
- // keep updating the reason
- stay using data.copy(error = error, reason = message)
+ case _ =>
+ // log the failed information
+ creationIds -= creationId.asString
+ logging.info(
+ this,
+ s"[$invocationNamespace:$action:$stateName][$creationId] Failed to create a container due to $message")
+
+ // keep updating the reason
+ stay using data.copy(error = e, reason = message)
+ }
// since there is no container, activations cannot be handled.
case Event(msg: ActivationMessage, data: FlushingData) =>
@@ -1226,6 +1237,7 @@ case class QueueConfig(idleGrace: FiniteDuration,
failThrottleAsWhiskError: Boolean)
case class BufferedRequest(containerId: String, promise: Promise[Either[MemoryQueueError, ActivationMessage]])
+
case object DropOld
case class ContainerKeyMeta(revision: DocRevision, invokerId: Int, containerId: String)
diff --git a/docs/rest_api.md b/docs/rest_api.md
index effbbd49c..f5a9dbb17 100644
--- a/docs/rest_api.md
+++ b/docs/rest_api.md
@@ -339,7 +339,7 @@ curl -u $AUTH https://$APIHOST/api/v1/namespaces/_/activations/f81dfddd7156401a8
## Limits
-To get the limits set for a namespace (i.e. invocationsPerMinute, concurrentInvocations, firesPerMinute)
+To get the limits set for a namespace (i.e. invocationsPerMinute, concurrentInvocations, firesPerMinute, actionMemoryMax, actionLogsMax...)
```bash
curl -u $AUTH https://$APIHOST/api/v1/namespaces/_/limits
```
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index a601da065..5e074584b 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -175,5 +175,9 @@ test {
concurrency-limit {
max = 200
}
+ namespace-default-limit.concurrency-limit {
+ min = 1
+ max = 200
+ }
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
index a3dd87220..b0ff478f0 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
@@ -109,10 +109,10 @@ class AkkaContainerClientTests
it should "not wait longer than set timeout" in {
val timeout = 5.seconds
- val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 1.B, 100)
+ val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 100)
testHang = timeout * 2
val start = Instant.now()
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result = Await.result(connection.post("/init", JsObject.empty, 1.B, 1.B, retry = true), 10.seconds)
val end = Instant.now()
val waited = end.toEpochMilli - start.toEpochMilli
@@ -123,17 +123,17 @@ class AkkaContainerClientTests
it should "handle empty entity response" in {
val timeout = 5.seconds
- val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 1.B, 100)
+ val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 100)
testStatusCode = 204
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result = Await.result(connection.post("/init", JsObject.empty, 1.B, 1.B, retry = true), 10.seconds)
result shouldBe Left(NoResponseReceived())
}
it should "retry till timeout on StreamTcpException" in {
val timeout = 5.seconds
- val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 1.B, 100)
+ val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 100)
val start = Instant.now()
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result = Await.result(connection.post("/init", JsObject.empty, 1.B, 1.B, retry = true), 10.seconds)
val end = Instant.now()
val waited = end.toEpochMilli - start.toEpochMilli
result match {
@@ -146,9 +146,9 @@ class AkkaContainerClientTests
it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
val timeout = 5.seconds
- val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 1.B, 100)
+ val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 100)
assertThrows[ContainerHealthError] {
- Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
+ Await.result(connection.post("/run", JsObject.empty, 1.B, 1.B, retry = false, reschedule = true), 10.seconds)
}
}
@@ -156,11 +156,11 @@ class AkkaContainerClientTests
val timeout = 5.seconds
val retryInterval = 500.milliseconds
val connection =
- new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 1.B, 100, retryInterval)
+ new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 100, retryInterval)
val start = Instant.now()
testConnectionFailCount = 5
testResponse = ""
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result = Await.result(connection.post("/init", JsObject.empty, 1.B, 1.B, retry = true), 10.seconds)
val end = Instant.now()
val waited = end.toEpochMilli - start.toEpochMilli
result shouldBe Right {
@@ -173,12 +173,12 @@ class AkkaContainerClientTests
it should "not truncate responses within limit" in {
val timeout = 1.minute.toMillis
- val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, 50.B, 50.B, 100)
+ val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, 100)
Seq(true, false).foreach { success =>
Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
testResponse = r
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result = Await.result(connection.post("/init", JsObject.empty, 50.B, 50.B, retry = true), 10.seconds)
result shouldBe Right {
ContainerResponse(okStatus = success, if (r != null) r else "", None)
}
@@ -191,12 +191,13 @@ class AkkaContainerClientTests
val limit = 2.B
val truncationLimit = 1.B
val connection =
- new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, truncationLimit, 100)
+ new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, 100)
Seq(true, false).foreach { success =>
Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
testResponse = r
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result =
+ Await.result(connection.post("/init", JsObject.empty, limit, truncationLimit, retry = true), 10.seconds)
result shouldBe Right {
ContainerResponse(okStatus = success, r.take(truncationLimit.toBytes.toInt), Some((r.length.B, limit)))
}
@@ -211,13 +212,14 @@ class AkkaContainerClientTests
val limit = 300.KB
val truncationLimit = 299.B
val connection =
- new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, truncationLimit, 100)
+ new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, 100)
Seq(true, false).foreach { success =>
// Generate a response that's 1MB
val response = "0" * 1024 * 1024
testStatusCode = if (success) 200 else 500
testResponse = response
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result =
+ Await.result(connection.post("/init", JsObject.empty, limit, truncationLimit, retry = true), 10.seconds)
result shouldBe Right {
ContainerResponse(
okStatus = success,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
index d40b1e906..79774b2d5 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
@@ -100,10 +100,10 @@ class ApacheBlockingContainerClientTests
it should "not wait longer than set timeout" in {
val timeout = 5.seconds
- val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B, 1.B)
+ val connection = new ApacheBlockingContainerClient(hostWithPort, timeout)
testHang = timeout * 2
val start = Instant.now()
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result = Await.result(connection.post("/init", JsObject.empty, 1.B, 1.B, retry = true), 10.seconds)
val end = Instant.now()
val waited = end.toEpochMilli - start.toEpochMilli
@@ -114,19 +114,19 @@ class ApacheBlockingContainerClientTests
it should "handle empty entity response" in {
val timeout = 5.seconds
- val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B, 1.B)
+ val connection = new ApacheBlockingContainerClient(hostWithPort, timeout)
testStatusCode = 204
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result = Await.result(connection.post("/init", JsObject.empty, 1.B, 1.B, retry = true), 10.seconds)
result shouldBe Left(NoResponseReceived())
}
it should "retry till timeout on HttpHostConnectException" in {
val timeout = 5.seconds
val badHostAndPort = "0.0.0.0:12345"
- val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B, 1.B)
+ val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout)
testStatusCode = 204
val start = Instant.now()
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result = Await.result(connection.post("/init", JsObject.empty, 1.B, 1.B, retry = true), 10.seconds)
val end = Instant.now()
val waited = end.toEpochMilli - start.toEpochMilli
result match {
@@ -142,20 +142,20 @@ class ApacheBlockingContainerClientTests
it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
val timeout = 5.seconds
val badHostAndPort = "0.0.0.0:12345"
- val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B, 1.B)
+ val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout)
assertThrows[ContainerHealthError] {
- Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
+ Await.result(connection.post("/run", JsObject.empty, 1.B, 1.B, retry = false, reschedule = true), 10.seconds)
}
}
it should "not truncate responses within limit" in {
val timeout = 1.minute.toMillis
- val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, 50.B, 50.B)
+ val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis)
Seq(true, false).foreach { success =>
Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
testResponse = r
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result = Await.result(connection.post("/init", JsObject.empty, 50.B, 50.B, retry = true), 10.seconds)
result shouldBe Right {
ContainerResponse(okStatus = success, if (r != null) r else "", None)
}
@@ -167,12 +167,13 @@ class ApacheBlockingContainerClientTests
val timeout = 1.minute.toMillis
val limit = 2.B
val truncationLimit = 1.B
- val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, limit, truncationLimit)
+ val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis)
Seq(true, false).foreach { success =>
Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
testResponse = r
- val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
+ val result =
+ Await.result(connection.post("/init", JsObject.empty, limit, truncationLimit, retry = true), 10.seconds)
result shouldBe Right {
ContainerResponse(okStatus = success, r.take(truncationLimit.toBytes.toInt), Some((r.length.B, limit)))
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
index 703c31189..dbfb85883 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -19,7 +19,6 @@ package org.apache.openwhisk.core.containerpool.docker.test
import java.io.IOException
import java.time.Instant
-
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import common.TimingHelpers
@@ -42,7 +41,7 @@ import org.apache.openwhisk.common.LogMarker
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.docker._
-import org.apache.openwhisk.core.entity.ActivationResponse
+import org.apache.openwhisk.core.entity.{ActivationResponse, ByteSize}
import org.apache.openwhisk.core.entity.ActivationResponse.ContainerResponse
import org.apache.openwhisk.core.entity.ActivationResponse.Timeout
import org.apache.openwhisk.core.entity.size._
@@ -109,6 +108,8 @@ class DockerContainerTests
body: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
retry: Boolean = false,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
ccRes
@@ -461,7 +462,7 @@ class DockerContainerTests
Future.successful(RunResult(interval, Right(ContainerResponse(true, result.compactPrint, None))))
}
- val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
+ val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1, 1.MB, 1.MB)
await(runResult) shouldBe (interval, ActivationResponse.success(Some(result), Some(2)))
// assert the starting log is there
@@ -487,7 +488,7 @@ class DockerContainerTests
val initResult = container.initialize(JsObject.empty, 1.second, 1)
an[ContainerHealthError] should be thrownBy await(initResult)
- val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
+ val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1, 1.MB, 1.MB)
an[ContainerHealthError] should be thrownBy await(runResult)
}
@@ -502,7 +503,7 @@ class DockerContainerTests
Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
- val runResult = container.run(JsObject.empty, JsObject.empty, runTimeout, 1)
+ val runResult = container.run(JsObject.empty, JsObject.empty, runTimeout, 1, 1.MB, 1.MB)
await(runResult) shouldBe (interval, ActivationResponse.developerError(
Messages.timedoutActivation(runTimeout, false)))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index b994cdd20..c471de626 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -108,6 +108,8 @@ class KubernetesContainerTests
body: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
retry: Boolean = false,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
ccRes
@@ -264,7 +266,7 @@ class KubernetesContainerTests
Future.successful(RunResult(interval, Right(ContainerResponse(true, result.compactPrint, None))))
}
- val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
+ val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1, 1.MB, 1.MB)
await(runResult) shouldBe (interval, ActivationResponse.success(Some(result), Some(2)))
// assert the starting log is there
@@ -279,7 +281,7 @@ class KubernetesContainerTests
it should "throw ContainerHealthError if runtime container returns 503 response" in {
implicit val kubernetes = stub[KubernetesApi]
-
+ val runTimeout = 1.second
val interval = intervalOf(1.millisecond)
val result = JsObject.empty
val container = kubernetesContainer() {
@@ -289,7 +291,7 @@ class KubernetesContainerTests
val initResult = container.initialize(JsObject.empty, 1.second, 1)
an[ContainerHealthError] should be thrownBy await(initResult)
- val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
+ val runResult = container.run(JsObject.empty, JsObject.empty, runTimeout, 1, 1.MB, 1.MB)
an[ContainerHealthError] should be thrownBy await(runResult)
}
@@ -303,7 +305,7 @@ class KubernetesContainerTests
Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
- val runResult = container.run(JsObject.empty, JsObject.empty, runTimeout, 1)
+ val runResult = container.run(JsObject.empty, JsObject.empty, runTimeout, 1, 1.MB, 1.MB)
await(runResult) shouldBe (interval, ActivationResponse.developerError(
Messages.timedoutActivation(runTimeout, false)))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 1b4567a2d..66f7c7739 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -1342,6 +1342,8 @@ class ContainerProxyTests
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
atomicRunCount.incrementAndGet()
//every other run fails
@@ -1523,6 +1525,8 @@ class ContainerProxyTests
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
atomicRunCount.incrementAndGet()
Future.successful((initInterval, ActivationResponse.developerError(("boom"))))
@@ -1692,12 +1696,14 @@ class ContainerProxyTests
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
if (reschedule) {
throw ContainerHealthError(transid, "reconnect failed to xyz")
}
- super.run(parameters, environment, timeout, concurrent, reschedule)
+ super.run(parameters, environment, timeout, concurrent, maxResponse, truncation, reschedule)
}
}
val factory = createFactory(Future.successful(container))
@@ -1744,12 +1750,14 @@ class ContainerProxyTests
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
if (reschedule) {
throw ContainerHealthError(transid, "reconnect failed to xyz")
}
- super.run(parameters, environment, timeout, concurrent, reschedule)
+ super.run(parameters, environment, timeout, concurrent, maxResponse, truncation, reschedule)
}
}
val factory = createFactory(Future.successful(container))
@@ -2164,6 +2172,8 @@ class ContainerProxyTests
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
// the "init" arguments are not passed on run
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 2a51d3cad..52362256b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -2022,6 +2022,8 @@ class FunctionPullingContainerProxyTests
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
atomicRunCount.incrementAndGet()
Future.successful((initInterval, ActivationResponse.developerError(("boom"))))
@@ -2095,6 +2097,8 @@ class FunctionPullingContainerProxyTests
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
atomicRunCount.incrementAndGet()
//every other run fails
@@ -2703,9 +2707,11 @@ class FunctionPullingContainerProxyTests
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
Thread.sleep((timeoutConfig.pauseGrace + 1.second).toMillis) // 6 sec actions
- super.run(parameters, environment, timeout, concurrent)
+ super.run(parameters, environment, timeout, concurrent, maxResponse, truncation)
}
}
val factory = createFactory(Future.successful(container))
@@ -3101,6 +3107,8 @@ class FunctionPullingContainerProxyTests
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
+ maxResponse: ByteSize,
+ truncation: ByteSize,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
val runCount = atomicRunCount.incrementAndGet()
environment.fields("namespace") shouldBe invocationNamespace.name.toJson
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index 403c015c8..d2bfde2fc 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.controller.test
import java.time.Instant
-
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import org.junit.runner.RunWith
@@ -69,7 +68,13 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
def aname() = MakeName.next("action_tests")
val actionLimit = Exec.sizeLimit
- val parametersLimit = Parameters.sizeLimit
+ val parametersLimit = Parameters.MAX_SIZE
+
+ val systemPayloadLimit = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT
+ val namespacePayloadLimit = systemPayloadLimit - 100.KB
+
+ val credsWithPayloadLimit =
+ WhiskAuthHelpers.newIdentity().copy(limits = UserLimits(maxPayloadSize = Some(namespacePayloadLimit)))
//// GET /actions
it should "return empty list when no actions exist" in {
@@ -530,6 +535,35 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
}
}
+ it should "allow create with parameters that do not exceed the namespace limit" in {
+ implicit val tid = transid()
+
+ val namespaceLimit = Parameters.sizeLimit - 1.KB
+ val parameters = Parameters("a", "a" * (namespaceLimit.toBytes.toInt - 10))
+ val credsWithLimits = creds.copy(limits = UserLimits(maxParameterSize = Some(namespaceLimit)))
+
+ val content = s"""{"exec":{"kind":"nodejs:default","code":"??"},"parameters":$parameters}""".stripMargin
+ Put(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(credsWithLimits)) ~> check {
+ status should be(OK)
+ }
+ }
+
+ it should "reject create with parameters that exceed the namespace limit" in {
+ implicit val tid = transid()
+
+ val namespaceLimit = Parameters.sizeLimit - 1.KB
+ val parameters = Parameters("a", "a" * (namespaceLimit.toBytes.toInt + 10))
+ val credsWithLimits = creds.copy(limits = UserLimits(maxParameterSize = Some(namespaceLimit)))
+
+ val content = s"""{"exec":{"kind":"nodejs:default","code":"??"},"parameters":$parameters}""".stripMargin
+ Put(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(credsWithLimits)) ~> check {
+ status should be(PayloadTooLarge)
+ responseAs[String] should include {
+ Messages.entityTooBig(SizeError(WhiskEntity.paramsFieldName, parameters.size, namespaceLimit))
+ }
+ }
+ }
+
it should "reject create with annotations which are too big" in {
implicit val tid = transid()
val keys: List[Long] =
@@ -546,15 +580,392 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
}
}
+ it should "allow create with annotations that do not exceed the namespace limit" in {
+ implicit val tid = transid()
+
+ val namespaceLimit = Parameters.sizeLimit - 1.KB
+ val annotations = Parameters("a", "a" * (namespaceLimit.toBytes.toInt - 10))
+ val credsWithLimits = creds.copy(limits = UserLimits(maxParameterSize = Some(namespaceLimit)))
+
+ val content = s"""{"exec":{"kind":"nodejs:default","code":"??"},"annotations":$annotations}""".stripMargin
+ Put(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(credsWithLimits)) ~> check {
+ status should be(OK)
+ }
+ }
+
+ it should "reject create with annotations that exceed the namespace limit" in {
+ implicit val tid = transid()
+
+ val namespaceLimit = Parameters.sizeLimit - 1.KB
+ val annotations = Parameters("a", "a" * (namespaceLimit.toBytes.toInt + 10))
+ val credsWithLimits = creds.copy(limits = UserLimits(maxParameterSize = Some(namespaceLimit)))
+
+ val content = s"""{"exec":{"kind":"nodejs:default","code":"??"},"annotations":$annotations}""".stripMargin
+ Put(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(credsWithLimits)) ~> check {
+ status should be(PayloadTooLarge)
+ responseAs[String] should include {
+ Messages.entityTooBig(SizeError(WhiskEntity.annotationsFieldName, annotations.size, namespaceLimit))
+ }
+ }
+ }
+
+ it should "reject create when memory is greater than maximum allowed namespace limit" in {
+ implicit val tid = transid()
+
+ val allowed = ByteSize(128, SizeUnits.MB)
+ val is = ByteSize(512, SizeUnits.MB)
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(maxActionMemory = Some(MemoryLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(TimeLimit.MAX_DURATION)),
+ Some(MemoryLimit(is)),
+ Some(LogLimit(LogLimit.MAX_LOGSIZE)),
+ Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.sizeExceedsAllowedThreshold(MemoryLimit.memoryLimitFieldName, is.toMB.toInt, allowed.toMB.toInt)
+ }
+ }
+ }
+
+ it should "reject create if exceeds the system memory limit and indicate namespace limit in message" in {
+ implicit val tid = transid()
+
+ val allowed = MemoryLimit.MAX_MEMORY_DEFAULT - 1.MB // namespace limit
+ val is = MemoryLimit.MAX_MEMORY + 1.MB
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(maxActionMemory = Some(MemoryLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(TimeLimit.MAX_DURATION)),
+ Some(MemoryLimit(is)),
+ Some(LogLimit(LogLimit.MAX_LOGSIZE)),
+ Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.sizeExceedsAllowedThreshold(MemoryLimit.memoryLimitFieldName, is.toMB.toInt, allowed.toMB.toInt)
+ }
+ }
+ }
+
+ it should "reject create when memory is less than minimum allowed namespace limit" in {
+ implicit val tid = transid()
+
+ val allowed = ByteSize(512, SizeUnits.MB)
+ val is = ByteSize(128, SizeUnits.MB)
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(minActionMemory = Some(MemoryLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(TimeLimit.MAX_DURATION)),
+ Some(MemoryLimit(is)),
+ Some(LogLimit(LogLimit.MAX_LOGSIZE)),
+ Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.sizeBelowAllowedThreshold(MemoryLimit.memoryLimitFieldName, is.toMB.toInt, allowed.toMB.toInt)
+ }
+ }
+ }
+
+ it should "reject create when log size is greater than maximum allowed namespace limit" in {
+ implicit val tid = transid()
+
+ val allowed = ByteSize(5, SizeUnits.MB)
+ val is = ByteSize(7, SizeUnits.MB)
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(maxActionLogs = Some(LogLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(TimeLimit.MAX_DURATION)),
+ Some(MemoryLimit(MemoryLimit.MAX_MEMORY)),
+ Some(LogLimit(is)),
+ Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.sizeExceedsAllowedThreshold(LogLimit.logLimitFieldName, is.toMB.toInt, allowed.toMB.toInt)
+ }
+ }
+ }
+
+ it should "reject create if exceeds the system log size limit and indicate namespace limit in message" in {
+ implicit val tid = transid()
+
+ val allowed = LogLimit.MAX_LOGSIZE_DEFAULT - 1.MB
+ val is = LogLimit.MAX_LOGSIZE + 1.MB
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(maxActionLogs = Some(LogLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(TimeLimit.MAX_DURATION)),
+ Some(MemoryLimit(MemoryLimit.MAX_MEMORY)),
+ Some(LogLimit(is)),
+ Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.sizeExceedsAllowedThreshold(LogLimit.logLimitFieldName, is.toMB.toInt, allowed.toMB.toInt)
+ }
+ }
+ }
+
+ it should "reject create when log size is less than minimum allowed namespace limit" in {
+ implicit val tid = transid()
+
+ val allowed = ByteSize(3, SizeUnits.MB)
+ val is = ByteSize(1, SizeUnits.MB)
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(minActionLogs = Some(LogLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(TimeLimit.MAX_DURATION)),
+ Some(MemoryLimit(MemoryLimit.MAX_MEMORY)),
+ Some(LogLimit(is)),
+ Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.sizeBelowAllowedThreshold(LogLimit.logLimitFieldName, is.toMB.toInt, allowed.toMB.toInt)
+ }
+ }
+ }
+
+ it should "reject create when timeout is greater than maximum allowed namespace limit" in {
+ implicit val tid = transid()
+
+ val allowed = TimeLimit.MAX_DURATION.minus(2.second)
+ val is = TimeLimit.MAX_DURATION.minus(1.second)
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(maxActionTimeout = Some(TimeLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(is)),
+ Some(MemoryLimit(MemoryLimit.MAX_MEMORY)),
+ Some(LogLimit(LogLimit.MAX_LOGSIZE)),
+ Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.durationExceedsAllowedThreshold(TimeLimit.timeLimitFieldName, is, allowed)
+ }
+ }
+ }
+
+ it should "reject create if exceeds the system timeout limit and indicate namespace limit in message" in {
+ implicit val tid = transid()
+
+ val allowed = TimeLimit.MAX_DURATION_DEFAULT.minus(2.second)
+ val is = TimeLimit.MAX_DURATION.plus(1 second)
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(maxActionTimeout = Some(TimeLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(is)),
+ Some(MemoryLimit(MemoryLimit.MAX_MEMORY)),
+ Some(LogLimit(LogLimit.MAX_LOGSIZE)),
+ Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.durationExceedsAllowedThreshold(TimeLimit.timeLimitFieldName, is, allowed)
+ }
+ }
+ }
+
+ it should "reject create when timeout is less than minimum allowed namespace limit" in {
+ implicit val tid = transid()
+
+ val allowed = TimeLimit.MIN_DURATION.plus(2.second)
+ val is = TimeLimit.MIN_DURATION.plus(1.second)
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(minActionTimeout = Some(TimeLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(is)),
+ Some(MemoryLimit(MemoryLimit.MAX_MEMORY)),
+ Some(LogLimit(LogLimit.MAX_LOGSIZE)),
+ Some(ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.durationBelowAllowedThreshold(TimeLimit.timeLimitFieldName, is, allowed)
+ }
+ }
+ }
+
+ it should "reject create when max concurrency is greater than maximum allowed namespace limit" in {
+ implicit val tid = transid()
+
+ val allowed = ConcurrencyLimit.MAX_CONCURRENT - 2
+ val is = ConcurrencyLimit.MAX_CONCURRENT - 1
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(maxActionConcurrency = Some(ConcurrencyLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(TimeLimit.MAX_DURATION)),
+ Some(MemoryLimit(MemoryLimit.MAX_MEMORY)),
+ Some(LogLimit(LogLimit.MAX_LOGSIZE)),
+ Some(ConcurrencyLimit(is)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.concurrencyExceedsAllowedThreshold(is, allowed)
+ }
+ }
+ }
+
+ it should "reject create if exceeds the system max concurrency limit and indicate namespace limit in message" in {
+ implicit val tid = transid()
+
+ val allowed = ConcurrencyLimit.MAX_CONCURRENT_DEFAULT - 1
+ val is = ConcurrencyLimit.MAX_CONCURRENT + 1
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(maxActionConcurrency = Some(ConcurrencyLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(TimeLimit.MAX_DURATION)),
+ Some(MemoryLimit(MemoryLimit.MAX_MEMORY)),
+ Some(LogLimit(LogLimit.MAX_LOGSIZE)),
+ Some(ConcurrencyLimit(is)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.concurrencyExceedsAllowedThreshold(is, allowed)
+ }
+ }
+ }
+
+ it should "reject create when max concurrency is less than minimum allowed namespace limit" in {
+ implicit val tid = transid()
+
+ val allowed = ConcurrencyLimit.MIN_CONCURRENT + 2
+ val is = ConcurrencyLimit.MIN_CONCURRENT + 1
+
+ val credsWithNamespaceLimits = WhiskAuthHelpers
+ .newIdentity()
+ .copy(limits = UserLimits(minActionConcurrency = Some(ConcurrencyLimit(allowed))))
+
+ val content = WhiskActionPut(
+ Some(jsDefault("_")),
+ Some(Parameters("x", "X")),
+ Some(
+ ActionLimitsOption(
+ Some(TimeLimit(TimeLimit.MAX_DURATION)),
+ Some(MemoryLimit(MemoryLimit.MAX_MEMORY)),
+ Some(LogLimit(LogLimit.MAX_LOGSIZE)),
+ Some(ConcurrencyLimit(is)))))
+
+ Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(credsWithNamespaceLimits)) ~> check {
+ status should be(BadRequest)
+ responseAs[String] should include {
+ Messages.concurrencyBelowAllowedThreshold(is, allowed)
+ }
+ }
+ }
+
it should "reject activation with entity which is too big" in {
implicit val tid = transid()
- val code = "a" * (allowedActivationEntitySize.toInt + 1)
+ val code = "a" * (systemPayloadLimit.toBytes.toInt + 1)
val content = s"""{"a":"$code"}""".stripMargin
Post(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(creds)) ~> check {
+ status should be(PayloadTooLarge)
+ responseAs[String] should include {
+ Messages.entityTooBig(SizeError(fieldDescriptionForSizeError, (content.length).B, systemPayloadLimit.toBytes.B))
+ }
+ }
+ }
+
+ it should "reject activation with entity size exceeds allowed namespace limit" in {
+ implicit val tid = transid()
+ val code = "a" * (namespacePayloadLimit.toBytes.toInt + 1)
+ val content = s"""{"a":"$code"}""".stripMargin
+ Post(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(credsWithPayloadLimit)) ~> check {
status should be(PayloadTooLarge)
responseAs[String] should include {
Messages.entityTooBig(
- SizeError(fieldDescriptionForSizeError, (content.length).B, allowedActivationEntitySize.B))
+ SizeError(fieldDescriptionForSizeError, (content.length).B, namespacePayloadLimit.toBytes.B))
}
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerApiTests.scala
index 381159679..5917c0396 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerApiTests.scala
@@ -57,6 +57,12 @@ class ControllerApiTests extends FlatSpec with RestUtil with Matchers with Strea
"triggers_per_minute" -> config.triggerFirePerMinuteLimit.toInt.toJson,
"concurrent_actions" -> config.actionInvokeConcurrentLimit.toInt.toJson,
"sequence_length" -> config.actionSequenceLimit.toInt.toJson,
+ "default_min_action_duration" -> TimeLimit.namespaceDefaultConfig.min.toMillis.toJson,
+ "default_max_action_duration" -> TimeLimit.namespaceDefaultConfig.max.toMillis.toJson,
+ "default_min_action_memory" -> MemoryLimit.namespaceDefaultConfig.min.toBytes.toJson,
+ "default_max_action_memory" -> MemoryLimit.namespaceDefaultConfig.max.toBytes.toJson,
+ "default_min_action_logs" -> LogLimit.namespaceDefaultConfig.min.toBytes.toJson,
+ "default_max_action_logs" -> LogLimit.namespaceDefaultConfig.max.toBytes.toJson,
"min_action_duration" -> TimeLimit.config.min.toMillis.toJson,
"max_action_duration" -> TimeLimit.config.max.toMillis.toJson,
"min_action_memory" -> MemoryLimit.config.min.toBytes.toJson,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/LimitsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/LimitsApiTests.scala
index b6f2eb906..39b8ca74a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/LimitsApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/LimitsApiTests.scala
@@ -23,7 +23,10 @@ import akka.http.scaladsl.model.StatusCodes.{BadRequest, MethodNotAllowed, OK}
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonUnmarshaller
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.core.controller.WhiskLimitsApi
-import org.apache.openwhisk.core.entity.{EntityPath, UserLimits}
+import org.apache.openwhisk.core.entity.{ConcurrencyLimit, EntityPath, LogLimit, MemoryLimit, TimeLimit, UserLimits}
+import org.apache.openwhisk.core.entity.size._
+
+import scala.concurrent.duration._
/**
* Tests Packages API.
@@ -49,6 +52,15 @@ class LimitsApiTests extends ControllerTestCommon with WhiskLimitsApi {
val testAllowedKinds = Set("java:8")
val testStoreActivations = false
+ val testMemoryMin = MemoryLimit(150.MB)
+ val testMemoryMax = MemoryLimit(200.MB)
+ val testLogMin = LogLimit(3.MB)
+ val testLogMax = LogLimit(6.MB)
+ val testDurationMax = TimeLimit(20.seconds)
+ val testDurationMin = TimeLimit(10.seconds)
+ val testConcurrencyMax = ConcurrencyLimit(20)
+ val testConcurrencyMin = ConcurrencyLimit(10)
+
val creds = WhiskAuthHelpers.newIdentity()
val credsWithSetLimits = WhiskAuthHelpers
.newIdentity()
@@ -58,7 +70,15 @@ class LimitsApiTests extends ControllerTestCommon with WhiskLimitsApi {
Some(testConcurrent),
Some(testFiresPerMinute),
Some(testAllowedKinds),
- Some(testStoreActivations)))
+ Some(testStoreActivations),
+ minActionMemory = Some(testMemoryMin),
+ maxActionMemory = Some(testMemoryMax),
+ minActionLogs = Some(testLogMin),
+ maxActionLogs = Some(testLogMax),
+ maxActionTimeout = Some(testDurationMax),
+ minActionTimeout = Some(testDurationMin),
+ maxActionConcurrency = Some(testConcurrencyMax),
+ minActionConcurrency = Some(testConcurrencyMin)))
val namespace = EntityPath(creds.subject.asString)
val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}"
@@ -73,6 +93,16 @@ class LimitsApiTests extends ControllerTestCommon with WhiskLimitsApi {
responseAs[UserLimits].firesPerMinute shouldBe Some(whiskConfig.triggerFirePerMinuteLimit.toInt)
responseAs[UserLimits].allowedKinds shouldBe None
responseAs[UserLimits].storeActivations shouldBe None
+
+ // provide default action limits
+ responseAs[UserLimits].minActionMemory.get.megabytes shouldBe MemoryLimit.MIN_MEMORY_DEFAULT.toMB
+ responseAs[UserLimits].maxActionMemory.get.megabytes shouldBe MemoryLimit.MAX_MEMORY_DEFAULT.toMB
+ responseAs[UserLimits].minActionLogs.get.megabytes shouldBe LogLimit.MIN_LOGSIZE_DEFAULT.toMB
+ responseAs[UserLimits].maxActionLogs.get.megabytes shouldBe LogLimit.MAX_LOGSIZE_DEFAULT.toMB
+ responseAs[UserLimits].minActionTimeout.get.duration shouldBe TimeLimit.MIN_DURATION_DEFAULT
+ responseAs[UserLimits].maxActionTimeout.get.duration shouldBe TimeLimit.MAX_DURATION_DEFAULT
+ responseAs[UserLimits].minActionConcurrency.get.maxConcurrent shouldBe ConcurrencyLimit.MIN_CONCURRENT_DEFAULT
+ responseAs[UserLimits].maxActionConcurrency.get.maxConcurrent shouldBe ConcurrencyLimit.MAX_CONCURRENT_DEFAULT
}
}
}
@@ -87,6 +117,16 @@ class LimitsApiTests extends ControllerTestCommon with WhiskLimitsApi {
responseAs[UserLimits].firesPerMinute shouldBe Some(testFiresPerMinute)
responseAs[UserLimits].allowedKinds shouldBe Some(testAllowedKinds)
responseAs[UserLimits].storeActivations shouldBe Some(testStoreActivations)
+
+ // provide action limits for namespace
+ responseAs[UserLimits].minActionMemory.get.megabytes shouldBe testMemoryMin.megabytes
+ responseAs[UserLimits].maxActionMemory.get.megabytes shouldBe testMemoryMax.megabytes
+ responseAs[UserLimits].minActionLogs.get.megabytes shouldBe testLogMin.megabytes
+ responseAs[UserLimits].maxActionLogs.get.megabytes shouldBe testLogMax.megabytes
+ responseAs[UserLimits].minActionTimeout.get.duration shouldBe testDurationMin.duration
+ responseAs[UserLimits].maxActionTimeout.get.duration shouldBe testDurationMax.duration
+ responseAs[UserLimits].minActionConcurrency.get.maxConcurrent shouldBe testConcurrencyMin.maxConcurrent
+ responseAs[UserLimits].maxActionConcurrency.get.maxConcurrent shouldBe testConcurrencyMax.maxConcurrent
}
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/TriggersApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/TriggersApiTests.scala
index 58b2fd480..ae7b4fc0a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/TriggersApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/TriggersApiTests.scala
@@ -67,6 +67,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
def aname() = MakeName.next("triggers_tests")
def afullname(namespace: EntityPath, name: String) = FullyQualifiedEntityName(namespace, EntityName(name))
val parametersLimit = Parameters.sizeLimit
+ val systemPayloadLimit = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT
val dummyInstant = Instant.now()
//// GET /triggers
@@ -269,13 +270,12 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
it should "reject activation with entity which is too big" in {
implicit val tid = transid()
- val code = "a" * (allowedActivationEntitySize.toInt + 1)
+ val code = "a" * (systemPayloadLimit.toBytes.toInt + 1)
val content = s"""{"a":"$code"}""".stripMargin
Post(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
responseAs[String] should include {
- Messages.entityTooBig(
- SizeError(fieldDescriptionForSizeError, (content.length).B, allowedActivationEntitySize.B))
+ Messages.entityTooBig(SizeError(fieldDescriptionForSizeError, (content.length).B, systemPayloadLimit.toBytes.B))
}
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/WebActionsApiTests.scala
index f0eb668df..e2e8c391b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/WebActionsApiTests.scala
@@ -118,12 +118,21 @@ class WebActionsApiTests extends FlatSpec with Matchers with WebActionsApiBaseTe
}
trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEach with WhiskWebActionsApi {
+
+ val systemPayloadLimit = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT
+ val namespacePayloadLimit = systemPayloadLimit - 100.KB
+
val uuid = UUID()
val systemId = Subject()
val systemKey = BasicAuthenticationAuthKey(uuid, Secret())
val systemIdentity =
Future.successful(
- Identity(systemId, Namespace(EntityName(systemId.asString), uuid), systemKey, rights = Privilege.ALL))
+ Identity(
+ systemId,
+ Namespace(EntityName(systemId.asString), uuid),
+ systemKey,
+ rights = Privilege.ALL,
+ limits = UserLimits(maxPayloadSize = Option(namespacePayloadLimit))))
val namespace = EntityPath(systemId.asString)
val proxyNamespace = namespace.addPath(EntityName("proxy"))
override lazy val entitlementProvider = new TestingEntitlementProvider(whiskConfig, loadBalancer)
@@ -140,6 +149,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
var requireAuthenticationKey = "example-web-action-api-key"
var invocationCount = 0
var invocationsAllowed = 0
+
lazy val testFixturesToGc = {
implicit val tid = transid()
Seq(
@@ -1504,17 +1514,61 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
}
}
- it should s"reject requests when entity size exceeds allowed limit (auth? ${creds.isDefined})" in {
+ it should s"reject requests when entity size exceeds allowed system limit (auth? ${creds.isDefined})" in {
+ implicit val tid = transid()
+
+ Seq(s"$systemId/proxy/export_c.json").foreach { path =>
+ val largeEntity = "a" * (systemPayloadLimit.toBytes.toInt + 1)
+
+ val content = s"""{"a":"$largeEntity"}"""
+ Post(s"$testRoutePath/$path", content.parseJson.asJsObject) ~> Route.seal(routes(creds)) ~> check {
+ status should be(PayloadTooLarge)
+ val expectedErrorMsg = Messages.entityTooBig(
+ // must contains namespace's payload limit size in error message
+ SizeError(fieldDescriptionForSizeError, (largeEntity.length + 8).B, namespacePayloadLimit.toBytes.B))
+ confirmErrorWithTid(responseAs[JsObject], Some(expectedErrorMsg))
+ }
+
+ val form = FormData(Map("a" -> largeEntity))
+ Post(s"$testRoutePath/$path", form) ~> Route.seal(routes(creds)) ~> check {
+ status should be(PayloadTooLarge)
+ val expectedErrorMsg = Messages.entityTooBig(
+ // must contains namespace's payload limit size in error message
+ SizeError(fieldDescriptionForSizeError, (largeEntity.length + 2).B, namespacePayloadLimit.toBytes.B))
+ confirmErrorWithTid(responseAs[JsObject], Some(expectedErrorMsg))
+ }
+ }
+ }
+
+ it should s"allow requests when entity size does not exceed allowed namespace limit (auth? ${creds.isDefined})" in {
+ implicit val tid = transid()
+
+ Seq(s"$systemId/proxy/export_c.json").foreach { path =>
+ val largeEntity = "a" * (namespacePayloadLimit.toBytes.toInt - 10)
+
+ val content = s"""{"a":"$largeEntity"}"""
+ Post(s"$testRoutePath/$path", content.parseJson.asJsObject) ~> Route.seal(routes(creds)) ~> check {
+ status should be(OK)
+ }
+
+ val form = FormData(Map("a" -> largeEntity))
+ Post(s"$testRoutePath/$path", form) ~> Route.seal(routes(creds)) ~> check {
+ status should be(OK)
+ }
+ }
+ }
+
+ it should s"reject requests when entity size exceeds allowed namespace limit (auth? ${creds.isDefined})" in {
implicit val tid = transid()
Seq(s"$systemId/proxy/export_c.json").foreach { path =>
- val largeEntity = "a" * (allowedActivationEntitySize.toInt + 1)
+ val largeEntity = "a" * (namespacePayloadLimit.toBytes.toInt + 1)
val content = s"""{"a":"$largeEntity"}"""
Post(s"$testRoutePath/$path", content.parseJson.asJsObject) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
val expectedErrorMsg = Messages.entityTooBig(
- SizeError(fieldDescriptionForSizeError, (largeEntity.length + 8).B, allowedActivationEntitySize.B))
+ SizeError(fieldDescriptionForSizeError, (largeEntity.length + 8).B, namespacePayloadLimit.toBytes.B))
confirmErrorWithTid(responseAs[JsObject], Some(expectedErrorMsg))
}
@@ -1522,7 +1576,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
Post(s"$testRoutePath/$path", form) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
val expectedErrorMsg = Messages.entityTooBig(
- SizeError(fieldDescriptionForSizeError, (largeEntity.length + 2).B, allowedActivationEntitySize.B))
+ SizeError(fieldDescriptionForSizeError, (largeEntity.length + 2).B, namespacePayloadLimit.toBytes.B))
confirmErrorWithTid(responseAs[JsObject], Some(expectedErrorMsg))
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/SchemaTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/SchemaTests.scala
index 3af4bd2a6..e03989b9d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/SchemaTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/SchemaTests.scala
@@ -19,7 +19,6 @@ package org.apache.openwhisk.core.entity.test
import java.util.Base64
-import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Try
@@ -826,61 +825,16 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
serdes foreach { s =>
withClue(s"serializer $s") {
- if (s != LogLimit.serdes) {
- val lb = the[DeserializationException] thrownBy s.read(JsNumber(0))
- lb.getMessage should include("below allowed threshold")
- } else {
+ if (s == LogLimit.serdes) {
val lb = the[DeserializationException] thrownBy s.read(JsNumber(-1))
lb.getMessage should include("a negative size of an object is not allowed")
}
-
- val ub = the[DeserializationException] thrownBy s.read(JsNumber(Int.MaxValue))
- ub.getMessage should include("exceeds allowed threshold")
-
val int = the[DeserializationException] thrownBy s.read(JsNumber(2.5))
int.getMessage should include("limit must be whole number")
}
}
}
- it should "reject bad limit values" in {
- an[IllegalArgumentException] should be thrownBy ActionLimits(
- TimeLimit(TimeLimit.MIN_DURATION - 1.millisecond),
- MemoryLimit(),
- LogLimit())
- an[IllegalArgumentException] should be thrownBy ActionLimits(
- TimeLimit(),
- MemoryLimit(MemoryLimit.MIN_MEMORY - 1.B),
- LogLimit())
- an[IllegalArgumentException] should be thrownBy ActionLimits(
- TimeLimit(),
- MemoryLimit(),
- LogLimit(LogLimit.MIN_LOGSIZE - 1.B))
- an[IllegalArgumentException] should be thrownBy ActionLimits(
- TimeLimit(),
- MemoryLimit(),
- LogLimit(),
- ConcurrencyLimit(ConcurrencyLimit.MIN_CONCURRENT - 1))
-
- an[IllegalArgumentException] should be thrownBy ActionLimits(
- TimeLimit(TimeLimit.MAX_DURATION + 1.millisecond),
- MemoryLimit(),
- LogLimit())
- an[IllegalArgumentException] should be thrownBy ActionLimits(
- TimeLimit(),
- MemoryLimit(MemoryLimit.MAX_MEMORY + 1.B),
- LogLimit())
- an[IllegalArgumentException] should be thrownBy ActionLimits(
- TimeLimit(),
- MemoryLimit(),
- LogLimit(LogLimit.MAX_LOGSIZE + 1.B))
- an[IllegalArgumentException] should be thrownBy ActionLimits(
- TimeLimit(),
- MemoryLimit(),
- LogLimit(),
- ConcurrencyLimit(ConcurrencyLimit.MAX_CONCURRENT + 1))
- }
-
it should "parse activation id as uuid" in {
val id = "213174381920559471141441e1111111"
val aid = ActivationId.parse(id)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
index 38fdd24ea..aa2db7c21 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
@@ -61,6 +61,8 @@ class ContainerMessageConsumerTests
implicit val transId = TransactionId.testing
implicit val creationId = CreationId.generate()
+ val authStore = WhiskAuthStore.datastore()
+
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
super.afterAll()
@@ -81,12 +83,21 @@ class ContainerMessageConsumerTests
private val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
private val schedulerInstanceId = SchedulerInstanceId("0")
+ /* Subject document needed for the second test */
private val invocationNamespace = EntityName("invocationSpace")
+ private val uuid = UUID()
+ private val ak = BasicAuthenticationAuthKey(uuid, Secret())
+ private val ns = Namespace(invocationNamespace, uuid)
+ private val auth = WhiskAuth(Subject(), Set(WhiskNamespace(ns, ak)))
private val schedulerHost = "127.17.0.1"
private val rpcPort = 13001
+ override def beforeAll() = {
+ put(authStore, auth)
+ }
+
override def afterEach(): Unit = {
cleanup()
}
diff --git a/tools/admin/wskadmin b/tools/admin/wskadmin
index 126e7847d..7569261da 100755
--- a/tools/admin/wskadmin
+++ b/tools/admin/wskadmin
@@ -148,6 +148,17 @@ def parseArgs():
subcmd.add_argument('--concurrentInvocations', help='concurrent invocations allowed for this namespace', type=int)
subcmd.add_argument('--allowedKinds', help='list of runtime kinds allowed in this namespace', nargs='+', type=str)
subcmd.add_argument('--storeActivations', help='enable or disable storing of activations to datastore for this namespace', default=None, type=str_to_bool)
+ subcmd.add_argument('--minActionMemory', help='minimum action memory size for this namespace', default=None, type=int)
+ subcmd.add_argument('--maxActionMemory', help='maximum action memory size for this namespace', default=None, type=int)
+ subcmd.add_argument('--minActionLogs', help='minimum activation log size for this namespace', default=None, type=int)
+ subcmd.add_argument('--maxActionLogs', help='maximum activation log size for this namespace', default=None, type=int)
+ subcmd.add_argument('--minActionTimeout', help='minimum action time limit for this namespace', default=None, type=int)
+ subcmd.add_argument('--maxActionTimeout', help='maximum action time limit for this namespace', default=None, type=int)
+ subcmd.add_argument('--minActionConcurrency', help='minimum action concurrency limit for this namespace', default=None, type=int)
+ subcmd.add_argument('--maxActionConcurrency', help='maximum action concurrency limit for this namespace', default=None, type=int)
+ subcmd.add_argument('--maxParameterSize', help='maximum parameter size for this namespace', default=None, type=str)
+ subcmd.add_argument('--maxPayloadSize', help='maximum payload size for this namespace', default=None, type=str)
+ subcmd.add_argument('--truncationSize', help='activation truncation size for this namespace', default=None, type=str)
subcmd = subparser.add_parser('get', help='get limits for a given namespace (if none exist, system defaults apply)')
subcmd.add_argument('namespace', help='the namespace to get limits for')
@@ -549,7 +560,24 @@ def setLimitsCmd(args, props):
(dbDoc, res) = getDocumentFromDb(props, quote_plus(docId), args.verbose)
doc = dbDoc or {'_id': docId}
- limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds', 'storeActivations']
+ limits = [
+ 'invocationsPerMinute',
+ 'firesPerMinute',
+ 'concurrentInvocations',
+ 'allowedKinds',
+ 'storeActivations',
+ 'minActionMemory',
+ 'maxActionMemory',
+ 'minActionLogs',
+ 'maxActionLogs',
+ 'minActionTimeout',
+ 'maxActionTimeout',
+ 'minActionConcurrency',
+ 'maxActionConcurrency',
+ 'maxParameterSize',
+ 'maxPayloadSize',
+ 'truncationSize'
+ ]
for limit in limits:
givenLimit = argsDict.get(limit)
toSet = givenLimit if givenLimit != None else doc.get(limit)
@@ -568,7 +596,24 @@ def getLimitsCmd(args, props):
(dbDoc, res) = getDocumentFromDb(props, quote_plus(docId), args.verbose)
if dbDoc is not None:
- limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds', 'storeActivations']
+ limits = [
+ 'invocationsPerMinute',
+ 'firesPerMinute',
+ 'concurrentInvocations',
+ 'allowedKinds',
+ 'storeActivations',
+ 'minActionMemory',
+ 'maxActionMemory',
+ 'minActionLogs',
+ 'maxActionLogs',
+ 'minActionTimeout',
+ 'maxActionTimeout',
+ 'minActionConcurrency',
+ 'maxActionConcurrency',
+ 'maxParameterSize',
+ 'maxPayloadSize',
+ 'truncationSize'
+ ]
for limit in limits:
givenLimit = dbDoc.get(limit)
if givenLimit != None: