You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/09/18 08:23:01 UTC

[incubator-openwhisk] branch master updated: Emit CosmosDB request usage metric. (#4023)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e0d562e  Emit CosmosDB request usage metric. (#4023)
e0d562e is described below

commit e0d562e19fe22aed49236b7ba32f94b157d459d4
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Tue Sep 18 13:52:55 2018 +0530

    Emit CosmosDB request usage metric. (#4023)
---
 .../src/main/scala/whisk/common/Logging.scala      | 10 +++----
 .../database/cosmosdb/CosmosDBArtifactStore.scala  | 31 +++++++++++++++++++---
 docs/metrics.md                                    | 12 +++++++++
 3 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 8149774..19642f9 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -182,8 +182,8 @@ case class LogMarkerToken(component: String,
                           subAction: Option[String] = None,
                           tags: Map[String, String] = Map.empty) {
 
-  override def toString = component + "_" + action + "_" + state
-  def toStringWithSubAction =
+  override val toString = component + "_" + action + "_" + state
+  val toStringWithSubAction =
     subAction.map(sa => component + "_" + action + "." + sa + "_" + state).getOrElse(toString)
 
   def asFinish = copy(state = LoggingMarkers.finish)
@@ -212,14 +212,14 @@ object MetricEmitter {
 
   val metrics = Kamon.metrics
 
-  def emitCounterMetric(token: LogMarkerToken): Unit = {
+  def emitCounterMetric(token: LogMarkerToken, times: Long = 1): Unit = {
     if (TransactionId.metricsKamon) {
       if (TransactionId.metricsKamonTags) {
         metrics
           .counter(token.toString, token.tags)
-          .increment(1)
+          .increment(times)
       } else {
-        metrics.counter(token.toStringWithSubAction).increment(1)
+        metrics.counter(token.toStringWithSubAction).increment(times)
       }
     }
   }
diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 116995e..9292d17 100644
--- a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -28,7 +28,7 @@ import akka.util.{ByteString, ByteStringBuilder}
 import com.microsoft.azure.cosmosdb._
 import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
 import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFormat, _}
-import whisk.common.{Logging, LoggingMarkers, TransactionId}
+import whisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.database.StoreUtils.{checkDocHasRevision, deserialize, reportFailure}
 import whisk.core.database._
 import whisk.core.database.cosmosdb.CosmosDBArtifactStoreProvider.DocumentClientRef
@@ -67,6 +67,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
   private val _id = "_id"
   private val _rev = "_rev"
 
+  private val putToken = createToken("put", read = false)
+  private val delToken = createToken("del", read = false)
+  private val getToken = createToken("get")
+  private val queryToken = createToken("query")
+  private val countToken = createToken("count")
+  private val putAttachmentToken = createToken("putAttachment", read = false)
+
   override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
 
   override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
@@ -88,6 +95,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
       .transform(
         { r =>
           transid.finished(this, start, s"[PUT] '$collName' completed document: '$docinfoStr'")
+          collectMetrics(putToken, r.getRequestCharge)
           toDocInfo(r.getResource)
         }, {
           case e: DocumentClientException if isConflict(e) =>
@@ -106,8 +114,9 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
       .deleteDocument(selfLinkOf(doc.id), matchRevOption(doc))
       .head()
       .transform(
-        { _ =>
+        { r =>
           transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'")
+          collectMetrics(delToken, r.getRequestCharge)
           true
         }, {
           case e: DocumentClientException if isNotFound(e) =>
@@ -139,6 +148,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
         { rr =>
           val js = getResultToWhiskJsonDoc(rr.getResource)
           transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'")
+          collectMetrics(getToken, rr.getRequestCharge)
           deserialize[A, DocumentAbstraction](doc, js)
         }, {
           case e: DocumentClientException if isNotFound(e) =>
@@ -167,6 +177,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
       .map { rr =>
         val js = getResultToWhiskJsonDoc(rr.getResource)
         transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: found document '$id'")
+        collectMetrics(getToken, rr.getRequestCharge)
         Some(js)
       }
       .recoverWith {
@@ -206,6 +217,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
       RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, querySpec, newFeedOptions()))
     val f = Source
       .fromPublisher(publisher)
+      .wireTap(Sink.foreach(r => collectMetrics(queryToken, r.getRequestCharge)))
       .mapConcat(asSeq)
       .drop(skip)
       .map(queryResultToWhiskJsonDoc)
@@ -241,6 +253,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
       .map { r =>
         val count = r.getResults.asScala.head.getLong(aggregate).longValue()
         transid.finished(this, start, s"[COUNT] '$collName' completed: count $count")
+        collectMetrics(countToken, r.getRequestCharge)
         if (count > skip) count - skip else 0L
       }
 
@@ -321,9 +334,10 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
       .upsertAttachment(selfLinkOf(doc.id), s, options, matchRevOption(doc))
       .head()
       .transform(
-        { _ =>
+        { r =>
           transid
             .finished(this, start, s"[ATT_PUT] '$collName' completed uploading attachment '$name' of document '$doc'")
+          collectMetrics(putAttachmentToken, r.getRequestCharge)
           doc //Adding attachment does not change the revision of document. So retain the doc info
         }, {
           case e: DocumentClientException if isConflict(e) =>
@@ -495,4 +509,15 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
     require(doc.getId != null, s"$doc does not have id field set")
     require(doc.getETag != null, s"$doc does not have etag field set")
   }
+
+  private def collectMetrics(token: LogMarkerToken, charge: Double): Unit = {
+    MetricEmitter.emitCounterMetric(token, Math.round(charge))
+  }
+
+  private def createToken(action: String, read: Boolean = true): LogMarkerToken = {
+    val mode = if (read) "read" else "write"
+    val tags = Map("action" -> action, "mode" -> mode, "collection" -> collName)
+    if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", "used", tags = tags)
+    else LogMarkerToken("cosmosdb", "ru", collName, Some(action))
+  }
 }
diff --git a/docs/metrics.md b/docs/metrics.md
index 33e2b7e..0e96321 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -256,6 +256,18 @@ Operation Types
 * `saveDocument`
 * `saveDocumentBulk`
 
+#### CosmosDB RU Metrics
+
+When the database in use is CosmosDB, metrics related to CosmosDB Request Units (RU) are also emitted.
+
+If Kamon tags are enabled, the metric name is `openwhisk.counter.cosmosdb_ru_used` with the following tags:
+
+- `mode` - `read` or `write`
+- `collection` - Name of collection. Example `activations`, `whisks` and `subjects`
+- `action` - Type of operation performed. Example `get`, `put`, `del`, `query` and `count`
+
+If Kamon tags are not enabled, the metric name is of the form `openwhisk.counter.cosmosdb.ru.<collection>.<action>`
+
 ## User specific metrics
 ### Configuration
 User metrics are enabled by default and could be explicitly disabled by setting the following property in one of the Ansible configuration files: