You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by vv...@apache.org on 2018/09/24 13:17:23 UTC

[incubator-openwhisk] branch master updated: Adjust activation metric schema to reflect reality. (#4006)

This is an automated email from the ASF dual-hosted git repository.

vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ebfd5b0  Adjust activation metric schema to reflect reality. (#4006)
ebfd5b0 is described below

commit ebfd5b0b8fbef7b87a93ac5f395649708af0bb6e
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Mon Sep 24 15:17:15 2018 +0200

    Adjust activation metric schema to reflect reality. (#4006)
    
    * Adjust activation metric schema to reflect reality.
    
    1. The `causedBy` annotation is actually an optional String vs. a boolean flag.
    2. The `name` field is actually intended to contain the fully-qualified name of the action vs. a composite of the invocation namespace and an action name (which doesn't really represent anything).
    
    Co-authored-by: Christian Bickel <gi...@cbickel.de>
    
    * Require certain fields for sending activation metrics.
    
    * Fix import.
    
    * Pull enabled check to the call-site
    
    * Move constructing logic and test it.
    
    * Set loglevel to error.
---
 .../main/scala/whisk/core/connector/Message.scala  | 26 ++++++-
 .../main/scala/whisk/core/entity/Parameter.scala   |  8 ++-
 .../scala/whisk/core/controller/WebActions.scala   |  8 +--
 .../scala/whisk/core/invoker/InvokerReactive.scala | 32 ++++-----
 docs/metrics.md                                    |  2 +-
 .../core/connector/test/EventMessageTests.scala    | 80 ++++++++++++++++++++++
 .../core/controller/test/ConductorsApiTests.scala  |  8 ++-
 7 files changed, 133 insertions(+), 31 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 55e5a64..ecc0bc6 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -147,7 +147,7 @@ case class Activation(name: String,
                       kind: String,
                       conductor: Boolean,
                       memory: Int,
-                      causedBy: Boolean)
+                      causedBy: Option[String])
     extends EventMessageBody {
   val typeName = "Activation"
   override def serialize = toJson.compactPrint
@@ -169,6 +169,30 @@ object Activation extends DefaultJsonProtocol {
       "conductor",
       "memory",
       "causedBy")
+
+  /** Constructs an "Activation" event from a WhiskActivation */
+  def from(a: WhiskActivation): Try[Activation] = {
+    for {
+      // There are no sensible defaults for these fields, so they are required. They should always be there but there is
+      // no static analysis to prove that, so we're defensive here.
+      fqn <- a.annotations.getAs[String](WhiskActivation.pathAnnotation)
+      kind <- a.annotations.getAs[String](WhiskActivation.kindAnnotation)
+    } yield {
+      Activation(
+        fqn,
+        a.response.statusCode,
+        a.duration.getOrElse(0),
+        a.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0),
+        a.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0),
+        kind,
+        a.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false),
+        a.annotations
+          .getAs[ActionLimits](WhiskActivation.limitsAnnotation)
+          .map(_.memory.megabytes)
+          .getOrElse(0),
+        a.annotations.getAs[String](WhiskActivation.causedByAnnotation).toOption)
+    }
+  }
 }
 
 case class Metric(metricName: String, metricValue: Long) extends EventMessageBody {
diff --git a/common/scala/src/main/scala/whisk/core/entity/Parameter.scala b/common/scala/src/main/scala/whisk/core/entity/Parameter.scala
index 14ebf51..0da679b 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Parameter.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Parameter.scala
@@ -17,9 +17,10 @@
 
 package whisk.core.entity
 
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 import spray.json.DefaultJsonProtocol._
 import spray.json._
+
 import scala.language.postfixOps
 import whisk.core.entity.size.SizeInt
 import whisk.core.entity.size.SizeString
@@ -97,7 +98,10 @@ protected[core] class Parameters protected[entity] (private val params: Map[Para
   protected[core] def get(p: String): Option[JsValue] = params.get(new ParameterName(p)).map(_.value)
 
   /** Retrieves parameter by name if it exists. Returns that parameter if it is deserializable to {@code T} */
-  protected[core] def getAs[T: JsonReader](p: String): Option[T] = get(p).flatMap(js => Try(js.convertTo[T]).toOption)
+  protected[core] def getAs[T: JsonReader](p: String): Try[T] =
+    get(p)
+      .fold[Try[JsValue]](Failure(new IllegalStateException(s"key '$p' does not exist")))(Success.apply)
+      .flatMap(js => Try(js.convertTo[T]))
 
   /** Retrieves parameter by name if it exist. Returns true if parameter exists and has truthy value. */
   protected[core] def isTruthy(p: String): Boolean = {
diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
index e03bdc8..978dc24 100644
--- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
@@ -498,7 +498,7 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
                         this,
                         "web action with require-whisk-auth was invoked without a matching x-require-whisk-auth header value")
                       terminate(Unauthorized)
-                    } else if (!action.annotations.getAs[Boolean]("web-custom-options").exists(identity)) {
+                    } else if (!action.annotations.getAs[Boolean]("web-custom-options").getOrElse(false)) {
                       respondWithHeaders(defaultCorsResponse(context.headers)) {
                         if (context.method == OPTIONS) {
                           complete(OK, HttpEntity.Empty)
@@ -574,7 +574,7 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
       processRequest(actionOwnerIdentity, action, extension, onBehalfOf, context.withBody(body), isRawHttpAction)
     }
 
-    provide(action.annotations.getAs[Boolean]("raw-http").exists(identity)) { isRawHttpAction =>
+    provide(action.annotations.getAs[Boolean]("raw-http").getOrElse(false)) { isRawHttpAction =>
       httpEntity match {
         case Empty =>
           process(None, isRawHttpAction)
@@ -743,8 +743,8 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
     implicit transid: TransactionId): Future[WhiskActionMetaData] = {
     actionLookup flatMap { action =>
       val requiresAuthenticatedUser =
-        action.annotations.getAs[Boolean](WhiskAction.requireWhiskAuthAnnotation).exists(identity)
-      val isExported = action.annotations.getAs[Boolean]("web-export").exists(identity)
+        action.annotations.getAs[Boolean](WhiskAction.requireWhiskAuthAnnotation).getOrElse(false)
+      val isExported = action.annotations.getAs[Boolean]("web-export").getOrElse(false)
 
       if ((isExported && requiresAuthenticatedUser && authenticated) ||
           (isExported && !requiresAuthenticatedUser)) {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 5f4fd8d..652fefc 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -25,7 +25,6 @@ import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
 import pureconfig._
-import spray.json.DefaultJsonProtocol._
 import spray.json._
 import whisk.common.tracing.WhiskTracerProvider
 import whisk.common._
@@ -130,30 +129,23 @@ class InvokerReactive(
             s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}")
       }
     }
-    // Potentially sends activation metadata to kafka if user events are enabled
-    UserEvents.send(
-      producer, {
-        val activation = Activation(
-          activationResult.namespace + EntityPath.PATHSEP + activationResult.name,
-          activationResult.response.statusCode,
-          activationResult.duration.getOrElse(0),
-          activationResult.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0),
-          activationResult.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0),
-          activationResult.annotations.getAs[String](WhiskActivation.kindAnnotation).getOrElse("unknown_kind"),
-          activationResult.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false),
-          activationResult.annotations
-            .getAs[ActionLimits](WhiskActivation.limitsAnnotation)
-            .map(al => al.memory.megabytes)
-            .getOrElse(0),
-          activationResult.annotations.getAs[Boolean](WhiskActivation.causedByAnnotation).getOrElse(false))
+
+    if (UserEvents.enabled) {
+      val event = Activation.from(activationResult).map { body =>
         EventMessage(
           s"invoker${instance.instance}",
-          activation,
+          body,
           activationResult.subject,
           activationResult.namespace.toString,
           userId,
-          activation.typeName)
-      })
+          body.typeName)
+      }
+
+      event match {
+        case Success(msg) => UserEvents.send(producer, msg)
+        case Failure(t)   => logging.error(this, s"activation event was not sent: $t")
+      }
+    }
 
     send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith {
       case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
diff --git a/docs/metrics.md b/docs/metrics.md
index 0e96321..9cfd887 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -285,7 +285,7 @@ duration - actual time the action code was running
 kind - action flavor, e.g. Node.js
 conductor - true for conductor backed actions
 memory - maximum memory allowed for action container
-causedBy - true for sequence actions
+causedBy - value of the "causedBy" annotation, if present (currently either "sequence" or absent)
 ```
 Metric is any user specific event produced by the system and it at this moment includes the following information:
 ```
diff --git a/tests/src/test/scala/whisk/core/connector/test/EventMessageTests.scala b/tests/src/test/scala/whisk/core/connector/test/EventMessageTests.scala
new file mode 100644
index 0000000..a9fe2a9
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/connector/test/EventMessageTests.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector.test
+
+import java.time.Instant
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import whisk.core.connector.Activation
+import whisk.core.entity._
+import whisk.core.entity.size._
+
+import scala.concurrent.duration._
+import scala.util.Success
+
+/**
+ * Unit tests for the EventMessage objects.
+ */
+@RunWith(classOf[JUnitRunner])
+class EventMessageTests extends FlatSpec with Matchers {
+
+  behavior of "Activation"
+
+  val fullActivation = WhiskActivation(
+    namespace = EntityPath("ns"),
+    name = EntityName("a"),
+    Subject(),
+    activationId = ActivationId.generate(),
+    start = Instant.now(),
+    end = Instant.now(),
+    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
+    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson) ++
+      Parameters(WhiskActivation.waitTimeAnnotation, 5.toJson) ++
+      Parameters(WhiskActivation.initTimeAnnotation, 10.toJson) ++
+      Parameters(WhiskActivation.kindAnnotation, "testkind") ++
+      Parameters(WhiskActivation.pathAnnotation, "ns2/a") ++
+      Parameters(WhiskActivation.causedByAnnotation, "sequence"),
+    duration = Some(123))
+
+  it should "transform an activation into an event body" in {
+    Activation.from(fullActivation) shouldBe Success(
+      Activation("ns2/a", 0, 123, 5, 10, "testkind", false, 128, Some("sequence")))
+  }
+
+  it should "fail transformation if needed annotations are missing" in {
+    Activation.from(fullActivation.copy(annotations = Parameters())) shouldBe 'failure
+    Activation.from(fullActivation.copy(annotations = fullActivation.annotations - WhiskActivation.kindAnnotation)) shouldBe 'failure
+    Activation.from(fullActivation.copy(annotations = fullActivation.annotations - WhiskActivation.pathAnnotation)) shouldBe 'failure
+  }
+
+  it should "provide sensible defaults for optional annotations" in {
+    val a =
+      fullActivation
+        .copy(
+          duration = None,
+          annotations = Parameters(WhiskActivation.kindAnnotation, "testkind") ++ Parameters(
+            WhiskActivation.pathAnnotation,
+            "ns2/a"))
+
+    Activation.from(a) shouldBe Success(Activation("ns2/a", 0, 0, 0, 0, "testkind", false, 0, None))
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ConductorsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ConductorsApiTests.scala
index d8ac1ee..0f715b8 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ConductorsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ConductorsApiTests.scala
@@ -37,6 +37,8 @@ import whisk.core.controller.WhiskActionsApi
 import whisk.core.entity._
 import whisk.http.Messages._
 
+import scala.util.Success
+
 /**
  * Tests Conductor Actions API.
  *
@@ -84,9 +86,9 @@ class ConductorsApiTests extends ControllerTestCommon with WhiskActionsApi {
       response.fields("response").asJsObject.fields("result") shouldBe JsObject("payload" -> testString.toJson)
       response.fields("duration") shouldBe duration.toJson
       val annotations = response.fields("annotations").convertTo[Parameters]
-      annotations.getAs[Boolean]("conductor") shouldBe Some(true)
-      annotations.getAs[String]("kind") shouldBe Some("sequence")
-      annotations.getAs[Boolean]("topmost") shouldBe Some(true)
+      annotations.getAs[Boolean]("conductor") shouldBe Success(true)
+      annotations.getAs[String]("kind") shouldBe Success("sequence")
+      annotations.getAs[Boolean]("topmost") shouldBe Success(true)
       annotations.get("limits") should not be None
       response.fields("logs").convertTo[JsArray].elements.size shouldBe 1
     }