You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/09/07 15:09:16 UTC
[incubator-openwhisk] branch master updated: Decorate throttled error response with count/limit. (#2684)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 81ce5d4 Decorate throttled error response with count/limit. (#2684)
81ce5d4 is described below
commit 81ce5d43d82e3323d14cf5a8662da9fe60ffd0c3
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Thu Sep 7 11:09:14 2017 -0400
Decorate throttled error response with count/limit. (#2684)
---
.../src/main/scala/whisk/http/ErrorResponse.scala | 6 +-
.../core/entitlement/ActivationThrottler.scala | 22 ++++++-
.../scala/whisk/core/entitlement/Entitlement.scala | 73 +++++++++-------------
.../whisk/core/entitlement/RateThrottler.scala | 27 ++++----
tests/src/test/scala/limits/ThrottleTests.scala | 50 ++++++++++-----
.../core/controller/test/RateThrottleTests.scala | 22 +++----
.../core/controller/test/WebActionsApiTests.scala | 4 +-
7 files changed, 112 insertions(+), 92 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index 98db5e8..760b9f9 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -59,10 +59,12 @@ object Messages {
val resourceDoesNotExist = "The requested resource does not exist."
/** Standard message for too many activation requests within a rolling time window. */
- val tooManyRequests = "Too many requests in a given amount of time for namespace."
+ def tooManyRequests(count: Int, allowed: Int) =
+ s"Too many requests in the last minute (count: $count, allowed: $allowed)."
/** Standard message for too many concurrent activation requests within a time window. */
- val tooManyConcurrentRequests = "Too many concurrent requests in flight for namespace."
+ def tooManyConcurrentRequests(count: Int, allowed: Int) =
+ s"Too many concurrent requests in flight (count: $count, allowed: $allowed)."
/** System overload message. */
val systemOverloaded = "System is overloaded, try again later."
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 688d807..6afeb91 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -21,6 +21,7 @@ import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.entity.Identity
import whisk.core.loadBalancer.LoadBalancer
+import whisk.http.Messages
/**
* Determines user limits and activation counts as seen by the invoker and the loadbalancer
@@ -41,13 +42,13 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I
/**
* Checks whether the operation should be allowed to proceed.
*/
- def check(user: Identity)(implicit tid: TransactionId): Boolean = {
+ def check(user: Identity)(implicit tid: TransactionId): RateLimit = {
val concurrentActivations = loadBalancer.activeActivationsFor(user.uuid)
val concurrencyLimit = user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
logging.info(
this,
- s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit")
- concurrentActivations < concurrencyLimit
+ s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, limit = $concurrencyLimit")
+ ConcurrentRateLimit(concurrentActivations, concurrencyLimit)
}
/**
@@ -59,3 +60,18 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I
concurrentActivations > systemOverloadLimit
}
}
+
+sealed trait RateLimit {
+ def ok: Boolean
+ def errorMsg: String
+}
+
+case class ConcurrentRateLimit(count: Int, allowed: Int) extends RateLimit {
+ val ok = count < allowed // must have slack for the current activation request
+ override def errorMsg = Messages.tooManyConcurrentRequests(count, allowed)
+}
+
+case class TimedRateLimit(count: Int, allowed: Int) extends RateLimit {
+ val ok = count <= allowed // the count is already updated to account for the current request
+ override def errorMsg = Messages.tooManyRequests(count, allowed)
+}
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 5e04366..9a0e9ba 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -129,14 +129,9 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
protected[core] def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = {
logging.info(this, s"checking user '${user.subject}' has not exceeded activation quota")
-
- checkSystemOverload(ACTIVATE) orElse {
- checkThrottleOverload(!invokeRateThrottler.check(user), tooManyRequests)
- } orElse {
- checkThrottleOverload(!concurrentInvokeThrottler.check(user), tooManyConcurrentRequests)
- } map {
- Future.failed(_)
- } getOrElse Future.successful({})
+ checkSystemOverload(ACTIVATE)
+ .flatMap(_ => checkThrottleOverload(invokeRateThrottler.check(user)))
+ .flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user)))
}
/**
@@ -176,13 +171,10 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
val entitlementCheck: Future[Boolean] = if (user.rights.contains(right)) {
if (resources.nonEmpty) {
logging.info(this, s"checking user '$subject' has privilege '$right' for '${resources.mkString(",")}'")
- checkSystemOverload(right) orElse {
- checkUserThrottle(user, right, resources)
- } orElse {
- checkConcurrentUserThrottle(user, right, resources)
- } map {
- Future.failed(_)
- } getOrElse checkPrivilege(user, right, resources)
+ checkSystemOverload(right)
+ .flatMap(_ => checkUserThrottle(user, right, resources))
+ .flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
+ .flatMap(_ => checkPrivilege(user, right, resources))
} else Future.successful(true)
} else if (right != REJECT) {
logging.info(
@@ -236,14 +228,14 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
* Limits activations if the system is overloaded.
*
* @param right the privilege, if ACTIVATE then check quota else return None
- * @return None if system is not overloaded else a rejection
+ * @return future completing successfully if system is not overloaded else failing with a rejection
*/
- protected def checkSystemOverload(right: Privilege)(implicit transid: TransactionId): Option[RejectRequest] = {
+ protected def checkSystemOverload(right: Privilege)(implicit transid: TransactionId): Future[Unit] = {
val systemOverload = right == ACTIVATE && concurrentInvokeThrottler.isOverloaded
if (systemOverload) {
logging.error(this, "system is overloaded")
- Some(RejectRequest(TooManyRequests, systemOverloaded))
- } else None
+ Future.failed(RejectRequest(TooManyRequests, systemOverloaded))
+ } else Future.successful(())
}
/**
@@ -255,17 +247,17 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
* @param user the subject identity to check rights for
* @param right the privilege, if ACTIVATE then check quota else return None
* @param resource the set of resources must contain at least one resource that can be activated else return None
- * @return None if subject is not throttled else a rejection
+ * @return future completing successfully if user is below limits else failing with a rejection
*/
private def checkUserThrottle(user: Identity, right: Privilege, resources: Set[Resource])(
- implicit transid: TransactionId): Option[RejectRequest] = {
- def userThrottled = {
- val isInvocation = resources.exists(_.collection.path == Collection.ACTIONS)
- val isTrigger = resources.exists(_.collection.path == Collection.TRIGGERS)
- (isInvocation && !invokeRateThrottler.check(user)) || (isTrigger && !triggerRateThrottler.check(user))
- }
-
- checkThrottleOverload(right == ACTIVATE && userThrottled, tooManyRequests)
+ implicit transid: TransactionId): Future[Unit] = {
+ if (right == ACTIVATE) {
+ if (resources.exists(_.collection.path == Collection.ACTIONS)) {
+ checkThrottleOverload(invokeRateThrottler.check(user))
+ } else if (resources.exists(_.collection.path == Collection.TRIGGERS)) {
+ checkThrottleOverload(triggerRateThrottler.check(user))
+ } else Future.successful(())
+ } else Future.successful(())
}
/**
@@ -277,24 +269,21 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
* @param user the subject identity to check rights for
* @param right the privilege, if ACTIVATE then check quota else return None
* @param resource the set of resources must contain at least one resource that can be activated else return None
- * @return None if subject is not throttled else a rejection
+ * @return future completing successfully if user is below limits else failing with a rejection
*/
private def checkConcurrentUserThrottle(user: Identity, right: Privilege, resources: Set[Resource])(
- implicit transid: TransactionId): Option[RejectRequest] = {
- def userThrottled = {
- val isInvocation = resources.exists(_.collection.path == Collection.ACTIONS)
- (isInvocation && !concurrentInvokeThrottler.check(user))
- }
-
- checkThrottleOverload(right == ACTIVATE && userThrottled, tooManyConcurrentRequests)
+ implicit transid: TransactionId): Future[Unit] = {
+ if (right == ACTIVATE && resources.exists(_.collection.path == Collection.ACTIONS)) {
+ checkThrottleOverload(concurrentInvokeThrottler.check(user))
+ } else Future.successful(())
}
- /** Helper. */
- private def checkThrottleOverload(hasTooMany: Boolean, message: String)(
- implicit transid: TransactionId): Option[RejectRequest] = {
- if (hasTooMany) {
- Some(RejectRequest(TooManyRequests, message))
- } else None
+ private def checkThrottleOverload(throttle: RateLimit)(implicit transid: TransactionId): Future[Unit] = {
+ if (throttle.ok) {
+ Future.successful(())
+ } else {
+ Future.failed(RejectRequest(TooManyRequests, throttle.errorMsg))
+ }
}
}
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
index 770d78f..ef2cf22 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
@@ -23,6 +23,7 @@ import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.entity.Identity
import whisk.core.entity.UUID
+import java.util.concurrent.atomic.AtomicInteger
/**
* A class tracking the rate of invocation (or any operation) by subject (any key really).
@@ -46,15 +47,13 @@ class RateThrottler(description: String, defaultMaxPerMinute: Int, overrideMaxPe
* @param user the identity to check
* @return true iff subject namespace is below allowed limit
*/
- def check(user: Identity)(implicit transid: TransactionId): Boolean = {
+ def check(user: Identity)(implicit transid: TransactionId): RateLimit = {
val uuid = user.uuid // this is namespace identifier
- val rate = rateMap.getOrElseUpdate(uuid, new RateInfo)
+ val throttle = rateMap.getOrElseUpdate(uuid, new RateInfo)
val limit = overrideMaxPerMinute(user).getOrElse(defaultMaxPerMinute)
- val belowLimit = rate.check(limit)
- logging.debug(
- this,
- s"namespace = ${uuid.asString} rate = ${rate.count()}, limit = $limit, below limit = $belowLimit")
- belowLimit
+ val rate = TimedRateLimit(throttle.update(limit), limit)
+ logging.debug(this, s"namespace = ${uuid.asString} rate = ${rate.count}, limit = $limit")
+ rate
}
}
@@ -62,10 +61,8 @@ class RateThrottler(description: String, defaultMaxPerMinute: Int, overrideMaxPe
* Tracks the activation rate of one subject at minute-granularity.
*/
private class RateInfo {
- var lastMin = getCurrentMinute
- var lastMinCount = 0
-
- def count() = lastMinCount
+ @volatile var lastMin = getCurrentMinute
+ val lastMinCount = new AtomicInteger()
/**
* Increments operation count in the current time window by
@@ -73,18 +70,18 @@ private class RateInfo {
*
* @param maxPerMinute the current maximum allowed requests
* per minute (might change over time)
+ * @return current count
*/
- def check(maxPerMinute: Int): Boolean = {
+ def update(maxPerMinute: Int): Int = {
roll()
- lastMinCount = lastMinCount + 1
- lastMinCount <= maxPerMinute
+ lastMinCount.incrementAndGet()
}
def roll() = {
val curMin = getCurrentMinute
if (curMin != lastMin) {
lastMin = curMin
- lastMinCount = 0
+ lastMinCount.set(0)
}
}
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala
index a1d85ac..8da66e8 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -46,6 +46,10 @@ import org.scalatest.BeforeAndAfterAll
import common.RunWskAdminCmd
import whisk.utils.retry
+protected[limits] trait LocalHelper {
+ def prefix(msg: String) = msg.substring(0, msg.indexOf('('))
+}
+
@RunWith(classOf[JUnitRunner])
class ThrottleTests
extends FlatSpec
@@ -53,7 +57,8 @@ class ThrottleTests
with WskTestHelpers
with WskActorSystem
with ScalaFutures
- with Matchers {
+ with Matchers
+ with LocalHelper {
// use an infinite thread pool so that activations do not wait to send the activation requests
override implicit val executionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
@@ -86,7 +91,7 @@ class ThrottleTests
*/
def throttledActivations(results: List[RunResult], message: String) = {
val count = results.count { result =>
- result.exitCode == TestUtils.THROTTLED && result.stderr.contains(message)
+ result.exitCode == TestUtils.THROTTLED && result.stderr.contains(prefix(message))
}
println(s"number of throttled activations: $count out of ${results.length}")
count
@@ -190,7 +195,7 @@ class ThrottleTests
val afterInvokes = Instant.now
try {
- val throttledCount = throttledActivations(results, tooManyRequests)
+ val throttledCount = throttledActivations(results, tooManyRequests(0, 0))
throttledCount should be > 0
} finally {
val alreadyWaited = durationBetween(afterInvokes, Instant.now)
@@ -216,7 +221,7 @@ class ThrottleTests
val afterFirings = Instant.now
try {
- val throttledCount = throttledActivations(results, tooManyRequests)
+ val throttledCount = throttledActivations(results, tooManyRequests(0, 0))
throttledCount should be > 0
} finally {
// no need to wait for activations of triggers since they consume no resources
@@ -267,7 +272,7 @@ class ThrottleTests
val combinedResults = slowResults ++ fastResults
try {
- val throttledCount = throttledActivations(combinedResults, tooManyConcurrentRequests)
+ val throttledCount = throttledActivations(combinedResults, tooManyConcurrentRequests(0, 0))
throttledCount should be > 0
} finally {
val alreadyWaited = durationBetween(afterSlowInvokes, Instant.now)
@@ -287,7 +292,8 @@ class NamespaceSpecificThrottleTests
with TestHelpers
with WskTestHelpers
with Matchers
- with BeforeAndAfterAll {
+ with BeforeAndAfterAll
+ with LocalHelper {
val wskadmin = new RunWskAdminCmd {}
val wsk = new Wsk
@@ -340,8 +346,12 @@ class NamespaceSpecificThrottleTests
trigger.create(triggerName)
}
- wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should include(tooManyRequests)
- wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.THROTTLED).stderr should include(tooManyRequests)
+ wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should {
+ include(prefix(tooManyRequests(0, 0))) and include("allowed: 0")
+ }
+ wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.THROTTLED).stderr should {
+ include(prefix(tooManyRequests(0, 0))) and include("allowed: 0")
+ }
}
it should "respect overridden rate-throttles of 1" in withAssetCleaner(oneProps) { (wp, assetHelper) =>
@@ -356,24 +366,30 @@ class NamespaceSpecificThrottleTests
trigger.create(triggerName)
}
- // One invoke should be allowed, the second one throttled
- // Due to the current implementation of the rate throttling, it could be possible, that the counter gets deleted, because the minute switches
+ // One invoke should be allowed, the second one throttled.
+ // Due to the current implementation of the rate throttling,
+ // it is possible that the counter gets deleted, because the minute switches.
retry({
val results = (1 to 2).map { _ =>
wsk.action.invoke(actionName, expectedExitCode = TestUtils.DONTCARE_EXIT)
}
results.map(_.exitCode) should contain(TestUtils.THROTTLED)
- results.map(_.stderr).mkString should include(tooManyRequests)
+ results.map(_.stderr).mkString should {
+ include(prefix(tooManyRequests(0, 0))) and include("allowed: 1")
+ }
}, 2, Some(1.second))
- // One fire should be allowed, the second one throttled
- // Due to the current implementation of the rate throttling, it could be possible, that the counter gets deleted, because the minute switches
+ // One fire should be allowed, the second one throttled.
+ // Due to the current implementation of the rate throttling,
+ // it is possible, that the counter gets deleted, because the minute switches.
retry({
val results = (1 to 2).map { _ =>
wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.DONTCARE_EXIT)
}
results.map(_.exitCode) should contain(TestUtils.THROTTLED)
- results.map(_.stderr).mkString should include(tooManyRequests)
+ results.map(_.stderr).mkString should {
+ include(prefix(tooManyRequests(0, 0))) and include("allowed: 1")
+ }
}, 2, Some(1.second))
}
@@ -385,8 +401,8 @@ class NamespaceSpecificThrottleTests
action.create(actionName, defaultAction)
}
- wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should include(
- tooManyConcurrentRequests)
+ wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should {
+ include(prefix(tooManyConcurrentRequests(0, 0))) and include("allowed: 0")
+ }
}
-
}
diff --git a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
index 4d48b55..720e738 100644
--- a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
@@ -44,24 +44,24 @@ class RateThrottleTests extends FlatSpec with Matchers with StreamLogging {
behavior of "Rate Throttle"
it should "throttle when rate exceeds allowed threshold" in {
- new RateThrottler("test", 0, _.limits.invocationsPerMinute).check(subject) shouldBe false
+ new RateThrottler("test", 0, _.limits.invocationsPerMinute).check(subject).ok shouldBe false
val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute)
- rt.check(subject) shouldBe true
- rt.check(subject) shouldBe false
- rt.check(subject) shouldBe false
+ rt.check(subject).ok shouldBe true
+ rt.check(subject).ok shouldBe false
+ rt.check(subject).ok shouldBe false
Thread.sleep(1.minute.toMillis)
- rt.check(subject) shouldBe true
+ rt.check(subject).ok shouldBe true
}
it should "check against an alternative limit if passed in" in {
val withLimits = subject.copy(limits = UserLimits(invocationsPerMinute = Some(5)))
val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute)
- rt.check(withLimits) shouldBe true // 1
- rt.check(withLimits) shouldBe true // 2
- rt.check(withLimits) shouldBe true // 3
- rt.check(withLimits) shouldBe true // 4
- rt.check(withLimits) shouldBe true // 5
- rt.check(withLimits) shouldBe false
+ rt.check(withLimits).ok shouldBe true // 1
+ rt.check(withLimits).ok shouldBe true // 2
+ rt.check(withLimits).ok shouldBe true // 3
+ rt.check(withLimits).ok shouldBe true // 4
+ rt.check(withLimits).ok shouldBe true // 5
+ rt.check(withLimits).ok shouldBe false
}
}
diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index f43ed4a..7a87882 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -1297,7 +1297,7 @@ trait WebActionsApiTests extends ControllerTestCommon with BeforeAndAfterEach wi
failThrottleForSubject = Some(systemId)
m(s"$testRoutePath/$path") ~> Route.seal(routes(creds)) ~> check {
status should be(TooManyRequests)
- confirmErrorWithTid(responseAs[JsObject], Some(Messages.tooManyRequests))
+ confirmErrorWithTid(responseAs[JsObject], Some(Messages.tooManyRequests(2, 1)))
}
failThrottleForSubject = None
}
@@ -1492,7 +1492,7 @@ trait WebActionsApiTests extends ControllerTestCommon with BeforeAndAfterEach wi
failThrottleForSubject match {
case Some(subject) if subject == user.subject =>
- Future.failed(RejectRequest(TooManyRequests, Messages.tooManyRequests))
+ Future.failed(RejectRequest(TooManyRequests, Messages.tooManyRequests(2, 1)))
case _ => Future.successful({})
}
}
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.