You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by vv...@apache.org on 2018/09/26 07:01:39 UTC
[incubator-openwhisk] branch master updated: Send activations of
sequences to events topic. (#4019)
This is an automated email from the ASF dual-hosted git repository.
vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new ce6c656 Send activations of sequences to events topic. (#4019)
ce6c656 is described below
commit ce6c656d5a66a8c4ade8c5817df5f82d1d7915a9
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Wed Sep 26 09:01:33 2018 +0200
Send activations of sequences to events topic. (#4019)
* Send Sequence actions to UserMetrics.
Co-authored-by: Markus Thömmes <ma...@me.com>
* Send activations of cunductor-actions to user-metric-topics.
* Adapt test, to allow activations from controller.
* Add to source of usermetric.
* Review.
---
.../main/scala/whisk/core/connector/Message.scala | 6 ++++
.../core/controller/actions/PrimitiveActions.scala | 21 +++++++++---
.../core/controller/actions/SequenceActions.scala | 38 ++++++++++++++--------
.../scala/whisk/core/invoker/InvokerReactive.scala | 12 +------
.../test/scala/whisk/common/UserEventTests.scala | 2 +-
5 files changed, 48 insertions(+), 31 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index ecc0bc6..ed677b1 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -221,5 +221,11 @@ object EventMessage extends DefaultJsonProtocol {
implicit val format =
jsonFormat(EventMessage.apply _, "source", "body", "subject", "namespace", "userId", "eventType", "timestamp")
+ def from(a: WhiskActivation, source: String, userId: UUID): Try[EventMessage] = {
+ Activation.from(a).map { body =>
+ EventMessage(source, body, a.subject, a.namespace.toString, userId, body.typeName)
+ }
+ }
+
def parse(msg: String) = format.read(msg.parseJson)
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 7bbe071..b924d93 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -22,23 +22,23 @@ import java.time.{Clock, Instant}
import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
import spray.json._
-import whisk.common.{Logging, LoggingMarkers, TransactionId}
import whisk.common.tracing.WhiskTracerProvider
-import whisk.core.connector.ActivationMessage
+import whisk.common.{Logging, LoggingMarkers, TransactionId, UserEvents}
+import whisk.core.connector.{ActivationMessage, EventMessage, MessagingProvider}
import whisk.core.controller.WhiskServices
-import whisk.core.database.{ActivationStore, NoDocumentException}
+import whisk.core.database.{ActivationStore, NoDocumentException, UserContext}
import whisk.core.entitlement.{Resource, _}
import whisk.core.entity.ActivationResponse.ERROR_FIELD
import whisk.core.entity._
import whisk.core.entity.size.SizeInt
import whisk.core.entity.types.EntityStore
import whisk.http.Messages._
+import whisk.spi.SpiLoader
import whisk.utils.ExecutionContextFactory.FutureExtensions
-import whisk.core.database.UserContext
import scala.collection.mutable.Buffer
-import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps
import scala.util.{Failure, Success}
@@ -66,6 +66,10 @@ protected[actions] trait PrimitiveActions {
/** Database service to get activations. */
protected val activationStore: ActivationStore
+ /** Message producer. This is needed to write user-metrics. */
+ private val messagingProvider = SpiLoader.get[MessagingProvider]
+ private val producer = messagingProvider.getProducer(services.whiskConfig)
+
/** A method that knows how to invoke a sequence of actions. */
protected[actions] def invokeSequence(
user: Identity,
@@ -554,6 +558,13 @@ protected[actions] trait PrimitiveActions {
sequenceLimits,
duration = Some(session.duration))
+ if (UserEvents.enabled) {
+ EventMessage.from(activation, s"controller${activeAckTopicIndex.asString}", user.namespace.uuid) match {
+ case Success(msg) => UserEvents.send(producer, msg)
+ case Failure(t) => logging.warn(this, s"activation event was not sent: $t")
+ }
+ }
+
activationStore.store(activation, context)(transid, notifier = None)
activation
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index d38a91a..6002a57 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -17,30 +17,27 @@
package whisk.core.controller.actions
-import java.time.Clock
-import java.time.Instant
+import java.time.{Clock, Instant}
import java.util.concurrent.atomic.AtomicReference
-import scala.collection._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
import akka.actor.ActorSystem
import spray.json._
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId, UserEvents}
+import whisk.core.connector.{EventMessage, MessagingProvider}
import whisk.core.controller.WhiskServices
-import whisk.core.database.ActivationStore
-import whisk.core.database.NoDocumentException
+import whisk.core.database.{ActivationStore, NoDocumentException, UserContext}
import whisk.core.entity._
import whisk.core.entity.size.SizeInt
import whisk.core.entity.types._
import whisk.http.Messages._
+import whisk.spi.SpiLoader
import whisk.utils.ExecutionContextFactory.FutureExtensions
-import whisk.core.database.UserContext
+
+import scala.collection._
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
protected[actions] trait SequenceActions {
/** The core collections require backend services to be injected in this trait. */
@@ -60,6 +57,13 @@ protected[actions] trait SequenceActions {
/** Database service to get activations. */
protected val activationStore: ActivationStore
+ /** Instace of the controller. This is needed to write user-metrics. */
+ protected val activeAckTopicIndex: ControllerInstanceId
+
+ /** Message producer. This is needed to write user-metrics. */
+ private val messagingProvider = SpiLoader.get[MessagingProvider]
+ private val producer = messagingProvider.getProducer(services.whiskConfig)
+
/** A method that knows how to invoke a single primitive action. */
protected[actions] def invokeAction(
user: Identity,
@@ -165,6 +169,12 @@ protected[actions] trait SequenceActions {
}
.andThen {
case Success((Right(seqActivation), _)) =>
+ if (UserEvents.enabled) {
+ EventMessage.from(seqActivation, s"controller${activeAckTopicIndex.asString}", user.namespace.uuid) match {
+ case Success(msg) => UserEvents.send(producer, msg)
+ case Failure(t) => logging.warn(this, s"activation event was not sent: $t")
+ }
+ }
activationStore.store(seqActivation, context)(transid, notifier = None)
// This should never happen; in this case, there is no activation record created or stored:
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 652fefc..28b0289 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -131,17 +131,7 @@ class InvokerReactive(
}
if (UserEvents.enabled) {
- val event = Activation.from(activationResult).map { body =>
- EventMessage(
- s"invoker${instance.instance}",
- body,
- activationResult.subject,
- activationResult.namespace.toString,
- userId,
- body.typeName)
- }
-
- event match {
+ EventMessage.from(activationResult, s"invoker${instance.instance}", userId) match {
case Success(msg) => UserEvents.send(producer, msg)
case Failure(t) => logging.error(this, s"activation event was not sent: $t")
}
diff --git a/tests/src/test/scala/whisk/common/UserEventTests.scala b/tests/src/test/scala/whisk/common/UserEventTests.scala
index cfa02dc..e9bec96 100644
--- a/tests/src/test/scala/whisk/common/UserEventTests.scala
+++ b/tests/src/test/scala/whisk/common/UserEventTests.scala
@@ -77,7 +77,7 @@ class UserEventTests extends FlatSpec with Matchers with WskTestHelpers with Str
event.body match {
case a: Activation =>
Seq(a.statusCode) should contain oneOf (0, 1, 2, 3)
- event.source should fullyMatch regex "invoker\\d+".r
+ event.source should fullyMatch regex "(invoker|controller)\\d+".r
case m: Metric =>
Seq(m.metricName) should contain oneOf ("ConcurrentInvocations", "ConcurrentRateLimit", "TimedRateLimit")
event.source should fullyMatch regex "controller\\d+".r