You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/04/30 11:10:00 UTC
[GitHub] markusthoemmes closed pull request #3552: Emit activation metadata
to Kafka
markusthoemmes closed pull request #3552: Emit activation metadata to Kafka
URL: https://github.com/apache/incubator-openwhisk/pull/3552
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 0bff876bbf..f04807be57 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -324,3 +324,4 @@ metrics:
host: "{{ metrics_kamon_statsd_host | default('') }}"
port: "{{ metrics_kamon_statsd_port | default('8125') }}"
+user_events: "{{ user_events_enabled | default(false) }}"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 3f2b6090ee..8173f0017b 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -151,6 +151,7 @@
"CONFIG_whisk_db_actionsDdoc": "{{ db_whisk_actions_ddoc | default() }}"
"CONFIG_whisk_db_activationsDdoc": "{{ db_whisk_activations_ddoc | default() }}"
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
+ "CONFIG_whisk_userEvents_enabled": "{{ user_events }}"
"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 5841269297..2643b548b4 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -175,6 +175,7 @@
-e CONFIG_whisk_kafka_common_sslTruststorePassword='{{ kafka.ssl.keystore.password }}'
-e CONFIG_whisk_kafka_common_sslKeystoreLocation='/conf/{{ kafka.ssl.keystore.name }}'
-e CONFIG_whisk_kafka_common_sslKeystorePassword='{{ kafka.ssl.keystore.password }}'
+ -e CONFIG_whisk_userEvents_enabled='{{ user_events }}'
-e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
-e CONFIG_whisk_couchdb_protocol='{{ db_protocol }}'
-e CONFIG_whisk_couchdb_host='{{ db_host }}'
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index c12eb0278e..f665edb933 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -50,7 +50,8 @@ kamon {
}
whisk {
- # kafka related configuration
+ # kafka related configuration, the complete list of parameters is here:
+ # https://kafka.apache.org/documentation/#brokerconfigs
kafka {
replication-factor = 1
@@ -98,6 +99,11 @@ whisk {
retention-ms = 172800000
max-message-bytes = ${whisk.activation.payload.max}
}
+ events {
+ segment-bytes = 536870912
+ retention-bytes = 1073741824
+ retention-ms = 3600000
+ }
}
}
# db related configuration
@@ -136,6 +142,10 @@ whisk {
local-image-prefix = "whisk"
}
+ user-events {
+ enabled = false
+ }
+
activation {
payload {
max = 1048576 // 5 m not possible because cross-referenced to kafka configurations
diff --git a/common/scala/src/main/scala/whisk/common/UserEvents.scala b/common/scala/src/main/scala/whisk/common/UserEvents.scala
new file mode 100644
index 0000000000..0bff0203ab
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/common/UserEvents.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import pureconfig.loadConfigOrThrow
+import whisk.core.ConfigKeys
+import whisk.core.connector.{EventMessage, MessageProducer}
+
+object UserEvents {
+
+ case class UserEventsConfig(enabled: Boolean)
+
+ val enabled = loadConfigOrThrow[UserEventsConfig](ConfigKeys.userEvents).enabled
+
+ def send(producer: MessageProducer, em: => EventMessage) = {
+ if (enabled) {
+ producer.send("events", em)
+ }
+ }
+}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 1fd0154f86..231341fbfb 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -220,6 +220,7 @@ object ConfigKeys {
val logLimit = "whisk.log-limit"
val activation = "whisk.activation"
val activationPayload = s"$activation.payload"
+ val userEvents = "whisk.user-events"
val runtimes = "whisk.runtimes"
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 13e0ed6843..b17de8f31a 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -18,15 +18,9 @@
package whisk.core.connector
import scala.util.Try
-
import spray.json._
import whisk.common.TransactionId
-import whisk.core.entity.ActivationId
-import whisk.core.entity.DocRevision
-import whisk.core.entity.FullyQualifiedEntityName
-import whisk.core.entity.Identity
-import whisk.core.entity.InstanceId
-import whisk.core.entity.WhiskActivation
+import whisk.core.entity._
/** Basic trait for messages that are sent on a message bus connector. */
trait Message {
@@ -122,3 +116,85 @@ object PingMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(PingMessage.apply _, "name")
}
+
+trait EventMessageBody extends Message {
+ def typeName: String
+}
+
+object EventMessageBody extends DefaultJsonProtocol {
+
+ implicit def format = new JsonFormat[EventMessageBody] {
+ def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
+ case m: Metric => m.toJson
+ case a: Activation => a.toJson
+ }
+
+ def read(value: JsValue) =
+ if (value.asJsObject.fields.contains("metricName")) {
+ value.convertTo[Metric]
+ } else {
+ value.convertTo[Activation]
+ }
+ }
+}
+
+case class Activation(name: String,
+ statusCode: Int,
+ duration: Long,
+ waitTime: Long,
+ initTime: Long,
+ kind: String,
+ conductor: Boolean,
+ memory: Int,
+ causedBy: Boolean)
+ extends EventMessageBody {
+ val typeName = "Activation"
+ override def serialize = toJson.compactPrint
+
+ def toJson = Activation.activationFormat.write(this)
+}
+
+object Activation extends DefaultJsonProtocol {
+ def parse(msg: String) = Try(activationFormat.read(msg.parseJson))
+ implicit val activationFormat =
+ jsonFormat(
+ Activation.apply _,
+ "name",
+ "statusCode",
+ "duration",
+ "waitTime",
+ "initTime",
+ "kind",
+ "conductor",
+ "memory",
+ "causedBy")
+}
+
+case class Metric(metricName: String, metricValue: Long) extends EventMessageBody {
+ val typeName = "Metric"
+ override def serialize = toJson.compactPrint
+ def toJson = Metric.metricFormat.write(this).asJsObject
+}
+
+object Metric extends DefaultJsonProtocol {
+ def parse(msg: String) = Try(metricFormat.read(msg.parseJson))
+ implicit val metricFormat = jsonFormat(Metric.apply _, "metricName", "metricValue")
+}
+
+case class EventMessage(source: String,
+ body: EventMessageBody,
+ subject: Subject,
+ namespace: String,
+ userId: UUID,
+ eventType: String,
+ timestamp: Long = System.currentTimeMillis())
+ extends Message {
+ override def serialize = EventMessage.format.write(this).compactPrint
+}
+
+object EventMessage extends DefaultJsonProtocol {
+ implicit val format =
+ jsonFormat(EventMessage.apply _, "source", "body", "subject", "namespace", "userId", "eventType", "timestamp")
+
+ def parse(msg: String) = format.read(msg.parseJson)
+}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 095a8b6266..340df960e7 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -121,7 +121,8 @@ class Controller(val instance: InstanceId,
SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance)
logging.info(this, s"loadbalancer initialized: ${loadBalancer.getClass.getSimpleName}")(TransactionId.controller)
- private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
+ private implicit val entitlementProvider =
+ new LocalEntitlementProvider(whiskConfig, loadBalancer, instance)
private implicit val activationIdFactory = new ActivationIdGenerator {}
private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem)
@@ -227,6 +228,10 @@ object Controller {
abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation")
}
+ if (!msgProvider.ensureTopic(config, topic = "events", topicConfig = "events")) {
+ abort(s"failure during msgProvider.ensureTopic for topic events")
+ }
+
ExecManifest.initialize(config) match {
case Success(_) =>
val controller = new Controller(
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 563e7fe0dc..b99385accc 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -69,14 +69,17 @@ class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity
sealed trait RateLimit {
def ok: Boolean
def errorMsg: String
+ def limitName: String
}
case class ConcurrentRateLimit(count: Int, allowed: Int) extends RateLimit {
val ok: Boolean = count < allowed // must have slack for the current activation request
override def errorMsg: String = Messages.tooManyConcurrentRequests(count, allowed)
+ val limitName: String = "ConcurrentRateLimit"
}
case class TimedRateLimit(count: Int, allowed: Int) extends RateLimit {
val ok: Boolean = count <= allowed // the count is already updated to account for the current request
override def errorMsg: String = Messages.tooManyRequests(count, allowed)
+ val limitName: String = "TimedRateLimit"
}
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 1aa6e3c341..478bda4562 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -27,9 +27,10 @@ import akka.http.scaladsl.model.StatusCodes.Forbidden
import akka.http.scaladsl.model.StatusCodes.TooManyRequests
import whisk.core.entitlement.Privilege.ACTIVATE
import whisk.core.entitlement.Privilege.REJECT
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId, UserEvents}
+import whisk.connector.kafka.KafkaMessagingProvider
import whisk.core.WhiskConfig
+import whisk.core.connector.{EventMessage, Metric}
import whisk.core.controller.RejectRequest
import whisk.core.entity._
import whisk.core.loadBalancer.{LoadBalancer, ShardingContainerPoolBalancer}
@@ -74,9 +75,10 @@ protected[core] object EntitlementProvider {
* A trait that implements entitlements to resources. It performs checks for CRUD and Acivation requests.
* This is where enforcement of activation quotas takes place, in additional to basic authorization.
*/
-protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer)(
- implicit actorSystem: ActorSystem,
- logging: Logging) {
+protected[core] abstract class EntitlementProvider(
+ config: WhiskConfig,
+ loadBalancer: LoadBalancer,
+ controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging) {
private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
@@ -142,6 +144,8 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations),
config.actionInvokeSystemOverloadLimit.toInt)
+ private val eventProducer = KafkaMessagingProvider.getProducer(this.config)
+
/**
* Grants a subject the right to access a resources.
*
@@ -358,10 +362,37 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
private def checkThrottleOverload(throttle: Future[RateLimit], user: Identity)(
implicit transid: TransactionId): Future[Unit] = {
throttle.flatMap { limit =>
+ val userId = user.authkey.uuid
if (limit.ok) {
+ limit match {
+ case c: ConcurrentRateLimit => {
+ val metric =
+ Metric("ConcurrentInvocations", c.count + 1)
+ UserEvents.send(
+ eventProducer,
+ EventMessage(
+ s"controller${controllerInstance.instance}",
+ metric,
+ user.subject,
+ user.namespace.toString,
+ userId,
+ metric.typeName))
+ }
+ case _ => // ignore
+ }
Future.successful(())
} else {
logging.info(this, s"'${user.namespace}' has exceeded its throttle limit, ${limit.errorMsg}")
+ val metric = Metric(limit.limitName, 1)
+ UserEvents.send(
+ eventProducer,
+ EventMessage(
+ s"controller${controllerInstance.instance}",
+ metric,
+ user.subject,
+ user.namespace.toString,
+ userId,
+ metric.typeName))
Future.failed(RejectRequest(TooManyRequests, limit.errorMsg))
}
}
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
index e9155f7219..fa0edf7e0b 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
@@ -19,13 +19,11 @@ package whisk.core.entitlement
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
-
import akka.actor.ActorSystem
-
import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.WhiskConfig
-import whisk.core.entity.Subject
+import whisk.core.entity.{InstanceId, Subject}
import whisk.core.loadBalancer.LoadBalancer
private object LocalEntitlementProvider {
@@ -34,10 +32,11 @@ private object LocalEntitlementProvider {
private val matrix = TrieMap[(Subject, String), Set[Privilege]]()
}
-protected[core] class LocalEntitlementProvider(private val config: WhiskConfig, private val loadBalancer: LoadBalancer)(
- implicit actorSystem: ActorSystem,
- logging: Logging)
- extends EntitlementProvider(config, loadBalancer) {
+protected[core] class LocalEntitlementProvider(
+ private val config: WhiskConfig,
+ private val loadBalancer: LoadBalancer,
+ private val controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
+ extends EntitlementProvider(config, loadBalancer, controllerInstance) {
private implicit val executionContext = actorSystem.dispatcher
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index fddafd56a1..b75ad72a7e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -95,7 +95,7 @@ case object RescheduleJob // job is sent back to parent and could not be process
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
- sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
+ sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
storeActivation: (TransactionId, WhiskActivation) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InstanceId,
@@ -161,7 +161,7 @@ class ContainerProxy(
// implicitly via a FailureMessage which will be processed later when the state
// transitions to Running
val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, response)
- sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex)
+ sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.authkey.uuid)
storeActivation(transid, activation)
}
.flatMap { container =>
@@ -380,7 +380,7 @@ class ContainerProxy(
}
// Sending active ack. Entirely asynchronous and not waited upon.
- activation.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex))
+ activation.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.authkey.uuid))
// Adds logs to the raw activation.
val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
@@ -422,7 +422,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
object ContainerProxy {
def props(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
- ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
+ ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
store: (TransactionId, WhiskActivation) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InstanceId,
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 8601e4c9da..b132dd815f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -26,7 +26,7 @@ import akka.stream.ActorMaterializer
import org.apache.kafka.common.errors.RecordTooLargeException
import pureconfig._
import spray.json._
-import whisk.common.{Logging, LoggingMarkers, Scheduler, TransactionId}
+import whisk.common._
import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.core.connector._
import whisk.core.containerpool._
@@ -40,6 +40,7 @@ import whisk.spi.SpiLoader
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
+import DefaultJsonProtocol._
class InvokerReactive(
config: WhiskConfig,
@@ -112,12 +113,12 @@ class InvokerReactive(
private val ack = (tid: TransactionId,
activationResult: WhiskActivation,
blockingInvoke: Boolean,
- controllerInstance: InstanceId) => {
+ controllerInstance: InstanceId,
+ userId: UUID) => {
implicit val transid: TransactionId = tid
def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
val msg = CompletionMessage(transid, res, instance)
-
producer.send(s"completed${controllerInstance.toInt}", msg).andThen {
case Success(_) =>
logging.info(
@@ -125,6 +126,30 @@ class InvokerReactive(
s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}")
}
}
+ // Potentially sends activation metadata to kafka if user events are enabled
+ UserEvents.send(
+ producer, {
+ val activation = Activation(
+ activationResult.namespace + EntityPath.PATHSEP + activationResult.name,
+ activationResult.response.statusCode,
+ activationResult.duration.getOrElse(0),
+ activationResult.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0),
+ activationResult.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0),
+ activationResult.annotations.getAs[String](WhiskActivation.kindAnnotation).getOrElse("unknown_kind"),
+ activationResult.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false),
+ activationResult.annotations
+ .getAs[ActionLimits](WhiskActivation.limitsAnnotation)
+ .map(al => al.memory.megabytes)
+ .getOrElse(0),
+ activationResult.annotations.getAs[Boolean](WhiskActivation.causedByAnnotation).getOrElse(false))
+ EventMessage(
+ s"invoker${instance.instance}",
+ activation,
+ activationResult.subject,
+ activationResult.namespace.toString,
+ userId,
+ activation.typeName)
+ })
send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith {
case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
@@ -209,7 +234,7 @@ class InvokerReactive(
val activation = generateFallbackActivation(msg, response)
activationFeed ! MessageFeed.Processed
- ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex)
+ ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.authkey.uuid)
store(msg.transid, activation)
Future.successful(())
}
@@ -219,7 +244,7 @@ class InvokerReactive(
activationFeed ! MessageFeed.Processed
val activation =
generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted))
- ack(msg.transid, activation, false, msg.rootControllerIndex)
+ ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.authkey.uuid)
logging.warn(this, s"namespace ${msg.user.namespace} was blocked in invoker.")
Future.successful(())
}
diff --git a/docs/metrics.md b/docs/metrics.md
index 9ba60efa64..3d2bb5e00d 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -17,9 +17,14 @@
-->
# OpenWhisk Metric Support
-Openwhick contains the capability to send metric information to a statsd server. This capability is disabled per default. Instead metric information is normally written to the log files in logmarker format.
+OpenWhisk distinguishes between system and user metrics (events).
-## Configuration
+System metrics typically contain information about system performance; they can be sent to Kamon or written to log files in logmarker format. These metrics are typically used by OpenWhisk providers/operators.
+
+User metrics encompass information about action performance, which is sent to Kafka in the form of events. These metrics are meant to be consumed by OpenWhisk users; however, they could also be used for billing or audit purposes. Note that at the moment the events are not directly exposed to users and require an additional Kafka-consumer-based micro-service for data processing.
+
+## System specific metrics
+### Configuration
Both capabilities can be enabled or disabled separately during deployment via Ansible configuration in the 'group_vars/all' file of an environment.
@@ -60,7 +65,7 @@ metrics_kamon_statsd_port: '8125'
metrics_log: true
```
-## Testing the statsd metric support
+### Testing the statsd metric support
The Kamon project provides an integrated docker image containing statsd and a connected Grafana dashboard via [this GitHub project](https://github.com/kamon-io/docker-grafana-graphite). This image is helpful for testing the metrics sent via statsd.
@@ -69,3 +74,39 @@ Please follow these [instructions](https://github.com/kamon-io/docker-grafana-gr
The docker image exposes statsd via the (standard) port 8125 and a Grafana dashboard via port 8080 on your docker host.
The address of your docker host has to be configured in the `metrics_kamon_statsd_host` configuration property.
+
+## User specific metrics
+### Configuration
+User metrics are disabled by default and can be explicitly enabled by setting the following property in one of the Ansible configuration files:
+```
+user_events_enabled: true
+```
+
+### Supported events
+Activation is an event that occurs after each activation. It includes the following execution metadata:
+```
+waitTime - internal system hold time
+initTime - time it took to initialise an action, e.g. docker init
+statusCode - status code of the invocation: 0 - success, 1 - application error, 2 - action developer error, 3 - internal OpenWhisk error
+duration - actual time the action code was running
+kind - action flavor, e.g. nodejs
+conductor - true for conductor backed actions
+memory - maximum memory allowed for the action container (in MB)
+causedBy - true for sequence actions
+```
+Metric is any user-specific event produced by the system; at the moment it includes the following information:
+```
+ConcurrentRateLimit - the user has exceeded its limit for concurrent invocations.
+TimedRateLimit - the user has exceeded its per-minute limit for the number of invocations.
+ConcurrentInvocations - the number of in-flight invocations per user.
+```
+
+Example events that could be consumed from Kafka.
+Activation:
+```
+{"body":{"statusCode":0,"duration":3,"name":"whisk.system/invokerHealthTestAction0","waitTime":583915671,"conductor":false,"kind":"nodejs:6","initTime":0,"memory": 256, "causedBy": false},"eventType":"Activation","source":"invoker0","subject":"whisk.system","timestamp":1524476122676,"userId":"d0888ad5-5a92-435e-888a-d55a92935e54","namespace":"whisk.system"}
+```
+Metric:
+```
+{"body":{"metricName":"ConcurrentInvocations","metricValue":1},"eventType":"Metric","source":"controller0","subject":"guest","timestamp":1524476104419,"userId":"23bc46b1-71f6-4ed5-8c54-816aa4f8c502","namespace":"guest"}
+```
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 6054bda977..62fa019d1e 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -58,4 +58,7 @@ whisk {
client-auth = "{{ controller.ssl.clientAuth }}"
}
}
+ user-events {
+ enabled = {{ user_events }}
+ }
}
diff --git a/tests/src/test/scala/whisk/common/UserEventTests.scala b/tests/src/test/scala/whisk/common/UserEventTests.scala
new file mode 100644
index 0000000000..661b07ec84
--- /dev/null
+++ b/tests/src/test/scala/whisk/common/UserEventTests.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import java.nio.charset.StandardCharsets
+
+import akka.actor.ActorSystem
+import common._
+import common.rest.WskRest
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import whisk.connector.kafka.KafkaConsumerConnector
+import whisk.core.WhiskConfig
+import whisk.core.connector.{Activation, EventMessage, Metric}
+
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class UserEventTests extends FlatSpec with Matchers with WskTestHelpers with StreamLogging with BeforeAndAfterAll {
+
+ implicit val wskprops = WskProps()
+ implicit val system = ActorSystem("UserEventTestSystem")
+ val config = new WhiskConfig(WhiskConfig.kafkaHosts)
+
+ val wsk = new WskRest
+
+ val groupid = "kafkatest"
+ val topic = "events"
+ val maxPollInterval = 10.seconds
+
+ val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
+ val testActionsDir = WhiskProperties.getFileRelativeToWhiskHome("tests/dat/actions")
+ behavior of "UserEvents"
+
+ override def afterAll() {
+ consumer.close()
+ }
+
+ if (UserEvents.enabled) {
+ it should "invoke an action and produce user events" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
+ val file = Some(TestUtils.getTestActionFilename("hello.js"))
+ val name = "testUserEvents"
+
+ assetHelper.withCleaner(wsk.action, name, confirmDelete = true) { (action, _) =>
+ action.create(name, file)
+ }
+
+ val run = wsk.action.invoke(name, blocking = true)
+
+ withActivation(wsk.activation, run) { result =>
+ withClue("invoking an action was unsuccessful") {
+ result.response.status shouldBe "success"
+ }
+ }
+ // checking for any metrics to arrive
+ val received =
+ consumer.peek(maxPollInterval).map {
+ case (_, _, _, msg) => EventMessage.parse(new String(msg, StandardCharsets.UTF_8))
+ }
+ received.map(event => {
+ event.body match {
+ case a: Activation =>
+ Seq(a.statusCode) should contain oneOf (0, 1, 2, 3)
+ event.source should fullyMatch regex "invoker\\d+".r
+ case m: Metric =>
+ Seq(m.metricName) should contain oneOf ("ConcurrentInvocations", "ConcurrentRateLimit", "TimedRateLimit")
+ event.source should fullyMatch regex "controller\\d+".r
+ }
+ })
+ // produce at least 2 events - an Activation and a 'ConcurrentInvocations' Metric
+ // >= 2 is due to events that might have potentially occurred in between
+ received.size should be >= 2
+ consumer.commit()
+ }
+
+ }
+
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 0055372f1e..fcde7e70eb 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -150,7 +150,7 @@ class ContainerProxyTests
/** Creates an inspectable version of the ack method, which records all calls in a buffer */
def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
- (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId) =>
+ (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId, _: UUID) =>
activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
@@ -219,7 +219,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -255,7 +256,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -302,7 +304,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -340,7 +343,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
@@ -372,7 +376,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(noLogsAction, message)
@@ -402,7 +407,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -437,7 +443,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -476,7 +483,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -506,7 +514,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -535,7 +544,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -568,7 +578,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized) // first run an activation
timeout(machine) // times out Ready state so container suspends
@@ -603,7 +614,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine) // times out Ready state so container suspends
@@ -639,7 +651,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
// Start running the action
@@ -690,7 +703,8 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ ContainerProxy
+ .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index 662c6d7fb5..e9df1d530b 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -67,7 +67,7 @@ protected trait ControllerTestCommon
override implicit val actorSystem = system // defined in ScalatestRouteTest
override val executionContext = actorSystem.dispatcher
- override val whiskConfig = new WhiskConfig(RestApiCommons.requiredProperties)
+ override val whiskConfig = new WhiskConfig(RestApiCommons.requiredProperties ++ WhiskConfig.kafkaHosts)
assert(whiskConfig.isValid)
// initialize runtimes manifest
@@ -75,7 +75,8 @@ protected trait ControllerTestCommon
override val loadBalancer = new DegenerateLoadBalancerService(whiskConfig)
- override lazy val entitlementProvider: EntitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
+ override lazy val entitlementProvider: EntitlementProvider =
+ new LocalEntitlementProvider(whiskConfig, loadBalancer, instance)
override val activationIdFactory = new ActivationId.ActivationIdGenerator() {
// need a static activation id to test activations api
diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index 59d4b8e2ca..97d5999401 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -1729,7 +1729,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
}
class TestingEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer)
- extends EntitlementProvider(config, loadBalancer) {
+ extends EntitlementProvider(config, loadBalancer, InstanceId(0)) {
protected[core] override def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = {
val subject = user.subject
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services