You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2019/01/24 12:16:06 UTC
[incubator-openwhisk] branch master updated: Add limit to not store
activations for a limitted namespace. (#4234)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new f83a438 Add limit to not store activations for a limitted namespace. (#4234)
f83a438 is described below
commit f83a438023c92c725cdd08825d9cb688bce95479
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Thu Jan 24 13:15:58 2019 +0100
Add limit to not store activations for a limitted namespace. (#4234)
Operators will be able to disable storing of activations into activations store.
The flag is implemented like the per-namespace-limits. It can be set with wskadmin.
This commit was the initial idea of #4078. In the meantime, the idea in the other PR changed, to implement a throttle instead of a switch. But as this is a completely new type of rate-limit (which does not only allow or deny requests) that's a bit bigger to implement. So I'll go with a staged approach here and implement it as switch first.
If someone needs a throttle instead of completely switching off the writes to the DB, this can be brought up again and the throttle can be built upon the solution we already have.
---
.../openwhisk/core/database/ActivationStore.scala | 21 +++++++++++++++-
.../apache/openwhisk/core/entity/Identity.scala | 5 ++--
.../openwhisk/core/controller/Triggers.scala | 3 ++-
.../core/controller/actions/PrimitiveActions.scala | 2 +-
.../core/controller/actions/SequenceActions.scala | 2 +-
.../openwhisk/core/invoker/InvokerReactive.scala | 2 +-
tests/src/test/scala/limits/ThrottleTests.scala | 28 ++++++++++++++++++++--
.../openwhisk/core/admin/WskAdminTests.scala | 15 ++++++++++++
.../controller/test/ControllerTestCommon.scala | 2 +-
.../core/database/LimitsCommandTests.scala | 5 +++-
tools/admin/README-NEXT.md | 4 ++++
tools/admin/README.md | 4 ++++
.../openwhisk/core/database/LimitsCommand.scala | 13 ++++++++--
tools/admin/wskadmin | 13 ++++++++--
14 files changed, 104 insertions(+), 15 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
index 174261a..e4f974a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
@@ -34,7 +34,26 @@ case class UserContext(user: Identity, request: HttpRequest = HttpRequest())
trait ActivationStore {
/**
- * Stores an activation.
+ * Checks if an activation should be stored in database and stores it.
+ *
+ * @param activation activation to store
+ * @param context user and request context
+ * @param transid transaction ID for request
+ * @param notifier cache change notifier
+ * @return Future containing DocInfo related to stored activation
+ */
+ def storeAfterCheck(activation: WhiskActivation, context: UserContext)(
+ implicit transid: TransactionId,
+ notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+ if (context.user.limits.storeActivations.getOrElse(true)) {
+ store(activation, context)
+ } else {
+ Future.successful(DocInfo(activation.docid))
+ }
+ }
+
+ /**
+ * Stores an activation in the database.
*
* @param activation activation to store
* @param context user and request context
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
index f1833aa..fcbb717 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
@@ -34,10 +34,11 @@ import scala.util.Try
case class UserLimits(invocationsPerMinute: Option[Int] = None,
concurrentInvocations: Option[Int] = None,
firesPerMinute: Option[Int] = None,
- allowedKinds: Option[Set[String]] = None)
+ allowedKinds: Option[Set[String]] = None,
+ storeActivations: Option[Boolean] = None)
object UserLimits extends DefaultJsonProtocol {
- implicit val serdes = jsonFormat4(UserLimits.apply)
+ implicit val serdes = jsonFormat5(UserLimits.apply)
}
protected[core] case class Namespace(name: EntityName, uuid: UUID)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
index 7320567..53a46ba 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
@@ -168,8 +168,9 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
triggerActivation
}
.map { activation =>
- activationStore.store(activation, context)
+ activationStore.storeAfterCheck(activation, context)
}
+
respondWithActivationIdHeader(triggerActivationId) {
complete(Accepted, triggerActivationId.toJsObject)
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
index b508442..1363f76 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
@@ -569,7 +569,7 @@ protected[actions] trait PrimitiveActions {
}
}
- activationStore.store(activation, context)(transid, notifier = None)
+ activationStore.storeAfterCheck(activation, context)(transid, notifier = None)
activation
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
index ea7dfa2..a31c436 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
@@ -175,7 +175,7 @@ protected[actions] trait SequenceActions {
case Failure(t) => logging.warn(this, s"activation event was not sent: $t")
}
}
- activationStore.store(seqActivation, context)(transid, notifier = None)
+ activationStore.storeAfterCheck(seqActivation, context)(transid, notifier = None)
// This should never happen; in this case, there is no activation record created or stored:
// should there be?
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index ab30510..5f682d7 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -174,7 +174,7 @@ class InvokerReactive(
/** Stores an activation in the database. */
private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
implicit val transid: TransactionId = tid
- activationStore.store(activation, context)(tid, notifier = None)
+ activationStore.storeAfterCheck(activation, context)(tid, notifier = None)
}
/** Creates a ContainerProxy Actor when being called. */
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala
index 1e020d7..ee67e3c 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -317,7 +317,7 @@ class NamespaceSpecificThrottleTests
}
sanitizeNamespaces(
- Seq("zeroSubject", "zeroConcSubject", "oneSubject", "oneSequenceSubject"),
+ Seq("zeroSubject", "zeroConcSubject", "oneSubject", "oneSequenceSubject", "activationDisabled"),
expectedExitCode = DONTCARE_EXIT)
// Create a subject with rate limits == 0
@@ -346,8 +346,12 @@ class NamespaceSpecificThrottleTests
val oneSequenceProps = getAdditionalTestSubject("oneSequenceSubject")
wskadmin.cli(Seq("limits", "set", oneSequenceProps.namespace, "--invocationsPerMinute", "1", "--firesPerMinute", "1"))
+ // Create a subject where storing of activations in activationstore is disabled.
+ val activationDisabled = getAdditionalTestSubject("activationDisabled")
+ wskadmin.cli(Seq("limits", "set", activationDisabled.namespace, "--storeActivations", "false"))
+
override def afterAll() = {
- sanitizeNamespaces(Seq(zeroProps, zeroConcProps, oneProps, oneSequenceProps).map(_.namespace))
+ sanitizeNamespaces(Seq(zeroProps, zeroConcProps, oneProps, oneSequenceProps, activationDisabled).map(_.namespace))
}
behavior of "Namespace-specific throttles"
@@ -463,4 +467,24 @@ class NamespaceSpecificThrottleTests
include(prefix(tooManyConcurrentRequests(0, 0))) and include("allowed: 0")
}
}
+
+ it should "not store an activation if disabled for this namespace" in withAssetCleaner(activationDisabled) {
+ (wp, assetHelper) =>
+ implicit val props = wp
+ val actionName = "activationDisabled"
+
+ assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
+ action.create(actionName, defaultAction)
+ }
+
+ val runResult = wsk.action.invoke(actionName)
+ val activationId = wsk.activation.extractActivationId(runResult)
+ withClue(s"did not find an activation id in '$runResult'") {
+ activationId shouldBe a[Some[_]]
+ }
+
+ val activation = wsk.activation.waitForActivation(activationId.get)
+
+ activation shouldBe 'Left
+ }
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/admin/WskAdminTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/admin/WskAdminTests.scala
index a87542a..d499d43 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/admin/WskAdminTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/admin/WskAdminTests.scala
@@ -229,6 +229,21 @@ class WskAdminTests extends TestHelpers with WskActorSystem with Matchers with B
wskadmin.cli(Seq("limits", "delete", subject)).stdout should include("Limits deleted")
}
}
+
+ it should "disable saving of activations in ActivationsStore" in {
+ val subject = Subject().asString
+ try {
+ // set limit
+ wskadmin.cli(Seq("limits", "set", subject, "--storeActivations", "false"))
+ // check correctly set
+ val lines = wskadmin.cli(Seq("limits", "get", subject)).stdout.lines.toSeq
+ lines should have size 1
+ lines(0) shouldBe "storeActivations = False"
+ } finally {
+ wskadmin.cli(Seq("limits", "delete", subject)).stdout should include("Limits deleted")
+ }
+ }
+
it should "adjust whitelist for namespace" in {
val subject = Subject().asString
try {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
index 0fb21aa..d623b3d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
@@ -105,7 +105,7 @@ protected trait ControllerTestCommon
def storeActivation(activation: WhiskActivation, context: UserContext)(implicit transid: TransactionId,
timeout: Duration = 10 seconds): DocInfo = {
- val docFuture = activationStore.store(activation, context)
+ val docFuture = activationStore.storeAfterCheck(activation, context)
val doc = Await.result(docFuture, timeout)
assert(doc != null)
doc
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/LimitsCommandTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/LimitsCommandTests.scala
index 1b0bcab..3b13915 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/LimitsCommandTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/LimitsCommandTests.scala
@@ -52,6 +52,8 @@ class LimitsCommandTests extends FlatSpec with WhiskAdminCliTestBase {
"--allowedKinds",
"nodejs:6",
"blackbox",
+ "--storeActivations",
+ "false",
ns) shouldBe CommandMessages.limitsSuccessfullySet(ns)
val limits = limitsStore.get[LimitEntity](DocInfo(LimitsCommand.limitIdOf(EntityName(ns)))).futureValue
@@ -59,7 +61,8 @@ class LimitsCommandTests extends FlatSpec with WhiskAdminCliTestBase {
invocationsPerMinute = Some(3),
firesPerMinute = Some(7),
concurrentInvocations = Some(11),
- allowedKinds = Some(Set("nodejs:6", "blackbox")))
+ allowedKinds = Some(Set("nodejs:6", "blackbox")),
+ storeActivations = Some(false))
resultOk("limits", "set", "--invocationsPerMinute", "13", ns) shouldBe CommandMessages.limitsSuccessfullyUpdated(ns)
diff --git a/tools/admin/README-NEXT.md b/tools/admin/README-NEXT.md
index ac877d4..1c971e0 100644
--- a/tools/admin/README-NEXT.md
+++ b/tools/admin/README-NEXT.md
@@ -128,6 +128,10 @@ Limits successfully set for "space1"
# set limits on allowedKinds
$ wskadmin-next limits set --allowedKinds nodejs:6 python space1
Limits successfully set for "space1"
+
+# set limits to disable saving of activations in activationstore
+$ wskadmin-next limits set space1 --storeActivations false
+Limits successfully set for "space1"
```
Note that limits apply to a namespace and will survive even if all users that share a namespace are deleted. You must manually delete them.
diff --git a/tools/admin/README.md b/tools/admin/README.md
index 19d15c0..6834afc 100644
--- a/tools/admin/README.md
+++ b/tools/admin/README.md
@@ -82,6 +82,10 @@ Limits successfully set for "space1"
# set limits on allowedKinds
$ wskadmin limits set space1 --allowedKinds nodejs:6 python
Limits successfully set for "space1"
+
+# set limits to disable saving of activations in activationstore
+$ wskadmin limits set space1 --storeActivations false
+Limits successfully set for "space1"
```
Note that limits apply to a namespace and will survive even if all users that share a namespace are deleted. You must manually delete them.
diff --git a/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala b/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala
index 0b15430..9c3ea32 100644
--- a/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala
+++ b/tools/admin/src/main/scala/org/apache/openwhisk/core/database/LimitsCommand.scala
@@ -80,6 +80,13 @@ class LimitsCommand extends Subcommand("limits") with WhiskCommand {
name = "allowedKinds",
noshort = true,
default = None)
+ val storeActivations =
+ opt[String](
+ descr = "enable or disable storing of activations to datastore for this namespace",
+ argName = "STOREACTIVATIONS",
+ name = "storeActivations",
+ noshort = true,
+ default = None)
lazy val limits: LimitEntity =
new LimitEntity(
@@ -88,7 +95,8 @@ class LimitsCommand extends Subcommand("limits") with WhiskCommand {
invocationsPerMinute.toOption,
concurrentInvocations.toOption,
firesPerMinute.toOption,
- allowedKinds.toOption.map(_.toSet)))
+ allowedKinds.toOption.map(_.toSet),
+ storeActivations.toOption.map(_.toBoolean)))
}
addSubcommand(set)
@@ -147,7 +155,8 @@ class LimitsCommand extends Subcommand("limits") with WhiskCommand {
l.concurrentInvocations.map(ci => s"concurrentInvocations = $ci"),
l.invocationsPerMinute.map(i => s"invocationsPerMinute = $i"),
l.firesPerMinute.map(i => s"firesPerMinute = $i"),
- l.allowedKinds.map(k => s"allowedKinds = ${k.mkString(", ")}")).flatten.mkString(Properties.lineSeparator)
+ l.allowedKinds.map(k => s"allowedKinds = ${k.mkString(", ")}"),
+ l.storeActivations.map(sa => s"storeActivations = $sa")).flatten.mkString(Properties.lineSeparator)
Right(msg)
}
.recover {
diff --git a/tools/admin/wskadmin b/tools/admin/wskadmin
index 39342a7..d6e1cff 100755
--- a/tools/admin/wskadmin
+++ b/tools/admin/wskadmin
@@ -86,6 +86,14 @@ def main():
exitCode = 1
sys.exit(exitCode)
+def str_to_bool(value):
+ if value.lower() in ("yes", "true"):
+ return True
+ elif value.lower() in ("no", "false"):
+ return False
+ else:
+ raise argparse.ArgumentTypeError("%s is not a valid boolean." % value)
+
def parseArgs():
parser = argparse.ArgumentParser(description='OpenWhisk admin command line tool')
parser.add_argument('-v', '--verbose', help='verbose output', action='store_true')
@@ -132,6 +140,7 @@ def parseArgs():
subcmd.add_argument('--firesPerMinute', help='trigger fires per minute allowed', type=int)
subcmd.add_argument('--concurrentInvocations', help='concurrent invocations allowed for this namespace', type=int)
subcmd.add_argument('--allowedKinds', help='list of runtime kinds allowed in this namespace', nargs='+', type=str)
+ subcmd.add_argument('--storeActivations', help='enable or disable storing of activations to datastore for this namespace', default=None, type=str_to_bool)
subcmd = subparser.add_parser('get', help='get limits for a given namespace (if none exist, system defaults apply)')
subcmd.add_argument('namespace', help='the namespace to get limits for')
@@ -517,7 +526,7 @@ def setLimitsCmd(args, props):
(dbDoc, res) = getDocumentFromDb(props, quote_plus(docId), args.verbose)
doc = dbDoc or {'_id': docId}
- limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds']
+ limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds', 'storeActivations']
for limit in limits:
givenLimit = argsDict.get(limit)
toSet = givenLimit if givenLimit != None else doc.get(limit)
@@ -536,7 +545,7 @@ def getLimitsCmd(args, props):
(dbDoc, res) = getDocumentFromDb(props, quote_plus(docId), args.verbose)
if dbDoc is not None:
- limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds']
+ limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds', 'storeActivations']
for limit in limits:
givenLimit = dbDoc.get(limit)
if givenLimit != None: