You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by vv...@apache.org on 2018/09/26 07:01:39 UTC

[incubator-openwhisk] branch master updated: Send activations of sequences to events topic. (#4019)

This is an automated email from the ASF dual-hosted git repository.

vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ce6c656  Send activations of sequences to events topic. (#4019)
ce6c656 is described below

commit ce6c656d5a66a8c4ade8c5817df5f82d1d7915a9
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Wed Sep 26 09:01:33 2018 +0200

    Send activations of sequences to events topic. (#4019)
    
    * Send Sequence actions to UserMetrics.
    
    Co-authored-by: Markus Thömmes <ma...@me.com>
    
    * Send activations of cunductor-actions to user-metric-topics.
    
    * Adapt test, to allow activations from controller.
    
    * Add  to source of usermetric.
    
    * Review.
---
 .../main/scala/whisk/core/connector/Message.scala  |  6 ++++
 .../core/controller/actions/PrimitiveActions.scala | 21 +++++++++---
 .../core/controller/actions/SequenceActions.scala  | 38 ++++++++++++++--------
 .../scala/whisk/core/invoker/InvokerReactive.scala | 12 +------
 .../test/scala/whisk/common/UserEventTests.scala   |  2 +-
 5 files changed, 48 insertions(+), 31 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index ecc0bc6..ed677b1 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -221,5 +221,11 @@ object EventMessage extends DefaultJsonProtocol {
   implicit val format =
     jsonFormat(EventMessage.apply _, "source", "body", "subject", "namespace", "userId", "eventType", "timestamp")
 
+  def from(a: WhiskActivation, source: String, userId: UUID): Try[EventMessage] = {
+    Activation.from(a).map { body =>
+      EventMessage(source, body, a.subject, a.namespace.toString, userId, body.typeName)
+    }
+  }
+
   def parse(msg: String) = format.read(msg.parseJson)
 }
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 7bbe071..b924d93 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -22,23 +22,23 @@ import java.time.{Clock, Instant}
 import akka.actor.ActorSystem
 import akka.event.Logging.InfoLevel
 import spray.json._
-import whisk.common.{Logging, LoggingMarkers, TransactionId}
 import whisk.common.tracing.WhiskTracerProvider
-import whisk.core.connector.ActivationMessage
+import whisk.common.{Logging, LoggingMarkers, TransactionId, UserEvents}
+import whisk.core.connector.{ActivationMessage, EventMessage, MessagingProvider}
 import whisk.core.controller.WhiskServices
-import whisk.core.database.{ActivationStore, NoDocumentException}
+import whisk.core.database.{ActivationStore, NoDocumentException, UserContext}
 import whisk.core.entitlement.{Resource, _}
 import whisk.core.entity.ActivationResponse.ERROR_FIELD
 import whisk.core.entity._
 import whisk.core.entity.size.SizeInt
 import whisk.core.entity.types.EntityStore
 import whisk.http.Messages._
+import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory.FutureExtensions
-import whisk.core.database.UserContext
 
 import scala.collection.mutable.Buffer
-import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.language.postfixOps
 import scala.util.{Failure, Success}
 
@@ -66,6 +66,10 @@ protected[actions] trait PrimitiveActions {
   /** Database service to get activations. */
   protected val activationStore: ActivationStore
 
+  /** Message producer. This is needed to write user-metrics. */
+  private val messagingProvider = SpiLoader.get[MessagingProvider]
+  private val producer = messagingProvider.getProducer(services.whiskConfig)
+
   /** A method that knows how to invoke a sequence of actions. */
   protected[actions] def invokeSequence(
     user: Identity,
@@ -554,6 +558,13 @@ protected[actions] trait PrimitiveActions {
         sequenceLimits,
       duration = Some(session.duration))
 
+    if (UserEvents.enabled) {
+      EventMessage.from(activation, s"controller${activeAckTopicIndex.asString}", user.namespace.uuid) match {
+        case Success(msg) => UserEvents.send(producer, msg)
+        case Failure(t)   => logging.warn(this, s"activation event was not sent: $t")
+      }
+    }
+
     activationStore.store(activation, context)(transid, notifier = None)
 
     activation
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index d38a91a..6002a57 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -17,30 +17,27 @@
 
 package whisk.core.controller.actions
 
-import java.time.Clock
-import java.time.Instant
+import java.time.{Clock, Instant}
 import java.util.concurrent.atomic.AtomicReference
 
-import scala.collection._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
 import akka.actor.ActorSystem
 import spray.json._
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId, UserEvents}
+import whisk.core.connector.{EventMessage, MessagingProvider}
 import whisk.core.controller.WhiskServices
-import whisk.core.database.ActivationStore
-import whisk.core.database.NoDocumentException
+import whisk.core.database.{ActivationStore, NoDocumentException, UserContext}
 import whisk.core.entity._
 import whisk.core.entity.size.SizeInt
 import whisk.core.entity.types._
 import whisk.http.Messages._
+import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory.FutureExtensions
-import whisk.core.database.UserContext
+
+import scala.collection._
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
 
 protected[actions] trait SequenceActions {
   /** The core collections require backend services to be injected in this trait. */
@@ -60,6 +57,13 @@ protected[actions] trait SequenceActions {
   /** Database service to get activations. */
   protected val activationStore: ActivationStore
 
+  /** Instace of the controller. This is needed to write user-metrics. */
+  protected val activeAckTopicIndex: ControllerInstanceId
+
+  /** Message producer. This is needed to write user-metrics. */
+  private val messagingProvider = SpiLoader.get[MessagingProvider]
+  private val producer = messagingProvider.getProducer(services.whiskConfig)
+
   /** A method that knows how to invoke a single primitive action. */
   protected[actions] def invokeAction(
     user: Identity,
@@ -165,6 +169,12 @@ protected[actions] trait SequenceActions {
       }
       .andThen {
         case Success((Right(seqActivation), _)) =>
+          if (UserEvents.enabled) {
+            EventMessage.from(seqActivation, s"controller${activeAckTopicIndex.asString}", user.namespace.uuid) match {
+              case Success(msg) => UserEvents.send(producer, msg)
+              case Failure(t)   => logging.warn(this, s"activation event was not sent: $t")
+            }
+          }
           activationStore.store(seqActivation, context)(transid, notifier = None)
 
         // This should never happen; in this case, there is no activation record created or stored:
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 652fefc..28b0289 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -131,17 +131,7 @@ class InvokerReactive(
     }
 
     if (UserEvents.enabled) {
-      val event = Activation.from(activationResult).map { body =>
-        EventMessage(
-          s"invoker${instance.instance}",
-          body,
-          activationResult.subject,
-          activationResult.namespace.toString,
-          userId,
-          body.typeName)
-      }
-
-      event match {
+      EventMessage.from(activationResult, s"invoker${instance.instance}", userId) match {
         case Success(msg) => UserEvents.send(producer, msg)
         case Failure(t)   => logging.error(this, s"activation event was not sent: $t")
       }
diff --git a/tests/src/test/scala/whisk/common/UserEventTests.scala b/tests/src/test/scala/whisk/common/UserEventTests.scala
index cfa02dc..e9bec96 100644
--- a/tests/src/test/scala/whisk/common/UserEventTests.scala
+++ b/tests/src/test/scala/whisk/common/UserEventTests.scala
@@ -77,7 +77,7 @@ class UserEventTests extends FlatSpec with Matchers with WskTestHelpers with Str
         event.body match {
           case a: Activation =>
             Seq(a.statusCode) should contain oneOf (0, 1, 2, 3)
-            event.source should fullyMatch regex "invoker\\d+".r
+            event.source should fullyMatch regex "(invoker|controller)\\d+".r
           case m: Metric =>
             Seq(m.metricName) should contain oneOf ("ConcurrentInvocations", "ConcurrentRateLimit", "TimedRateLimit")
             event.source should fullyMatch regex "controller\\d+".r