You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by vv...@apache.org on 2018/09/24 13:17:23 UTC
[incubator-openwhisk] branch master updated: Adjust activation metric schema to reflect reality. (#4006)
This is an automated email from the ASF dual-hosted git repository.
vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new ebfd5b0 Adjust activation metric schema to reflect reality. (#4006)
ebfd5b0 is described below
commit ebfd5b0b8fbef7b87a93ac5f395649708af0bb6e
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Mon Sep 24 15:17:15 2018 +0200
Adjust activation metric schema to reflect reality. (#4006)
* Adjust activation metric schema to reflect reality.
1. The `causedBy` annotation is actually an optional String rather than a boolean flag.
2. The `name` field is actually intended to contain the fully-qualified name of the action rather than a composite of the invocation namespace and an action name (which doesn't really represent anything).
Co-authored-by: Christian Bickel <gi...@cbickel.de>
* Require certain fields for sending activation metrics.
* Fix import.
* Pull enabled check to the call-site
* Move constructing logic and test it.
* Set loglevel to error.
---
.../main/scala/whisk/core/connector/Message.scala | 26 ++++++-
.../main/scala/whisk/core/entity/Parameter.scala | 8 ++-
.../scala/whisk/core/controller/WebActions.scala | 8 +--
.../scala/whisk/core/invoker/InvokerReactive.scala | 32 ++++-----
docs/metrics.md | 2 +-
.../core/connector/test/EventMessageTests.scala | 80 ++++++++++++++++++++++
.../core/controller/test/ConductorsApiTests.scala | 8 ++-
7 files changed, 133 insertions(+), 31 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 55e5a64..ecc0bc6 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -147,7 +147,7 @@ case class Activation(name: String,
kind: String,
conductor: Boolean,
memory: Int,
- causedBy: Boolean)
+ causedBy: Option[String])
extends EventMessageBody {
val typeName = "Activation"
override def serialize = toJson.compactPrint
@@ -169,6 +169,30 @@ object Activation extends DefaultJsonProtocol {
"conductor",
"memory",
"causedBy")
+
+ /** Constructs an "Activation" event from a WhiskActivation */
+ def from(a: WhiskActivation): Try[Activation] = {
+ for {
+ // There are no sensible defaults for these fields, so they are required. They should always be there but there is
+ // no static analysis to prove that so we're defensive here.
+ fqn <- a.annotations.getAs[String](WhiskActivation.pathAnnotation)
+ kind <- a.annotations.getAs[String](WhiskActivation.kindAnnotation)
+ } yield {
+ Activation(
+ fqn,
+ a.response.statusCode,
+ a.duration.getOrElse(0),
+ a.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0),
+ a.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0),
+ kind,
+ a.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false),
+ a.annotations
+ .getAs[ActionLimits](WhiskActivation.limitsAnnotation)
+ .map(_.memory.megabytes)
+ .getOrElse(0),
+ a.annotations.getAs[String](WhiskActivation.causedByAnnotation).toOption)
+ }
+ }
}
case class Metric(metricName: String, metricValue: Long) extends EventMessageBody {
diff --git a/common/scala/src/main/scala/whisk/core/entity/Parameter.scala b/common/scala/src/main/scala/whisk/core/entity/Parameter.scala
index 14ebf51..0da679b 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Parameter.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Parameter.scala
@@ -17,9 +17,10 @@
package whisk.core.entity
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
import spray.json.DefaultJsonProtocol._
import spray.json._
+
import scala.language.postfixOps
import whisk.core.entity.size.SizeInt
import whisk.core.entity.size.SizeString
@@ -97,7 +98,10 @@ protected[core] class Parameters protected[entity] (private val params: Map[Para
protected[core] def get(p: String): Option[JsValue] = params.get(new ParameterName(p)).map(_.value)
/** Retrieves parameter by name if it exists. Returns that parameter if it is deserializable to {@code T} */
- protected[core] def getAs[T: JsonReader](p: String): Option[T] = get(p).flatMap(js => Try(js.convertTo[T]).toOption)
+ protected[core] def getAs[T: JsonReader](p: String): Try[T] =
+ get(p)
+ .fold[Try[JsValue]](Failure(new IllegalStateException(s"key '$p' does not exist")))(Success.apply)
+ .flatMap(js => Try(js.convertTo[T]))
/** Retrieves parameter by name if it exist. Returns true if parameter exists and has truthy value. */
protected[core] def isTruthy(p: String): Boolean = {
diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
index e03bdc8..978dc24 100644
--- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
@@ -498,7 +498,7 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
this,
"web action with require-whisk-auth was invoked without a matching x-require-whisk-auth header value")
terminate(Unauthorized)
- } else if (!action.annotations.getAs[Boolean]("web-custom-options").exists(identity)) {
+ } else if (!action.annotations.getAs[Boolean]("web-custom-options").getOrElse(false)) {
respondWithHeaders(defaultCorsResponse(context.headers)) {
if (context.method == OPTIONS) {
complete(OK, HttpEntity.Empty)
@@ -574,7 +574,7 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
processRequest(actionOwnerIdentity, action, extension, onBehalfOf, context.withBody(body), isRawHttpAction)
}
- provide(action.annotations.getAs[Boolean]("raw-http").exists(identity)) { isRawHttpAction =>
+ provide(action.annotations.getAs[Boolean]("raw-http").getOrElse(false)) { isRawHttpAction =>
httpEntity match {
case Empty =>
process(None, isRawHttpAction)
@@ -743,8 +743,8 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
implicit transid: TransactionId): Future[WhiskActionMetaData] = {
actionLookup flatMap { action =>
val requiresAuthenticatedUser =
- action.annotations.getAs[Boolean](WhiskAction.requireWhiskAuthAnnotation).exists(identity)
- val isExported = action.annotations.getAs[Boolean]("web-export").exists(identity)
+ action.annotations.getAs[Boolean](WhiskAction.requireWhiskAuthAnnotation).getOrElse(false)
+ val isExported = action.annotations.getAs[Boolean]("web-export").getOrElse(false)
if ((isExported && requiresAuthenticatedUser && authenticated) ||
(isExported && !requiresAuthenticatedUser)) {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 5f4fd8d..652fefc 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -25,7 +25,6 @@ import akka.event.Logging.InfoLevel
import akka.stream.ActorMaterializer
import org.apache.kafka.common.errors.RecordTooLargeException
import pureconfig._
-import spray.json.DefaultJsonProtocol._
import spray.json._
import whisk.common.tracing.WhiskTracerProvider
import whisk.common._
@@ -130,30 +129,23 @@ class InvokerReactive(
s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}")
}
}
- // Potentially sends activation metadata to kafka if user events are enabled
- UserEvents.send(
- producer, {
- val activation = Activation(
- activationResult.namespace + EntityPath.PATHSEP + activationResult.name,
- activationResult.response.statusCode,
- activationResult.duration.getOrElse(0),
- activationResult.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0),
- activationResult.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0),
- activationResult.annotations.getAs[String](WhiskActivation.kindAnnotation).getOrElse("unknown_kind"),
- activationResult.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false),
- activationResult.annotations
- .getAs[ActionLimits](WhiskActivation.limitsAnnotation)
- .map(al => al.memory.megabytes)
- .getOrElse(0),
- activationResult.annotations.getAs[Boolean](WhiskActivation.causedByAnnotation).getOrElse(false))
+
+ if (UserEvents.enabled) {
+ val event = Activation.from(activationResult).map { body =>
EventMessage(
s"invoker${instance.instance}",
- activation,
+ body,
activationResult.subject,
activationResult.namespace.toString,
userId,
- activation.typeName)
- })
+ body.typeName)
+ }
+
+ event match {
+ case Success(msg) => UserEvents.send(producer, msg)
+ case Failure(t) => logging.error(this, s"activation event was not sent: $t")
+ }
+ }
send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith {
case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
diff --git a/docs/metrics.md b/docs/metrics.md
index 0e96321..9cfd887 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -285,7 +285,7 @@ duration - actual time the action code was running
kind - action flavor, e.g. Node.js
conductor - true for conductor backed actions
memory - maximum memory allowed for action container
-causedBy - true for sequence actions
+causedBy - contains the "causedBy" annotation (can be "sequence" or nothing at the moment)
```
Metric is any user specific event produced by the system and it at this moment includes the following information:
```
diff --git a/tests/src/test/scala/whisk/core/connector/test/EventMessageTests.scala b/tests/src/test/scala/whisk/core/connector/test/EventMessageTests.scala
new file mode 100644
index 0000000..a9fe2a9
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/connector/test/EventMessageTests.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector.test
+
+import java.time.Instant
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import whisk.core.connector.Activation
+import whisk.core.entity._
+import whisk.core.entity.size._
+
+import scala.concurrent.duration._
+import scala.util.Success
+
+/**
+ * Unit tests for the EventMessage objects.
+ */
+@RunWith(classOf[JUnitRunner])
+class EventMessageTests extends FlatSpec with Matchers {
+
+ behavior of "Activation"
+
+ val fullActivation = WhiskActivation(
+ namespace = EntityPath("ns"),
+ name = EntityName("a"),
+ Subject(),
+ activationId = ActivationId.generate(),
+ start = Instant.now(),
+ end = Instant.now(),
+ response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
+ annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson) ++
+ Parameters(WhiskActivation.waitTimeAnnotation, 5.toJson) ++
+ Parameters(WhiskActivation.initTimeAnnotation, 10.toJson) ++
+ Parameters(WhiskActivation.kindAnnotation, "testkind") ++
+ Parameters(WhiskActivation.pathAnnotation, "ns2/a") ++
+ Parameters(WhiskActivation.causedByAnnotation, "sequence"),
+ duration = Some(123))
+
+ it should "transform an activation into an event body" in {
+ Activation.from(fullActivation) shouldBe Success(
+ Activation("ns2/a", 0, 123, 5, 10, "testkind", false, 128, Some("sequence")))
+ }
+
+ it should "fail transformation if needed annotations are missing" in {
+ Activation.from(fullActivation.copy(annotations = Parameters())) shouldBe 'failure
+ Activation.from(fullActivation.copy(annotations = fullActivation.annotations - WhiskActivation.kindAnnotation)) shouldBe 'failure
+ Activation.from(fullActivation.copy(annotations = fullActivation.annotations - WhiskActivation.pathAnnotation)) shouldBe 'failure
+ }
+
+ it should "provide sensible defaults for optional annotations" in {
+ val a =
+ fullActivation
+ .copy(
+ duration = None,
+ annotations = Parameters(WhiskActivation.kindAnnotation, "testkind") ++ Parameters(
+ WhiskActivation.pathAnnotation,
+ "ns2/a"))
+
+ Activation.from(a) shouldBe Success(Activation("ns2/a", 0, 0, 0, 0, "testkind", false, 0, None))
+ }
+}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ConductorsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ConductorsApiTests.scala
index d8ac1ee..0f715b8 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ConductorsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ConductorsApiTests.scala
@@ -37,6 +37,8 @@ import whisk.core.controller.WhiskActionsApi
import whisk.core.entity._
import whisk.http.Messages._
+import scala.util.Success
+
/**
* Tests Conductor Actions API.
*
@@ -84,9 +86,9 @@ class ConductorsApiTests extends ControllerTestCommon with WhiskActionsApi {
response.fields("response").asJsObject.fields("result") shouldBe JsObject("payload" -> testString.toJson)
response.fields("duration") shouldBe duration.toJson
val annotations = response.fields("annotations").convertTo[Parameters]
- annotations.getAs[Boolean]("conductor") shouldBe Some(true)
- annotations.getAs[String]("kind") shouldBe Some("sequence")
- annotations.getAs[Boolean]("topmost") shouldBe Some(true)
+ annotations.getAs[Boolean]("conductor") shouldBe Success(true)
+ annotations.getAs[String]("kind") shouldBe Success("sequence")
+ annotations.getAs[Boolean]("topmost") shouldBe Success(true)
annotations.get("limits") should not be None
response.fields("logs").convertTo[JsArray].elements.size shouldBe 1
}