You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/09/18 08:23:01 UTC
[incubator-openwhisk] branch master updated: Emit CosmosDB request
usage metric. (#4023)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e0d562e Emit CosmosDB request usage metric. (#4023)
e0d562e is described below
commit e0d562e19fe22aed49236b7ba32f94b157d459d4
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Tue Sep 18 13:52:55 2018 +0530
Emit CosmosDB request usage metric. (#4023)
---
.../src/main/scala/whisk/common/Logging.scala | 10 +++----
.../database/cosmosdb/CosmosDBArtifactStore.scala | 31 +++++++++++++++++++---
docs/metrics.md | 12 +++++++++
3 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 8149774..19642f9 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -182,8 +182,8 @@ case class LogMarkerToken(component: String,
subAction: Option[String] = None,
tags: Map[String, String] = Map.empty) {
- override def toString = component + "_" + action + "_" + state
- def toStringWithSubAction =
+ override val toString = component + "_" + action + "_" + state
+ val toStringWithSubAction =
subAction.map(sa => component + "_" + action + "." + sa + "_" + state).getOrElse(toString)
def asFinish = copy(state = LoggingMarkers.finish)
@@ -212,14 +212,14 @@ object MetricEmitter {
val metrics = Kamon.metrics
- def emitCounterMetric(token: LogMarkerToken): Unit = {
+ def emitCounterMetric(token: LogMarkerToken, times: Long = 1): Unit = {
if (TransactionId.metricsKamon) {
if (TransactionId.metricsKamonTags) {
metrics
.counter(token.toString, token.tags)
- .increment(1)
+ .increment(times)
} else {
- metrics.counter(token.toStringWithSubAction).increment(1)
+ metrics.counter(token.toStringWithSubAction).increment(times)
}
}
}
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 116995e..9292d17 100644
--- a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -28,7 +28,7 @@ import akka.util.{ByteString, ByteStringBuilder}
import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFormat, _}
-import whisk.common.{Logging, LoggingMarkers, TransactionId}
+import whisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, TransactionId}
import whisk.core.database.StoreUtils.{checkDocHasRevision, deserialize, reportFailure}
import whisk.core.database._
import whisk.core.database.cosmosdb.CosmosDBArtifactStoreProvider.DocumentClientRef
@@ -67,6 +67,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
private val _id = "_id"
private val _rev = "_rev"
+ private val putToken = createToken("put", read = false)
+ private val delToken = createToken("del", read = false)
+ private val getToken = createToken("get")
+ private val queryToken = createToken("query")
+ private val countToken = createToken("count")
+ private val putAttachmentToken = createToken("putAttachment", read = false)
+
override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
@@ -88,6 +95,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.transform(
{ r =>
transid.finished(this, start, s"[PUT] '$collName' completed document: '$docinfoStr'")
+ collectMetrics(putToken, r.getRequestCharge)
toDocInfo(r.getResource)
}, {
case e: DocumentClientException if isConflict(e) =>
@@ -106,8 +114,9 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.deleteDocument(selfLinkOf(doc.id), matchRevOption(doc))
.head()
.transform(
- { _ =>
+ { r =>
transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'")
+ collectMetrics(delToken, r.getRequestCharge)
true
}, {
case e: DocumentClientException if isNotFound(e) =>
@@ -139,6 +148,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
{ rr =>
val js = getResultToWhiskJsonDoc(rr.getResource)
transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'")
+ collectMetrics(getToken, rr.getRequestCharge)
deserialize[A, DocumentAbstraction](doc, js)
}, {
case e: DocumentClientException if isNotFound(e) =>
@@ -167,6 +177,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.map { rr =>
val js = getResultToWhiskJsonDoc(rr.getResource)
transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: found document '$id'")
+ collectMetrics(getToken, rr.getRequestCharge)
Some(js)
}
.recoverWith {
@@ -206,6 +217,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, querySpec, newFeedOptions()))
val f = Source
.fromPublisher(publisher)
+ .wireTap(Sink.foreach(r => collectMetrics(queryToken, r.getRequestCharge)))
.mapConcat(asSeq)
.drop(skip)
.map(queryResultToWhiskJsonDoc)
@@ -241,6 +253,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.map { r =>
val count = r.getResults.asScala.head.getLong(aggregate).longValue()
transid.finished(this, start, s"[COUNT] '$collName' completed: count $count")
+ collectMetrics(countToken, r.getRequestCharge)
if (count > skip) count - skip else 0L
}
@@ -321,9 +334,10 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.upsertAttachment(selfLinkOf(doc.id), s, options, matchRevOption(doc))
.head()
.transform(
- { _ =>
+ { r =>
transid
.finished(this, start, s"[ATT_PUT] '$collName' completed uploading attachment '$name' of document '$doc'")
+ collectMetrics(putAttachmentToken, r.getRequestCharge)
doc //Adding attachment does not change the revision of document. So retain the doc info
}, {
case e: DocumentClientException if isConflict(e) =>
@@ -495,4 +509,15 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
require(doc.getId != null, s"$doc does not have id field set")
require(doc.getETag != null, s"$doc does not have etag field set")
}
+
+ private def collectMetrics(token: LogMarkerToken, charge: Double): Unit = {
+ MetricEmitter.emitCounterMetric(token, Math.round(charge))
+ }
+
+ private def createToken(action: String, read: Boolean = true): LogMarkerToken = {
+ val mode = if (read) "read" else "write"
+ val tags = Map("action" -> action, "mode" -> mode, "collection" -> collName)
+ if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", "used", tags = tags)
+ else LogMarkerToken("cosmosdb", "ru", collName, Some(action))
+ }
}
diff --git a/docs/metrics.md b/docs/metrics.md
index 33e2b7e..0e96321 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -256,6 +256,18 @@ Operation Types
* `saveDocument`
* `saveDocumentBulk`
+#### CosmosDB RU Metrics
+
+When the database in use is CosmosDB, metrics related to CosmosDB Request Units (RU) are also emitted.
+
+If Kamon tags are enabled, the metric name is `openwhisk.counter.cosmosdb_ru_used`, with the following tags:
+
+- `mode` - `read` or `write`
+- `collection` - Name of collection. Example `activations`, `whisks` and `subjects`
+- `action` - Type of operation performed. Example `get`, `put`, `del`, `query` and `count`
+
+If Kamon tags are not enabled, the metric name is of the form `openwhisk.counter.cosmosdb.ru.<collection>.<action>`
+
## User specific metrics
### Configuration
User metrics are enabled by default and could be explicitly disabled by setting the following property in one of the Ansible configuration files: