You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2017/09/07 15:09:16 UTC

[incubator-openwhisk] branch master updated: Decorate throttled error response with count/limit. (#2684)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 81ce5d4  Decorate throttled error response with count/limit. (#2684)
81ce5d4 is described below

commit 81ce5d43d82e3323d14cf5a8662da9fe60ffd0c3
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Thu Sep 7 11:09:14 2017 -0400

    Decorate throttled error response with count/limit. (#2684)
---
 .../src/main/scala/whisk/http/ErrorResponse.scala  |  6 +-
 .../core/entitlement/ActivationThrottler.scala     | 22 ++++++-
 .../scala/whisk/core/entitlement/Entitlement.scala | 73 +++++++++-------------
 .../whisk/core/entitlement/RateThrottler.scala     | 27 ++++----
 tests/src/test/scala/limits/ThrottleTests.scala    | 50 ++++++++++-----
 .../core/controller/test/RateThrottleTests.scala   | 22 +++----
 .../core/controller/test/WebActionsApiTests.scala  |  4 +-
 7 files changed, 112 insertions(+), 92 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index 98db5e8..760b9f9 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -59,10 +59,12 @@ object Messages {
   val resourceDoesNotExist = "The requested resource does not exist."
 
   /** Standard message for too many activation requests within a rolling time window. */
-  val tooManyRequests = "Too many requests in a given amount of time for namespace."
+  def tooManyRequests(count: Int, allowed: Int) =
+    s"Too many requests in the last minute (count: $count, allowed: $allowed)."
 
   /** Standard message for too many concurrent activation requests within a time window. */
-  val tooManyConcurrentRequests = "Too many concurrent requests in flight for namespace."
+  def tooManyConcurrentRequests(count: Int, allowed: Int) =
+    s"Too many concurrent requests in flight (count: $count, allowed: $allowed)."
 
   /** System overload message. */
   val systemOverloaded = "System is overloaded, try again later."
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 688d807..6afeb91 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -21,6 +21,7 @@ import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.entity.Identity
 import whisk.core.loadBalancer.LoadBalancer
+import whisk.http.Messages
 
 /**
  * Determines user limits and activation counts as seen by the invoker and the loadbalancer
@@ -41,13 +42,13 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I
   /**
    * Checks whether the operation should be allowed to proceed.
    */
-  def check(user: Identity)(implicit tid: TransactionId): Boolean = {
+  def check(user: Identity)(implicit tid: TransactionId): RateLimit = {
     val concurrentActivations = loadBalancer.activeActivationsFor(user.uuid)
     val concurrencyLimit = user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
     logging.info(
       this,
-      s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit")
-    concurrentActivations < concurrencyLimit
+      s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, limit = $concurrencyLimit")
+    ConcurrentRateLimit(concurrentActivations, concurrencyLimit)
   }
 
   /**
@@ -59,3 +60,18 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I
     concurrentActivations > systemOverloadLimit
   }
 }
+
+sealed trait RateLimit {
+  def ok: Boolean
+  def errorMsg: String
+}
+
+case class ConcurrentRateLimit(count: Int, allowed: Int) extends RateLimit {
+  val ok = count < allowed // must have slack for the current activation request
+  override def errorMsg = Messages.tooManyConcurrentRequests(count, allowed)
+}
+
+case class TimedRateLimit(count: Int, allowed: Int) extends RateLimit {
+  val ok = count <= allowed // the count is already updated to account for the current request
+  override def errorMsg = Messages.tooManyRequests(count, allowed)
+}
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 5e04366..9a0e9ba 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -129,14 +129,9 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
   protected[core] def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = {
 
     logging.info(this, s"checking user '${user.subject}' has not exceeded activation quota")
-
-    checkSystemOverload(ACTIVATE) orElse {
-      checkThrottleOverload(!invokeRateThrottler.check(user), tooManyRequests)
-    } orElse {
-      checkThrottleOverload(!concurrentInvokeThrottler.check(user), tooManyConcurrentRequests)
-    } map {
-      Future.failed(_)
-    } getOrElse Future.successful({})
+    checkSystemOverload(ACTIVATE)
+      .flatMap(_ => checkThrottleOverload(invokeRateThrottler.check(user)))
+      .flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user)))
   }
 
   /**
@@ -176,13 +171,10 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
     val entitlementCheck: Future[Boolean] = if (user.rights.contains(right)) {
       if (resources.nonEmpty) {
         logging.info(this, s"checking user '$subject' has privilege '$right' for '${resources.mkString(",")}'")
-        checkSystemOverload(right) orElse {
-          checkUserThrottle(user, right, resources)
-        } orElse {
-          checkConcurrentUserThrottle(user, right, resources)
-        } map {
-          Future.failed(_)
-        } getOrElse checkPrivilege(user, right, resources)
+        checkSystemOverload(right)
+          .flatMap(_ => checkUserThrottle(user, right, resources))
+          .flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
+          .flatMap(_ => checkPrivilege(user, right, resources))
       } else Future.successful(true)
     } else if (right != REJECT) {
       logging.info(
@@ -236,14 +228,14 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
    * Limits activations if the system is overloaded.
    *
    * @param right the privilege, if ACTIVATE then check quota else return None
-   * @return None if system is not overloaded else a rejection
+   * @return future completing successfully if system is not overloaded else failing with a rejection
    */
-  protected def checkSystemOverload(right: Privilege)(implicit transid: TransactionId): Option[RejectRequest] = {
+  protected def checkSystemOverload(right: Privilege)(implicit transid: TransactionId): Future[Unit] = {
     val systemOverload = right == ACTIVATE && concurrentInvokeThrottler.isOverloaded
     if (systemOverload) {
       logging.error(this, "system is overloaded")
-      Some(RejectRequest(TooManyRequests, systemOverloaded))
-    } else None
+      Future.failed(RejectRequest(TooManyRequests, systemOverloaded))
+    } else Future.successful(())
   }
 
   /**
@@ -255,17 +247,17 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
    * @param user the subject identity to check rights for
    * @param right the privilege, if ACTIVATE then check quota else return None
    * @param resource the set of resources must contain at least one resource that can be activated else return None
-   * @return None if subject is not throttled else a rejection
+   * @return future completing successfully if user is below limits else failing with a rejection
    */
   private def checkUserThrottle(user: Identity, right: Privilege, resources: Set[Resource])(
-    implicit transid: TransactionId): Option[RejectRequest] = {
-    def userThrottled = {
-      val isInvocation = resources.exists(_.collection.path == Collection.ACTIONS)
-      val isTrigger = resources.exists(_.collection.path == Collection.TRIGGERS)
-      (isInvocation && !invokeRateThrottler.check(user)) || (isTrigger && !triggerRateThrottler.check(user))
-    }
-
-    checkThrottleOverload(right == ACTIVATE && userThrottled, tooManyRequests)
+    implicit transid: TransactionId): Future[Unit] = {
+    if (right == ACTIVATE) {
+      if (resources.exists(_.collection.path == Collection.ACTIONS)) {
+        checkThrottleOverload(invokeRateThrottler.check(user))
+      } else if (resources.exists(_.collection.path == Collection.TRIGGERS)) {
+        checkThrottleOverload(triggerRateThrottler.check(user))
+      } else Future.successful(())
+    } else Future.successful(())
   }
 
   /**
@@ -277,24 +269,21 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
    * @param user the subject identity to check rights for
    * @param right the privilege, if ACTIVATE then check quota else return None
    * @param resource the set of resources must contain at least one resource that can be activated else return None
-   * @return None if subject is not throttled else a rejection
+   * @return future completing successfully if user is below limits else failing with a rejection
    */
   private def checkConcurrentUserThrottle(user: Identity, right: Privilege, resources: Set[Resource])(
-    implicit transid: TransactionId): Option[RejectRequest] = {
-    def userThrottled = {
-      val isInvocation = resources.exists(_.collection.path == Collection.ACTIONS)
-      (isInvocation && !concurrentInvokeThrottler.check(user))
-    }
-
-    checkThrottleOverload(right == ACTIVATE && userThrottled, tooManyConcurrentRequests)
+    implicit transid: TransactionId): Future[Unit] = {
+    if (right == ACTIVATE && resources.exists(_.collection.path == Collection.ACTIONS)) {
+      checkThrottleOverload(concurrentInvokeThrottler.check(user))
+    } else Future.successful(())
   }
 
-  /** Helper. */
-  private def checkThrottleOverload(hasTooMany: Boolean, message: String)(
-    implicit transid: TransactionId): Option[RejectRequest] = {
-    if (hasTooMany) {
-      Some(RejectRequest(TooManyRequests, message))
-    } else None
+  private def checkThrottleOverload(throttle: RateLimit)(implicit transid: TransactionId): Future[Unit] = {
+    if (throttle.ok) {
+      Future.successful(())
+    } else {
+      Future.failed(RejectRequest(TooManyRequests, throttle.errorMsg))
+    }
   }
 }
 
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
index 770d78f..ef2cf22 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
@@ -23,6 +23,7 @@ import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.entity.Identity
 import whisk.core.entity.UUID
+import java.util.concurrent.atomic.AtomicInteger
 
 /**
  * A class tracking the rate of invocation (or any operation) by subject (any key really).
@@ -46,15 +47,13 @@ class RateThrottler(description: String, defaultMaxPerMinute: Int, overrideMaxPe
    * @param user the identity to check
    * @return true iff subject namespace is below allowed limit
    */
-  def check(user: Identity)(implicit transid: TransactionId): Boolean = {
+  def check(user: Identity)(implicit transid: TransactionId): RateLimit = {
     val uuid = user.uuid // this is namespace identifier
-    val rate = rateMap.getOrElseUpdate(uuid, new RateInfo)
+    val throttle = rateMap.getOrElseUpdate(uuid, new RateInfo)
     val limit = overrideMaxPerMinute(user).getOrElse(defaultMaxPerMinute)
-    val belowLimit = rate.check(limit)
-    logging.debug(
-      this,
-      s"namespace = ${uuid.asString} rate = ${rate.count()}, limit = $limit, below limit = $belowLimit")
-    belowLimit
+    val rate = TimedRateLimit(throttle.update(limit), limit)
+    logging.debug(this, s"namespace = ${uuid.asString} rate = ${rate.count}, limit = $limit")
+    rate
   }
 }
 
@@ -62,10 +61,8 @@ class RateThrottler(description: String, defaultMaxPerMinute: Int, overrideMaxPe
  * Tracks the activation rate of one subject at minute-granularity.
  */
 private class RateInfo {
-  var lastMin = getCurrentMinute
-  var lastMinCount = 0
-
-  def count() = lastMinCount
+  @volatile var lastMin = getCurrentMinute
+  val lastMinCount = new AtomicInteger()
 
   /**
    * Increments operation count in the current time window by
@@ -73,18 +70,18 @@ private class RateInfo {
    *
    * @param maxPerMinute the current maximum allowed requests
    *                     per minute (might change over time)
+   * @return current count
    */
-  def check(maxPerMinute: Int): Boolean = {
+  def update(maxPerMinute: Int): Int = {
     roll()
-    lastMinCount = lastMinCount + 1
-    lastMinCount <= maxPerMinute
+    lastMinCount.incrementAndGet()
   }
 
   def roll() = {
     val curMin = getCurrentMinute
     if (curMin != lastMin) {
       lastMin = curMin
-      lastMinCount = 0
+      lastMinCount.set(0)
     }
   }
 
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala
index a1d85ac..8da66e8 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -46,6 +46,10 @@ import org.scalatest.BeforeAndAfterAll
 import common.RunWskAdminCmd
 import whisk.utils.retry
 
+protected[limits] trait LocalHelper {
+  def prefix(msg: String) = msg.substring(0, msg.indexOf('('))
+}
+
 @RunWith(classOf[JUnitRunner])
 class ThrottleTests
     extends FlatSpec
@@ -53,7 +57,8 @@ class ThrottleTests
     with WskTestHelpers
     with WskActorSystem
     with ScalaFutures
-    with Matchers {
+    with Matchers
+    with LocalHelper {
 
   // use an infinite thread pool so that activations do not wait to send the activation requests
   override implicit val executionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
@@ -86,7 +91,7 @@ class ThrottleTests
    */
   def throttledActivations(results: List[RunResult], message: String) = {
     val count = results.count { result =>
-      result.exitCode == TestUtils.THROTTLED && result.stderr.contains(message)
+      result.exitCode == TestUtils.THROTTLED && result.stderr.contains(prefix(message))
     }
     println(s"number of throttled activations: $count out of ${results.length}")
     count
@@ -190,7 +195,7 @@ class ThrottleTests
     val afterInvokes = Instant.now
 
     try {
-      val throttledCount = throttledActivations(results, tooManyRequests)
+      val throttledCount = throttledActivations(results, tooManyRequests(0, 0))
       throttledCount should be > 0
     } finally {
       val alreadyWaited = durationBetween(afterInvokes, Instant.now)
@@ -216,7 +221,7 @@ class ThrottleTests
     val afterFirings = Instant.now
 
     try {
-      val throttledCount = throttledActivations(results, tooManyRequests)
+      val throttledCount = throttledActivations(results, tooManyRequests(0, 0))
       throttledCount should be > 0
     } finally {
       // no need to wait for activations of triggers since they consume no resources
@@ -267,7 +272,7 @@ class ThrottleTests
 
     val combinedResults = slowResults ++ fastResults
     try {
-      val throttledCount = throttledActivations(combinedResults, tooManyConcurrentRequests)
+      val throttledCount = throttledActivations(combinedResults, tooManyConcurrentRequests(0, 0))
       throttledCount should be > 0
     } finally {
       val alreadyWaited = durationBetween(afterSlowInvokes, Instant.now)
@@ -287,7 +292,8 @@ class NamespaceSpecificThrottleTests
     with TestHelpers
     with WskTestHelpers
     with Matchers
-    with BeforeAndAfterAll {
+    with BeforeAndAfterAll
+    with LocalHelper {
 
   val wskadmin = new RunWskAdminCmd {}
   val wsk = new Wsk
@@ -340,8 +346,12 @@ class NamespaceSpecificThrottleTests
       trigger.create(triggerName)
     }
 
-    wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should include(tooManyRequests)
-    wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.THROTTLED).stderr should include(tooManyRequests)
+    wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should {
+      include(prefix(tooManyRequests(0, 0))) and include("allowed: 0")
+    }
+    wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.THROTTLED).stderr should {
+      include(prefix(tooManyRequests(0, 0))) and include("allowed: 0")
+    }
   }
 
   it should "respect overridden rate-throttles of 1" in withAssetCleaner(oneProps) { (wp, assetHelper) =>
@@ -356,24 +366,30 @@ class NamespaceSpecificThrottleTests
       trigger.create(triggerName)
     }
 
-    // One invoke should be allowed, the second one throttled
-    // Due to the current implementation of the rate throttling, it could be possible, that the counter gets deleted, because the minute switches
+    // One invoke should be allowed, the second one throttled.
+    // Due to the current implementation of the rate throttling,
+    // it is possible that the counter gets deleted, because the minute switches.
     retry({
       val results = (1 to 2).map { _ =>
         wsk.action.invoke(actionName, expectedExitCode = TestUtils.DONTCARE_EXIT)
       }
       results.map(_.exitCode) should contain(TestUtils.THROTTLED)
-      results.map(_.stderr).mkString should include(tooManyRequests)
+      results.map(_.stderr).mkString should {
+        include(prefix(tooManyRequests(0, 0))) and include("allowed: 1")
+      }
     }, 2, Some(1.second))
 
-    // One fire should be allowed, the second one throttled
-    // Due to the current implementation of the rate throttling, it could be possible, that the counter gets deleted, because the minute switches
+    // One fire should be allowed, the second one throttled.
+    // Due to the current implementation of the rate throttling,
+    // it is possible that the counter gets deleted, because the minute switches.
     retry({
       val results = (1 to 2).map { _ =>
         wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.DONTCARE_EXIT)
       }
       results.map(_.exitCode) should contain(TestUtils.THROTTLED)
-      results.map(_.stderr).mkString should include(tooManyRequests)
+      results.map(_.stderr).mkString should {
+        include(prefix(tooManyRequests(0, 0))) and include("allowed: 1")
+      }
     }, 2, Some(1.second))
   }
 
@@ -385,8 +401,8 @@ class NamespaceSpecificThrottleTests
       action.create(actionName, defaultAction)
     }
 
-    wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should include(
-      tooManyConcurrentRequests)
+    wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should {
+      include(prefix(tooManyConcurrentRequests(0, 0))) and include("allowed: 0")
+    }
   }
-
 }
diff --git a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
index 4d48b55..720e738 100644
--- a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
@@ -44,24 +44,24 @@ class RateThrottleTests extends FlatSpec with Matchers with StreamLogging {
   behavior of "Rate Throttle"
 
   it should "throttle when rate exceeds allowed threshold" in {
-    new RateThrottler("test", 0, _.limits.invocationsPerMinute).check(subject) shouldBe false
+    new RateThrottler("test", 0, _.limits.invocationsPerMinute).check(subject).ok shouldBe false
     val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute)
-    rt.check(subject) shouldBe true
-    rt.check(subject) shouldBe false
-    rt.check(subject) shouldBe false
+    rt.check(subject).ok shouldBe true
+    rt.check(subject).ok shouldBe false
+    rt.check(subject).ok shouldBe false
     Thread.sleep(1.minute.toMillis)
-    rt.check(subject) shouldBe true
+    rt.check(subject).ok shouldBe true
   }
 
   it should "check against an alternative limit if passed in" in {
     val withLimits = subject.copy(limits = UserLimits(invocationsPerMinute = Some(5)))
     val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute)
-    rt.check(withLimits) shouldBe true // 1
-    rt.check(withLimits) shouldBe true // 2
-    rt.check(withLimits) shouldBe true // 3
-    rt.check(withLimits) shouldBe true // 4
-    rt.check(withLimits) shouldBe true // 5
-    rt.check(withLimits) shouldBe false
+    rt.check(withLimits).ok shouldBe true // 1
+    rt.check(withLimits).ok shouldBe true // 2
+    rt.check(withLimits).ok shouldBe true // 3
+    rt.check(withLimits).ok shouldBe true // 4
+    rt.check(withLimits).ok shouldBe true // 5
+    rt.check(withLimits).ok shouldBe false
   }
 
 }
diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index f43ed4a..7a87882 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -1297,7 +1297,7 @@ trait WebActionsApiTests extends ControllerTestCommon with BeforeAndAfterEach wi
           failThrottleForSubject = Some(systemId)
           m(s"$testRoutePath/$path") ~> Route.seal(routes(creds)) ~> check {
             status should be(TooManyRequests)
-            confirmErrorWithTid(responseAs[JsObject], Some(Messages.tooManyRequests))
+            confirmErrorWithTid(responseAs[JsObject], Some(Messages.tooManyRequests(2, 1)))
           }
           failThrottleForSubject = None
         }
@@ -1492,7 +1492,7 @@ trait WebActionsApiTests extends ControllerTestCommon with BeforeAndAfterEach wi
 
       failThrottleForSubject match {
         case Some(subject) if subject == user.subject =>
-          Future.failed(RejectRequest(TooManyRequests, Messages.tooManyRequests))
+          Future.failed(RejectRequest(TooManyRequests, Messages.tooManyRequests(2, 1)))
         case _ => Future.successful({})
       }
     }

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].