You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/04/30 11:10:00 UTC

[GitHub] markusthoemmes closed pull request #3552: Emit activation metadata to Kafka

markusthoemmes closed pull request #3552: Emit activation metadata to Kafka
URL: https://github.com/apache/incubator-openwhisk/pull/3552
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 0bff876bbf..f04807be57 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -324,3 +324,4 @@ metrics:
     host: "{{ metrics_kamon_statsd_host | default('') }}"
     port: "{{ metrics_kamon_statsd_port | default('8125') }}"
 
+user_events: "{{ user_events_enabled | default(false) }}"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 3f2b6090ee..8173f0017b 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -151,6 +151,7 @@
       "CONFIG_whisk_db_actionsDdoc": "{{ db_whisk_actions_ddoc | default() }}"
       "CONFIG_whisk_db_activationsDdoc": "{{ db_whisk_activations_ddoc | default() }}"
       "CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
+      "CONFIG_whisk_userEvents_enabled": "{{ user_events }}"
 
       "LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
       "LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 5841269297..2643b548b4 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -175,6 +175,7 @@
         -e CONFIG_whisk_kafka_common_sslTruststorePassword='{{ kafka.ssl.keystore.password }}'
         -e CONFIG_whisk_kafka_common_sslKeystoreLocation='/conf/{{ kafka.ssl.keystore.name }}'
         -e CONFIG_whisk_kafka_common_sslKeystorePassword='{{ kafka.ssl.keystore.password }}'
+        -e CONFIG_whisk_userEvents_enabled='{{ user_events }}'
         -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
         -e CONFIG_whisk_couchdb_protocol='{{ db_protocol }}'
         -e CONFIG_whisk_couchdb_host='{{ db_host }}'
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index c12eb0278e..f665edb933 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -50,7 +50,8 @@ kamon {
 }
 
 whisk {
-    # kafka related configuration
+    # kafka related configuration, the complete list of parameters is here:
+    # https://kafka.apache.org/documentation/#brokerconfigs
     kafka {
         replication-factor = 1
 
@@ -98,6 +99,11 @@ whisk {
                 retention-ms      =  172800000
                 max-message-bytes = ${whisk.activation.payload.max}
             }
+            events {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 3600000
+            }
         }
     }
     # db related configuration
@@ -136,6 +142,10 @@ whisk {
         local-image-prefix = "whisk"
     }
 
