You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2019/01/24 12:16:06 UTC

[incubator-openwhisk] branch master updated: Add limit to not store activations for a limitted namespace. (#4234)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new f83a438  Add limit to not store activations for a limitted namespace. (#4234)
f83a438 is described below

commit f83a438023c92c725cdd08825d9cb688bce95479
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Thu Jan 24 13:15:58 2019 +0100

    Add limit to not store activations for a limitted namespace. (#4234)
    
    Operators will be able to disable storing of activations into activations store.
    The flag is implemented like the per-namespace-limits. It can be set with wskadmin.
    
    This commit was the initial idea of #4078. In the meantime, the idea in the other PR changed, to implement a throttle instead of a switch. But as this is a completely new type of rate-limit (which does not only allow or deny requests) that's a bit bigger to implement. So I'll go with a staged approach here and implement it as switch first.
    If someone needs a throttle instead of completely switching off the writes to the DB, this can be brought up again and the throttle can be built upon the solution we already have.
---
 .../openwhisk/core/database/ActivationStore.scala  | 21 +++++++++++++++-
 .../apache/openwhisk/core/entity/Identity.scala    |  5 ++--
 .../openwhisk/core/controller/Triggers.scala       |  3 ++-
 .../core/controller/actions/PrimitiveActions.scala |  2 +-
 .../core/controller/actions/SequenceActions.scala  |  2 +-
 .../openwhisk/core/invoker/InvokerReactive.scala   |  2 +-
 tests/src/test/scala/limits/ThrottleTests.scala    | 28 ++++++++++++++++++++--
 .../openwhisk/core/admin/WskAdminTests.scala       | 15 ++++++++++++
 .../controller/test/ControllerTestCommon.scala     |  2 +-
 .../core/database/LimitsCommandTests.scala         |  5 +++-
 tools/admin/README-NEXT.md                         |  4 ++++
 tools/admin/README.md                              |  4 ++++
 .../openwhisk/core/database/LimitsCommand.scala    | 13 ++++++++--
 tools/admin/wskadmin                               | 13 ++++++++--
 14 files changed, 104 insertions(+), 15 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
index 174261a..e4f974a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
@@ -34,7 +34,26 @@ case class UserContext(user: Identity, request: HttpRequest = HttpRequest())
 trait ActivationStore {
 
   /**
-   * Stores an activation.
+   * Checks if an activation should be stored in database and stores it.
+   *
+   * @param activation activation to store
+   * @param context user and request context
+   * @param transid transaction ID for request
+   * @param notifier cache change notifier
+   * @return Future containing DocInfo related to stored activation
+   */
+  def storeAfterCheck(activation: WhiskActivation, context: UserContext)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+    if (context.user.limits.storeActivations.getOrElse(true)) {
+      store(activation, context)
+    } else {
+      Future.successful(DocInfo(activation.docid))
+    }
+  }
+
+  /**
+   * Stores an activation in the database.
    *
    * @param activation activation to store
    * @param context user and request context
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
index f1833aa..fcbb717 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
@@ -34,10 +34,11 @@ import scala.util.Try
 case class UserLimits(invocationsPerMinute: Option[Int] = None,
                       concurrentInvocations: Option[Int] = None,
                       firesPerMinute: Option[Int] = None,
-                      allowedKinds: Option[Set[String]] = None)
+                      allowedKinds: Option[Set[String]] = None,
+                      storeActivations: Option[Boolean] = None)
 
 object UserLimits extends DefaultJsonProtocol {
-  implicit val serdes = jsonFormat4(UserLimits.apply)
+  implicit val serdes = jsonFormat5(UserLimits.apply)
 }
 
 protected[core] case class Namespace(name: EntityName, uuid: UUID)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
index 7320567..53a46ba 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
@@ -168,8 +168,9 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
                     triggerActivation
                 }
                 .map { activation =>
-                  activationStore.store(activation, context)
+                  activationStore.storeAfterCheck(activation, context)
                 }
+
               respondWithActivationIdHeader(triggerActivationId) {
                 complete(Accepted, triggerActivationId.toJsObject)
               }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
index b508442..1363f76 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
@@ -569,7 +569,7 @@ protected[actions] trait PrimitiveActions {
       }
     }
 
-    activationStore.store(activation, context)(transid, notifier = None)
+    activationStore.storeAfterCheck(activation, context)(transid, notifier = None)
 
     activation
   }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
index ea7dfa2..a31c436 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
@@ -175,7 +175,7 @@ protected[actions] trait SequenceActions {
               case Failure(t)   => logging.warn(this, s"activation event was not sent: $t")
             }
           }
-          activationStore.store(seqActivation, context)(transid, notifier = None)
+          activationStore.storeAfterCheck(seqActivation, context)(transid, notifier = None)
 
         // This should never happen; in this case, there is no activation record created or stored:
         // should there be?
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index ab30510..5f682d7 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -174,7 +174,7 @@ class InvokerReactive(
   /** Stores an activation in the database. */
   private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
     implicit val transid: TransactionId = tid
-    activationStore.store(activation, context)(tid, notifier = None)
+    activationStore.storeAfterCheck(activation, context)(tid, notifier = None)
   }
 
   /** Creates a ContainerProxy Actor when being called. */
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala
index 1e020d7..ee67e3c 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -317,7 +317,7 @@ class NamespaceSpecificThrottleTests
   }
 
   sanitizeNamespaces(
-    Seq("zeroSubject", "zeroConcSubject", "oneSubject", "oneSequenceSubject"),
+    Seq("zeroSubject", "zeroConcSubject", "oneSubject", "oneSequenceSubject", "activationDisabled"),
     expectedExitCode = DONTCARE_EXIT)
 
   // Create a subject with rate limits == 0
@@ -346,8 +346,12 @@ class NamespaceSpecificThrottleTests
   val oneSequenceProps = getAdditionalTestSubject("oneSequenceSubject")
   wskadmin.cli(Seq("limits", "set", oneSequenceProps.namespace, "--invocationsPerMinute", "1", "--firesPerMinute", "1"))
 
+  // Create a subject where storing of activations in activationstore is disabled.
+  val activationDisabled = getAdditionalTestSubject("activationDisabled")
+  wskadmin.cli(Seq("limits", "set", activationDisabled.namespace, "--storeActivations", "false"))
+
   override def afterAll() = {
-    sanitizeNamespaces(Seq(zeroProps, zeroConcProps, oneProps, oneSequenceProps).map(_.namespace))
+    sanitizeNamespaces(Seq(zeroProps, zeroConcProps, oneProps, oneSequenceProps, activationDisabled).map(_.namespace))
   }
 
   behavior of "Namespace-specific throttles"
@@ -463,4 +467,24 @@ class NamespaceSpecificThrottleTests
       include(prefix(tooManyConcurrentRequests(0, 0))) and include("allowed: 0")
     }
   }
+
+  it should "not store an activation if disabled for this namespace" in withAssetCleaner(activationDisabled) {
+    (wp, assetHelper) =>
+      implicit val props = wp
+      val actionName = "activationDisabled"
+
+      assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
+        action.create(actionName, defaultAction)
+      }
+
+      val runResult = wsk.action.invoke(actionName)
+      val activationId = wsk.activation.extractActivationId(runResult)
+      withClue(s"did not find an activation id in '$runResult'") {
+        activationId shouldBe a[Some[_]]
+      }
+
+      val activation = wsk.activation.waitForActivation(activationId.get)
+
+      activation shouldBe 'Left
+  }
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/admin/WskAdminTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/admin/WskAdminTests.scala
index a87542a..d499d43 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/admin/WskAdminTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/admin/WskAdminTests.scala
@@ -229,6 +229,21 @@ class WskAdminTests extends TestHelpers with WskActorSystem with Matchers with B
       wskadmin.cli(Seq("limits", "delete", subject)).stdout should include("Limits deleted")
     }
   }
+
+  it should "disable saving of activations in ActivationsStore" in {
+    val subject = Subject().asString
+    try {
+      // set limit
+      wskadmin.cli(Seq("limits", "set", subject, "--storeActivations", "false"))
+      // check correctly set
+      val lines = wskadmin.cli(Seq("limits", "get", subject)).stdout.lines.toSeq
+      lines should have size 1
+      lines(0) shouldBe "storeActivations = False"
+    } finally {
+      wskadmin.cli(Seq("limits", "delete", subject)).stdout should include("Limits deleted")
+    }
+  }
+
   it should "adjust whitelist for namespace" in {
     val subject = Subject().asString
     try {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
index 0fb21aa..d623b3d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
@@ -105,7 +105,7 @@ protected trait ControllerTestCommon
 
   def storeActivation(activation: WhiskActivation, context: UserContext)(implicit transid: TransactionId,
                                                                          timeout: Duration = 10 seconds): DocInfo = {
-    val docFuture = activationStore.store(activation, context)
+    val docFuture = activationStore.storeAfterCheck(activation, context)
     val doc = Await.result(docFuture, timeout)
     assert(doc != null)
     doc
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/LimitsCommandTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/LimitsCommandTests.scala
index 1b0bcab..3b13915 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/LimitsCommandTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/LimitsCommandTests.scala
@@ -52,6 +52,8 @@ class LimitsCommandTests extends FlatSpec with WhiskAdminCliTestBase {
       "--allowedKinds",
       "nodejs:6",
       "blackbox",
+      "--storeActivations",
+      "false",
       ns) shouldBe CommandMessages.limitsSuccessfullySet(ns)
 
     val limits = limitsStore.get[LimitEntity](DocInfo(LimitsCommand.limitIdOf(EntityName(ns)))).futureValue
@@ -59,7 +61,8 @@ class LimitsCommandTests extends FlatSpec with WhiskAdminCliTestBase {
       invocationsPerMinute = Some(3),
       firesPerMinute = Some(7),
       concurrentInvocations = Some(11),
-      allowedKinds = Some(Set("nodejs:6", "blackbox")))
+      allowedKinds = Some(Set("nodejs:6", "blackbox")),
+      storeActivations = Some(false))
 
     resultOk("limits", "set", "--invocationsPerMinute", "13", ns) shouldBe CommandMessages.limitsSuccessfullyUpdated(ns)
 
diff --git a/tools/admin/README-NEXT.md b/tools/admin/README-NEXT.md
index ac877d4..1c971e0 100644
--- a/tools/admin/README-NEXT.md
+++ b/tools/admin/README-NEXT.md
@@ -128,6 +128,10 @@ Limits successfully set for "space1"
 # set limits on allowedKinds
 $ wskadmin-next limits set --allowedKinds nodejs:6 python space1
 Limits successfully set for "space1"
+
+# set limits to disable saving of activations in activationstore
+$ wskadmin-next limits set space1 --storeActivations false
+Limits successfully set for "space1"
 ```
 
 Note that limits apply to a namespace and will survive even if all users that share a namespace are deleted. You must manually delete them.
diff --git a/tools/admin/README.md b/tools/admin/README.md
index 19d15c0..6834afc 100644
--- a/tools/admin/README.md
+++ b/tools/admin/README.md
@@ -82,6 +82,10 @@ Limits successfully set for "space1"
 # set limits on allowedKinds
 $ wskadmin limits set space1 --allowedKinds nodejs:6 python
 Limits successfully set for "space1"
+
+# set limits to disable saving of activations in activationstore
+$ wskadmin limits set space1 --storeActivations false
+Limits successfully set for "space1"
 ```
 
 Note that limits apply to a namespace and will survive even if all users that share a namespace are deleted. You must manually delete them.
diff --git a/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala b/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala
index 0b15430..9c3ea32 100644
--- a/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala
+++ b/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala
@@ -80,6 +80,13 @@ class LimitsCommand extends Subcommand("limits") with WhiskCommand {
         name = "allowedKinds",
         noshort = true,
         default = None)
+    val storeActivations =
+      opt[String](
+        descr = "enable or disable storing of activations to datastore for this namespace",
+        argName = "STOREACTIVATIONS",
+        name = "storeActivations",
+        noshort = true,
+        default = None)
 
     lazy val limits: LimitEntity =
       new LimitEntity(
@@ -88,7 +95,8 @@ class LimitsCommand extends Subcommand("limits") with WhiskCommand {
           invocationsPerMinute.toOption,
           concurrentInvocations.toOption,
           firesPerMinute.toOption,
-          allowedKinds.toOption.map(_.toSet)))
+          allowedKinds.toOption.map(_.toSet),
+          storeActivations.toOption.map(_.toBoolean)))
   }
   addSubcommand(set)
 