+    user-events {
+        enabled = false
+    }
+
     activation {
         payload {
             max = 1048576 // 5 m not possible because cross-referenced to kafka configurations
diff --git a/common/scala/src/main/scala/whisk/common/UserEvents.scala b/common/scala/src/main/scala/whisk/common/UserEvents.scala
new file mode 100644
index 0000000000..0bff0203ab
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/common/UserEvents.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import pureconfig.loadConfigOrThrow
+import whisk.core.ConfigKeys
+import whisk.core.connector.{EventMessage, MessageProducer}
+
+object UserEvents {
+
+  case class UserEventsConfig(enabled: Boolean)
+
+  val enabled = loadConfigOrThrow[UserEventsConfig](ConfigKeys.userEvents).enabled
+
+  def send(producer: MessageProducer, em: => EventMessage) = {
+    if (enabled) {
+      producer.send("events", em)
+    }
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 1fd0154f86..231341fbfb 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -220,6 +220,7 @@ object ConfigKeys {
   val logLimit = "whisk.log-limit"
   val activation = "whisk.activation"
   val activationPayload = s"$activation.payload"
+  val userEvents = "whisk.user-events"
 
   val runtimes = "whisk.runtimes"
 
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 13e0ed6843..b17de8f31a 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -18,15 +18,9 @@
 package whisk.core.connector
 
 import scala.util.Try
-
 import spray.json._
 import whisk.common.TransactionId
-import whisk.core.entity.ActivationId
-import whisk.core.entity.DocRevision
-import whisk.core.entity.FullyQualifiedEntityName
-import whisk.core.entity.Identity
-import whisk.core.entity.InstanceId
-import whisk.core.entity.WhiskActivation
+import whisk.core.entity._
 
 /** Basic trait for messages that are sent on a message bus connector. */
 trait Message {
@@ -122,3 +116,85 @@ object PingMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(serdes.read(msg.parseJson))
   implicit val serdes = jsonFormat(PingMessage.apply _, "name")
 }
+
+trait EventMessageBody extends Message {
+  def typeName: String
+}
+
+object EventMessageBody extends DefaultJsonProtocol {
+
+  implicit def format = new JsonFormat[EventMessageBody] {
+    def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
+      case m: Metric     => m.toJson
+      case a: Activation => a.toJson
+    }
+
+    def read(value: JsValue) =
+      if (value.asJsObject.fields.contains("metricName")) {
+        value.convertTo[Metric]
+      } else {
+        value.convertTo[Activation]
+      }
+  }
+}
+
+case class Activation(name: String,
+                      statusCode: Int,
+                      duration: Long,
+                      waitTime: Long,
+                      initTime: Long,
+                      kind: String,
+                      conductor: Boolean,
+                      memory: Int,
+                      causedBy: Boolean)
+    extends EventMessageBody {
+  val typeName = "Activation"
+  override def serialize = toJson.compactPrint
+
+  def toJson = Activation.activationFormat.write(this)
+}
+
+object Activation extends DefaultJsonProtocol {
+  def parse(msg: String) = Try(activationFormat.read(msg.parseJson))
+  implicit val activationFormat =
+    jsonFormat(
+      Activation.apply _,
+      "name",
+      "statusCode",
+      "duration",
+      "waitTime",
+      "initTime",
+      "kind",
+      "conductor",
+      "memory",
+      "causedBy")
+}
+
+case class Metric(metricName: String, metricValue: Long) extends EventMessageBody {
+  val typeName = "Metric"
+  override def serialize = toJson.compactPrint
+  def toJson = Metric.metricFormat.write(this).asJsObject
+}
+
+object Metric extends DefaultJsonProtocol {
+  def parse(msg: String) = Try(metricFormat.read(msg.parseJson))
+  implicit val metricFormat = jsonFormat(Metric.apply _, "metricName", "metricValue")
+}
+
+case class EventMessage(source: String,
+                        body: EventMessageBody,
+                        subject: Subject,
+                        namespace: String,
+                        userId: UUID,
+                        eventType: String,
+                        timestamp: Long = System.currentTimeMillis())
+    extends Message {
+  override def serialize = EventMessage.format.write(this).compactPrint
+}
+
+object EventMessage extends DefaultJsonProtocol {
+  implicit val format =
+    jsonFormat(EventMessage.apply _, "source", "body", "subject", "namespace", "userId", "eventType", "timestamp")
+
+  def parse(msg: String) = format.read(msg.parseJson)
+}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 095a8b6266..340df960e7 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -121,7 +121,8 @@ class Controller(val instance: InstanceId,
     SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance)
   logging.info(this, s"loadbalancer initialized: ${loadBalancer.getClass.getSimpleName}")(TransactionId.controller)
 
-  private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
+  private implicit val entitlementProvider =
+    new LocalEntitlementProvider(whiskConfig, loadBalancer, instance)
   private implicit val activationIdFactory = new ActivationIdGenerator {}
   private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem)
 
@@ -227,6 +228,10 @@ object Controller {
       abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation")
     }
 
+    if (!msgProvider.ensureTopic(config, topic = "events", topicConfig = "events")) {
+      abort(s"failure during msgProvider.ensureTopic for topic events")
+    }
+
     ExecManifest.initialize(config) match {
       case Success(_) =>
         val controller = new Controller(
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 563e7fe0dc..b99385accc 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -69,14 +69,17 @@ class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity
 sealed trait RateLimit {
   def ok: Boolean
   def errorMsg: String
+  def limitName: String
 }
 
 case class ConcurrentRateLimit(count: Int, allowed: Int) extends RateLimit {
   val ok: Boolean = count < allowed // must have slack for the current activation request
   override def errorMsg: String = Messages.tooManyConcurrentRequests(count, allowed)
+  val limitName: String = "ConcurrentRateLimit"
 }
 
 case class TimedRateLimit(count: Int, allowed: Int) extends RateLimit {
   val ok: Boolean = count <= allowed // the count is already updated to account for the current request
   override def errorMsg: String = Messages.tooManyRequests(count, allowed)
+  val limitName: String = "TimedRateLimit"
 }
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 1aa6e3c341..478bda4562 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -27,9 +27,10 @@ import akka.http.scaladsl.model.StatusCodes.Forbidden
 import akka.http.scaladsl.model.StatusCodes.TooManyRequests
 import whisk.core.entitlement.Privilege.ACTIVATE
 import whisk.core.entitlement.Privilege.REJECT
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId, UserEvents}
+import whisk.connector.kafka.KafkaMessagingProvider
 import whisk.core.WhiskConfig
+import whisk.core.connector.{EventMessage, Metric}
 import whisk.core.controller.RejectRequest
 import whisk.core.entity._
 import whisk.core.loadBalancer.{LoadBalancer, ShardingContainerPoolBalancer}
@@ -74,9 +75,10 @@ protected[core] object EntitlementProvider {
  * A trait that implements entitlements to resources. It performs checks for CRUD and Acivation requests.
  * This is where enforcement of activation quotas takes place, in additional to basic authorization.
  */
-protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer)(
-  implicit actorSystem: ActorSystem,
-  logging: Logging) {
+protected[core] abstract class EntitlementProvider(
+  config: WhiskConfig,
+  loadBalancer: LoadBalancer,
+  controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging) {
 
   private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
@@ -142,6 +144,8 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
       activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations),
       config.actionInvokeSystemOverloadLimit.toInt)
 
+  private val eventProducer = KafkaMessagingProvider.getProducer(this.config)
+
   /**
    * Grants a subject the right to access a resources.
    *
@@ -358,10 +362,37 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
   private def checkThrottleOverload(throttle: Future[RateLimit], user: Identity)(
     implicit transid: TransactionId): Future[Unit] = {
     throttle.flatMap { limit =>
+      val userId = user.authkey.uuid
       if (limit.ok) {
+        limit match {
+          case c: ConcurrentRateLimit => {
+            val metric =
+              Metric("ConcurrentInvocations", c.count + 1)
+            UserEvents.send(
+              eventProducer,
+              EventMessage(
+                s"controller${controllerInstance.instance}",
+                metric,
+                user.subject,
+                user.namespace.toString,
+                userId,
+                metric.typeName))
+          }
+          case _ => // ignore
+        }
         Future.successful(())
       } else {
         logging.info(this, s"'${user.namespace}' has exceeded its throttle limit, ${limit.errorMsg}")
+        val metric = Metric(limit.limitName, 1)
+        UserEvents.send(
+          eventProducer,
+          EventMessage(
+            s"controller${controllerInstance.instance}",
+            metric,
+            user.subject,
+            user.namespace.toString,
+            userId,
+            metric.typeName))
         Future.failed(RejectRequest(TooManyRequests, limit.errorMsg))
       }
     }
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
index e9155f7219..fa0edf7e0b 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
@@ -19,13 +19,11 @@ package whisk.core.entitlement
 
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.Future
-
 import akka.actor.ActorSystem
-
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
-import whisk.core.entity.Subject
+import whisk.core.entity.{InstanceId, Subject}
 import whisk.core.loadBalancer.LoadBalancer
 
 private object LocalEntitlementProvider {
@@ -34,10 +32,11 @@ private object LocalEntitlementProvider {
   private val matrix = TrieMap[(Subject, String), Set[Privilege]]()
 }
 
-protected[core] class LocalEntitlementProvider(private val config: WhiskConfig, private val loadBalancer: LoadBalancer)(
-  implicit actorSystem: ActorSystem,
-  logging: Logging)
-    extends EntitlementProvider(config, loadBalancer) {
+protected[core] class LocalEntitlementProvider(
+  private val config: WhiskConfig,
+  private val loadBalancer: LoadBalancer,
+  private val controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
+    extends EntitlementProvider(config, loadBalancer, controllerInstance) {
 
   private implicit val executionContext = actorSystem.dispatcher
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index fddafd56a1..b75ad72a7e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -95,7 +95,7 @@ case object RescheduleJob // job is sent back to parent and could not be process
  */
 class ContainerProxy(
   factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
-  sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
+  sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
   storeActivation: (TransactionId, WhiskActivation) => Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
   instance: InstanceId,
@@ -161,7 +161,7 @@ class ContainerProxy(
             // implicitly via a FailureMessage which will be processed later when the state
             // transitions to Running
             val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, response)
-            sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex)
+            sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.authkey.uuid)
             storeActivation(transid, activation)
         }
         .flatMap { container =>
@@ -380,7 +380,7 @@ class ContainerProxy(
       }
 
     // Sending active ack. Entirely asynchronous and not waited upon.
-    activation.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex))
+    activation.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.authkey.uuid))
 
     // Adds logs to the raw activation.
     val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
@@ -422,7 +422,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
 object ContainerProxy {
   def props(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
-    ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
+    ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
     store: (TransactionId, WhiskActivation) => Future[Any],
     collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
     instance: InstanceId,
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 8601e4c9da..b132dd815f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -26,7 +26,7 @@ import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
 import pureconfig._
 import spray.json._
-import whisk.common.{Logging, LoggingMarkers, Scheduler, TransactionId}
+import whisk.common._
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector._
 import whisk.core.containerpool._
@@ -40,6 +40,7 @@ import whisk.spi.SpiLoader
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
+import DefaultJsonProtocol._
 
 class InvokerReactive(
   config: WhiskConfig,
@@ -112,12 +113,12 @@ class InvokerReactive(
   private val ack = (tid: TransactionId,
                      activationResult: WhiskActivation,
                      blockingInvoke: Boolean,
-                     controllerInstance: InstanceId) => {
+                     controllerInstance: InstanceId,
+                     userId: UUID) => {
     implicit val transid: TransactionId = tid
 
     def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
       val msg = CompletionMessage(transid, res, instance)
-
       producer.send(s"completed${controllerInstance.toInt}", msg).andThen {
         case Success(_) =>
           logging.info(
@@ -125,6 +126,30 @@ class InvokerReactive(
             s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}")
       }
     }
+    // Potentially sends activation metadata to kafka if user events are enabled
+    UserEvents.send(
+      producer, {
+        val activation = Activation(
+          activationResult.namespace + EntityPath.PATHSEP + activationResult.name,
+          activationResult.response.statusCode,
+          activationResult.duration.getOrElse(0),
+          activationResult.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0),
+          activationResult.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0),
+          activationResult.annotations.getAs[String](WhiskActivation.kindAnnotation).getOrElse("unknown_kind"),
+          activationResult.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false),
+          activationResult.annotations
+            .getAs[ActionLimits](WhiskActivation.limitsAnnotation)
+            .map(al => al.memory.megabytes)
+            .getOrElse(0),
+          activationResult.annotations.getAs[Boolean](WhiskActivation.causedByAnnotation).getOrElse(false))
+        EventMessage(
+          s"invoker${instance.instance}",
+          activation,
+          activationResult.subject,
+          activationResult.namespace.toString,
+          userId,
+          activation.typeName)
+      })
 
     send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith {
       case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
@@ -209,7 +234,7 @@ class InvokerReactive(
 
                 val activation = generateFallbackActivation(msg, response)
                 activationFeed ! MessageFeed.Processed
-                ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex)
+                ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.authkey.uuid)
                 store(msg.transid, activation)
                 Future.successful(())
             }
@@ -219,7 +244,7 @@ class InvokerReactive(
           activationFeed ! MessageFeed.Processed
           val activation =
             generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted))
-          ack(msg.transid, activation, false, msg.rootControllerIndex)
+          ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.authkey.uuid)
           logging.warn(this, s"namespace ${msg.user.namespace} was blocked in invoker.")
           Future.successful(())
         }
diff --git a/docs/metrics.md b/docs/metrics.md
index 9ba60efa64..3d2bb5e00d 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -17,9 +17,14 @@
 -->
 # Openwhisk Metric Support
 
-Openwhick contains the capability to send metric information to a statsd server. This capability is disabled per default. Instead metric information is normally written to the log files in logmarker format.
+Openwhisk distinguishes between system and user metrics (events).
 
-## Configuration
+System metrics typically contain information about system performance and provide a possibility to send them to Kamon or write them to log files in logmarker format. This metrics are typically used by OpenWhisk providers/operators.
+
+User metrics encompass information about action performance which is sent to Kafka in a form of events. These metrics are to be consumed by OpenWhisk users, however they could be also used for billing or audit purpose. It is to be noted that at the moment the events are not directly exposed to the users and require an additional Kakfa Consumer based micro-service for data processing.
+
+## System specific metrics
+### Configuration
 
 Both capabilties can be enabled or disabled separately during deployment via Ansible configuration in the 'goup_vars/all' file of an  environment.
 
@@ -60,7 +65,7 @@ metrics_kamon_statsd_port: '8125'
 metrics_log: true
 ```
 
-## Testing the statsd metric support
+### Testing the statsd metric support
 
 The Kamon project privides an integrated docker image containing statsd and a connected Grafana dashboard via [this Github project](https://github.com/kamon-io/docker-grafana-graphite). This image is helpful for testing the metrices sent via statsd.
 
@@ -69,3 +74,39 @@ Please follow these [instructions](https://github.com/kamon-io/docker-grafana-gr
 The docker image exposes statsd via the (standard) port 8125 and a Graphana dashboard via port 8080 on your docker host.
 
 The address of your docker host has to be configured in the `metrics_kamon_statsd_host` configuration property.
+
+## User specific metrics
+### Configuration
+User metrics are enabled by default and could be explicitly disabled by setting the following property in one of the Ansible configuration files:
+```
+user_events: false
+```
+
+### Supported events
+Activation is an event that occurs after after each activation. It includes the following execution metadata:
+```
+waitTime - internal system hold time
+initTime - time it took to initialise an action, e.g. docker init
+statusCode - status code of the invocation: 0 - success, 1 - application error, 2 - action developer error, 3 - internal OpenWhisk error
+duration - actual time the action code was running
+kind - action flavor, e.g. nodejs
+conductor - true for conductor backed actions
+memory - maximum memory allowed for action container
+causedBy - true for sequence actions
+```
+Metric is any user specific event produced by the system and it at this moment includes the following information:
+```
+ConcurrentRateLimit - a user has exceeded its limit for concurrent invocations.
+TimedRateLimit - the user has reached its per minute limit for the number of invocations.
+ConcurrentInvocations - the number of in flight invocations per user.
+```
+
+Example events that could be consumed from Kafka.
+Activation:
+```
+{"body":{"statusCode":0,"duration":3,"name":"whisk.system/invokerHealthTestAction0","waitTime":583915671,"conductor":false,"kind":"nodejs:6","initTime":0,"memory": 256, "causedBy": false},"eventType":"Activation","source":"invoker0","subject":"whisk.system","timestamp":1524476122676,"userId":"d0888ad5-5a92-435e-888a-d55a92935e54","namespace":"whisk.system"}
+```
+Metric:
+```
+{"body":{"metricName":"ConcurrentInvocations","metricValue":1},"eventType":"Metric","source":"controller0","subject":"guest","timestamp":1524476104419,"userId":"23bc46b1-71f6-4ed5-8c54-816aa4f8c502","namespace":"guest"}
+```
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 6054bda977..62fa019d1e 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -58,4 +58,7 @@ whisk {
         client-auth = "{{ controller.ssl.clientAuth }}"
       }
     }
+    user-events {
+        enabled = {{ user_events }}
+    }
 }
diff --git a/tests/src/test/scala/whisk/common/UserEventTests.scala b/tests/src/test/scala/whisk/common/UserEventTests.scala
new file mode 100644
index 0000000000..661b07ec84
--- /dev/null
+++ b/tests/src/test/scala/whisk/common/UserEventTests.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import java.nio.charset.StandardCharsets
+
+import akka.actor.ActorSystem
+import common._
+import common.rest.WskRest
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import whisk.connector.kafka.KafkaConsumerConnector
+import whisk.core.WhiskConfig
+import whisk.core.connector.{Activation, EventMessage, Metric}
+
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class UserEventTests extends FlatSpec with Matchers with WskTestHelpers with StreamLogging with BeforeAndAfterAll {
+
+  implicit val wskprops = WskProps()
+  implicit val system = ActorSystem("UserEventTestSystem")
+  val config = new WhiskConfig(WhiskConfig.kafkaHosts)
+
+  val wsk = new WskRest
+
+  val groupid = "kafkatest"
+  val topic = "events"
+  val maxPollInterval = 10.seconds
+
+  val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
+  val testActionsDir = WhiskProperties.getFileRelativeToWhiskHome("tests/dat/actions")
+  behavior of "UserEvents"
+
+  override def afterAll() {
+    consumer.close()
+  }
+
+  if (UserEvents.enabled) {
+    it should "invoke an action and produce user events" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
+      val file = Some(TestUtils.getTestActionFilename("hello.js"))
+      val name = "testUserEvents"
+
+      assetHelper.withCleaner(wsk.action, name, confirmDelete = true) { (action, _) =>
+        action.create(name, file)
+      }
+
+      val run = wsk.action.invoke(name, blocking = true)
+
+      withActivation(wsk.activation, run) { result =>
+        withClue("invoking an action was unsuccessful") {
+          result.response.status shouldBe "success"
+        }
+      }
+      // checking for any metrics to arrive
+      val received =
+        consumer.peek(maxPollInterval).map {
+          case (_, _, _, msg) => EventMessage.parse(new String(msg, StandardCharsets.UTF_8))
+        }
+      received.map(event => {
+        event.body match {
+          case a: Activation =>
+            Seq(a.statusCode) should contain oneOf (0, 1, 2, 3)
+            event.source should fullyMatch regex "invoker\\d+".r
+          case m: Metric =>
+            Seq(m.metricName) should contain oneOf ("ConcurrentInvocations", "ConcurrentRateLimit", "TimedRateLimit")
+            event.source should fullyMatch regex "controller\\d+".r
+        }
+      })
+      // produce at least 2 events - an Activation and a 'ConcurrentInvocations' Metric
+      // >= 2 is due to events that might have potentially occurred in between
+      received.size should be >= 2
+      consumer.commit()
+    }
+
+  }
+
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 0055372f1e..fcde7e70eb 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -150,7 +150,7 @@ class ContainerProxyTests
 
   /** Creates an inspectable version of the ack method, which records all calls in a buffer */
   def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
-    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId) =>
+    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId, _: UUID) =>
       activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
       activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
       activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
@@ -219,7 +219,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
 
     preWarm(machine)
@@ -255,7 +256,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -302,7 +304,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -340,7 +343,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
 
@@ -372,7 +376,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
 
     machine ! Run(noLogsAction, message)
@@ -402,7 +407,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -437,7 +443,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -476,7 +483,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -506,7 +514,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -535,7 +544,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -568,7 +578,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized) // first run an activation
     timeout(machine) // times out Ready state so container suspends
@@ -603,7 +614,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine) // times out Ready state so container suspends
@@ -639,7 +651,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
 
     // Start running the action
@@ -690,7 +703,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index 662c6d7fb5..e9df1d530b 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -67,7 +67,7 @@ protected trait ControllerTestCommon
   override implicit val actorSystem = system // defined in ScalatestRouteTest
   override val executionContext = actorSystem.dispatcher
 
-  override val whiskConfig = new WhiskConfig(RestApiCommons.requiredProperties)
+  override val whiskConfig = new WhiskConfig(RestApiCommons.requiredProperties ++ WhiskConfig.kafkaHosts)
   assert(whiskConfig.isValid)
 
   // initialize runtimes manifest
@@ -75,7 +75,8 @@ protected trait ControllerTestCommon
 
   override val loadBalancer = new DegenerateLoadBalancerService(whiskConfig)
 
-  override lazy val entitlementProvider: EntitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
+  override lazy val entitlementProvider: EntitlementProvider =
+    new LocalEntitlementProvider(whiskConfig, loadBalancer, instance)
 
   override val activationIdFactory = new ActivationId.ActivationIdGenerator() {
     // need a static activation id to test activations api
diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index 59d4b8e2ca..97d5999401 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -1729,7 +1729,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
   }
 
   class TestingEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer)
-      extends EntitlementProvider(config, loadBalancer) {
+      extends EntitlementProvider(config, loadBalancer, InstanceId(0)) {
 
     protected[core] override def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = {
       val subject = user.subject


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services