@@ -147,7 +155,8 @@ class LimitsCommand extends Subcommand("limits") with WhiskCommand {
           l.concurrentInvocations.map(ci => s"concurrentInvocations =  $ci"),
           l.invocationsPerMinute.map(i => s"invocationsPerMinute = $i"),
           l.firesPerMinute.map(i => s"firesPerMinute = $i"),
-          l.allowedKinds.map(k => s"allowedKinds = ${k.mkString(", ")}")).flatten.mkString(Properties.lineSeparator)
+          l.allowedKinds.map(k => s"allowedKinds = ${k.mkString(", ")}"),
+          l.storeActivations.map(sa => s"storeActivations = $sa")).flatten.mkString(Properties.lineSeparator)
         Right(msg)
       }
       .recover {
diff --git a/tools/admin/wskadmin b/tools/admin/wskadmin
index 39342a7..d6e1cff 100755
--- a/tools/admin/wskadmin
+++ b/tools/admin/wskadmin
@@ -86,6 +86,14 @@ def main():
             exitCode = 1
     sys.exit(exitCode)
 
+def str_to_bool(value):
+    if value.lower() in ("yes", "true"):
+        return True
+    elif value.lower() in ("no", "false"):
+        return False
+    else:
+        raise argparse.ArgumentTypeError("%s is not a valid boolean." % value)
+
 def parseArgs():
     parser = argparse.ArgumentParser(description='OpenWhisk admin command line tool')
     parser.add_argument('-v', '--verbose', help='verbose output', action='store_true')
@@ -132,6 +140,7 @@ def parseArgs():
     subcmd.add_argument('--firesPerMinute', help='trigger fires per minute allowed', type=int)
     subcmd.add_argument('--concurrentInvocations', help='concurrent invocations allowed for this namespace', type=int)
     subcmd.add_argument('--allowedKinds', help='list of runtime kinds allowed in this namespace', nargs='+', type=str)
+    subcmd.add_argument('--storeActivations', help='enable or disable storing of activations to datastore for this namespace', default=None, type=str_to_bool)
 
     subcmd = subparser.add_parser('get', help='get limits for a given namespace (if none exist, system defaults apply)')
     subcmd.add_argument('namespace', help='the namespace to get limits for')
@@ -517,7 +526,7 @@ def setLimitsCmd(args, props):
     (dbDoc, res) = getDocumentFromDb(props, quote_plus(docId), args.verbose)
     doc = dbDoc or {'_id': docId}
 
-    limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds']
+    limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds', 'storeActivations']
     for limit in limits:
         givenLimit = argsDict.get(limit)
         toSet = givenLimit if givenLimit != None else doc.get(limit)
@@ -536,7 +545,7 @@ def getLimitsCmd(args, props):
     (dbDoc, res) = getDocumentFromDb(props, quote_plus(docId), args.verbose)
 
     if dbDoc is not None:
-        limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds']
+        limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds', 'storeActivations']
         for limit in limits:
             givenLimit = dbDoc.get(limit)
             if givenLimit != None: