You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/10/01 17:13:46 UTC

[openwhisk] branch master updated: Capture document size, RU usage, retry stats for get and put in CosmosDB (#4652)

This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f9a780  Capture document size, RU usage, retry stats for get and put in CosmosDB (#4652)
3f9a780 is described below

commit 3f9a78035d7c3be08d3e3e3c2e783ab5d3530ec7
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Tue Oct 1 22:43:34 2019 +0530

    Capture document size, RU usage, retry stats for get and put in CosmosDB (#4652)
    
    * Enable collection of retry stats
    * Log document size and RU used for get and put operation
    * Include extra logs for transaction with debug mode enabled
    * Histogram metric for document size
    * Add custom histogram buckets for retry stats
---
 common/scala/src/main/resources/application.conf   |  14 +++
 .../database/cosmosdb/CosmosDBArtifactStore.scala  |  61 ++++++---
 .../cosmosdb/CosmosDBArtifactStoreProvider.scala   |   2 +
 .../database/cosmosdb/RetryMetricsCollector.scala  | 137 +++++++++++++++++++++
 .../cosmosdb/CosmosDBArtifactStoreTests.scala      |  49 +++++++-
 5 files changed, 242 insertions(+), 21 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index cfa7845..6b2647b 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -70,6 +70,14 @@ kamon {
     prometheus {
         # We expose the metrics endpoint over akka http. So default server is disabled
         start-embedded-http-server = no
+
+        buckets {
+            custom {
+                // Retries are capped at 9 by default (max-retry-attempts-on-throttled-requests), but some
+                // setups raise that limit, so the buckets go beyond 9
+                "histogram.cosmosdb_retry_success" = [1, 2, 3, 5, 7, 10, 12, 15, 20]
+            }
+        }
     }
 
     reporters = [
@@ -244,6 +252,10 @@ whisk {
         # and exposed as metrics. If any reindexing is in progress then its progress would be logged with this frequency
         record-usage-frequency = 10 m
 
+        # Enables collection of CosmosDB retry stats. This works by registering a Logback appender that
+        # intercepts the SDK's retry log messages and derives the stats from them
+        retry-stats-enabled = true
+
         connection-policy {
             max-pool-size = 1000
             # When the value of this property is true, the SDK will direct write operations to
@@ -260,6 +272,8 @@ whisk {
             retry-options {
                 # Sets the maximum number of retries in the case where the request fails
                 # because the service has applied rate limiting on the client.
+
+                # If this value is changed, adjust the buckets under `kamon.prometheus.buckets.custom` accordingly
                 max-retry-attempts-on-throttled-requests = 9
 
                 # Sets the maximum retry time
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 1ddc8b8..1ff638f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
 
 import _root_.rx.RxReactiveStreams
 import akka.actor.ActorSystem
+import akka.event.Logging.InfoLevel
 import akka.http.scaladsl.model.{ContentType, StatusCodes, Uri}
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{Sink, Source}
@@ -71,6 +72,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
   private val getToken = createToken("get")
   private val queryToken = createToken("query")
   private val countToken = createToken("count")
+  private val docSizeToken = createDocSizeToken()
 
   private val documentsSizeToken = createUsageToken("documentsSize", MeasurementUnit.information.kilobytes)
   private val indexSizeToken = createUsageToken("indexSize", MeasurementUnit.information.kilobytes)
@@ -99,7 +101,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
   override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
     val asJson = d.toDocumentRecord
 
-    val doc = toCosmosDoc(asJson)
+    val (doc, docSize) = toCosmosDoc(asJson)
     val id = doc.getId
     val docinfoStr = s"id: $id, rev: ${doc.getETag}"
     val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'")
@@ -136,7 +138,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
       }
       .transform(
         { r =>
-          transid.finished(this, start, s"[PUT] '$collName' completed document: '$docinfoStr'")
+          docSizeToken.histogram.record(docSize)
+          transid.finished(
+            this,
+            start,
+            s"[PUT] '$collName' completed document: '$docinfoStr', size=$docSize, ru=${r.getRequestCharge}${extraLogs(r)}",
+            InfoLevel)
           collectMetrics(putToken, r.getRequestCharge)
           toDocInfo(r.getResource)
         }, {
@@ -158,8 +165,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
     }
     val g = f
       .transform(
-        { _ =>
-          transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'")
+        { r =>
+          transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'${extraLogs(r)}", InfoLevel)
           true
         }, {
           case e: DocumentClientException if isNotFound(e) =>
@@ -194,7 +201,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
 
   private def softDeletePut(docInfo: DocInfo, js: JsObject)(implicit transid: TransactionId) = {
     val deletedJs = transform(js, Seq((deleted, Some(JsTrue))))
-    val doc = toCosmosDoc(deletedJs)
+    val (doc, _) = toCosmosDoc(deletedJs)
     softDeleteTTL.foreach(doc.setTimeToLive(_))
     val f = client.replaceDocument(doc, matchRevOption(docInfo)).head()
     f.foreach(r => collectMetrics(putToken, r.getRequestCharge))
@@ -220,8 +227,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
               // for compatibility
               throw NoDocumentException("not found on 'get'")
             } else {
-              val js = getResultToWhiskJsonDoc(rr.getResource)
-              transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'")
+              val (js, docSize) = getResultToWhiskJsonDoc(rr.getResource)
+              transid
+                .finished(
+                  this,
+                  start,
+                  s"[GET] '$collName' completed: found document '$doc',size=$docSize, ru=${rr.getRequestCharge}${extraLogs(rr)}",
+                  InfoLevel)
               deserialize[A, DocumentAbstraction](doc, js)
             }
           }, {
@@ -254,7 +266,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
           transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: '$id' not found")
           None
         } else {
-          val js = getResultToWhiskJsonDoc(rr.getResource)
+          val (js, _) = getResultToWhiskJsonDoc(rr.getResource)
           transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: found document '$id'")
           Some(js)
         }
@@ -292,7 +304,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
       .readDocument(selfLinkOf(id), newRequestOption(id))
       .head()
       .map { rr =>
-        val js = getResultToWhiskJsonDoc(rr.getResource)
+        val (js, _) = getResultToWhiskJsonDoc(rr.getResource)
         collectMetrics(getToken, rr.getRequestCharge)
         js
       }
@@ -358,7 +370,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
             this,
             s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
         }
-        transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}")
+        transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}", InfoLevel)
     }
     reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
   }
@@ -465,7 +477,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
     e.getStatusCode == StatusCodes.Conflict.intValue || e.getStatusCode == StatusCodes.PreconditionFailed.intValue
   }
 
-  private def toCosmosDoc(json: JsObject): Document = {
+  private def toCosmosDoc(json: JsObject): (Document, Int) = {
     val computedJs = documentHandler.computedFields(json)
     val computedOpt = if (computedJs.fields.nonEmpty) Some(computedJs) else None
     val fieldsToAdd =
@@ -476,10 +488,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
         (clusterId, clusterIdValue))
     val fieldsToRemove = Seq(_id, _rev)
     val mapped = transform(json, fieldsToAdd, fieldsToRemove)
-    val doc = new Document(mapped.compactPrint)
+    val jsonString = mapped.compactPrint
+    val doc = new Document(jsonString)
     doc.set(selfLink, createSelfLink(doc.getId))
     doc.setTimeToLive(null) //Disable any TTL if in effect for earlier revision
-    doc
+    (doc, jsonString.length)
   }
 
   private def queryResultToWhiskJsonDoc(doc: Document): JsObject = {
@@ -490,10 +503,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
     toWhiskJsonDoc(js, id, None)
   }
 
-  private def getResultToWhiskJsonDoc(doc: Document): JsObject = {
+  private def getResultToWhiskJsonDoc(doc: Document): (JsObject, Int) = {
     checkDoc(doc)
-    val js = doc.toJson.parseJson.asJsObject
-    toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag)))
+    val jsString = doc.toJson
+    val js = jsString.parseJson.asJsObject
+    val whiskDoc = toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag)))
+    (whiskDoc, jsString.length)
   }
 
   private def toDocInfo[T <: Resource](doc: T) = {
@@ -552,9 +567,23 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
     else LogMarkerToken("cosmosdb", name, collName)(unit)
   }
 
+  /** Histogram token for document sizes in bytes; tagged by collection when Kamon tags are enabled. */
+  private def createDocSizeToken(): LogMarkerToken = {
+    val bytes = MeasurementUnit.information.bytes
+    if (!TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "doc", collName)(bytes)
+    else
+      LogMarkerToken("cosmosdb", "doc", "size", tags = Map("collection" -> collName))(bytes)
+  }
+
   private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true
 
   private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue)
 
   private def isNewDocument(doc: Document) = doc.getETag == null
+
+  /** Request diagnostics prefixed with a space when the transaction has extra logging on, else "". */
+  private def extraLogs(r: ResourceResponse[_])(implicit tid: TransactionId): String =
+    if (!tid.meta.extraLogging) ""
+    else
+      " " + r.getRequestDiagnosticsString
 }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
index 72f9f10..7d3ac3e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
@@ -41,6 +41,8 @@ object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
   type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
   private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]()
 
+  RetryMetricsCollector.registerIfEnabled()
+
   override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
     implicit jsonFormat: RootJsonFormat[D],
     docReader: DocumentReader,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/RetryMetricsCollector.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/RetryMetricsCollector.scala
new file mode 100644
index 0000000..d5f3d4d
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/RetryMetricsCollector.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+
+import akka.event.slf4j.SLF4JLogging
+import ch.qos.logback.classic.LoggerContext
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.AppenderBase
+import com.microsoft.azure.cosmosdb.rx.internal.ResourceThrottleRetryPolicy
+import org.apache.openwhisk.common.{Counter => WhiskCounter}
+import kamon.metric.{Counter, MeasurementUnit}
+import org.apache.openwhisk.common.{LogMarkerToken, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
+import org.slf4j.LoggerFactory
+import pureconfig._
+
+import scala.util.Try
+
+object CosmosDBAction extends Enumeration {
+  val Create, Query, Get, Others = Value
+}
+
+/** Logback appender that turns CosmosDB SDK throttling-retry log messages into Kamon metrics. */
+object RetryMetricsCollector extends AppenderBase[ILoggingEvent] with SLF4JLogging {
+  import CosmosDBAction._
+  private val tokens =
+    Map(Create -> Token(Create), Query -> Token(Query), Get -> Token(Get), Others -> Token(Others))
+
+  /** Count of successful retries, maintained only when Kamon tags are disabled (used by tests). */
+  val retryCounter = new WhiskCounter
+  /** Attaches this appender when the `retry-stats-enabled` flag under `ConfigKeys.cosmosdb` is set. */
+  private[cosmosdb] def registerIfEnabled(): Unit = {
+    val enabled = loadConfigOrThrow[Boolean](s"${ConfigKeys.cosmosdb}.retry-stats-enabled")
+    if (enabled) {
+      log.info("Enabling retry metrics collector")
+      register()
+    }
+  }
+
+  /**
+   * Classifies a retry-policy log event and records it. For a scheduled retry the SDK logs
+   * "Operation will be retried after {} milliseconds. Current attempt {}, Cumulative delay {}"
+   * (attempt count is argument 1); a message starting with "Operation will NOT be retried" counts
+   * as a failed retry and anything else, including a null message, is ignored. The operation
+   * type is read from the attached exception's message, falling back to the log message itself.
+   */
+  override def append(e: ILoggingEvent): Unit = {
+    val msg = Option(e.getMessage).getOrElse("")
+    val errorMsg = Option(e.getThrowableProxy).flatMap(p => Option(p.getMessage)).getOrElse(msg)
+    for {
+      success <- isSuccessOrFailedRetry(msg)
+      token <- tokens.get(operationType(errorMsg))
+    } {
+      if (success) {
+        token.success.counter.increment()
+        // Argument 1 of the SDK log call is the current attempt count
+        val attemptCount = getRetryAttempt(e.getArgumentArray, 1)
+        token.success.histogram.record(attemptCount)
+
+        // Untagged Kamon counters cannot be read back, so tests running without
+        // tags use this plain counter to detect that a retry happened
+        if (!TransactionId.metricsKamonTags) {
+          retryCounter.next()
+        }
+      } else {
+        token.failed.counter.increment()
+      }
+    }
+  }
+
+  def getCounter(opType: CosmosDBAction.Value, retryPassed: Boolean = true): Option[Counter] = {
+    tokens.get(opType).map(t => if (retryPassed) t.success else t.failed).map { _.counter }
+  }
+
+  /** Numeric value of `args(index)`, or 0 if `args` is null, too short or the value is not a number. */
+  private def getRetryAttempt(args: Array[AnyRef], index: Int): Int = {
+    val t = Try {
+      if (args != null && args.length > index) {
+        args(index) match {
+          case n: Number => n.intValue()
+          case _         => 0
+        }
+      } else 0
+    }
+    t.getOrElse(0)
+  }
+
+  /** Starts this appender and attaches it to the SDK's throttle-retry logger; needs a Logback backend. */
+  private def register(): Unit = LoggerFactory.getILoggerFactory match {
+    case logCtx: LoggerContext =>
+      start()
+      logCtx.getLogger(classOf[ResourceThrottleRetryPolicy].getName).addAppender(this)
+    case other => log.warn(s"Retry metrics need Logback, but SLF4J is bound to ${other.getClass.getName}")
+  }
+
+  private def isSuccessOrFailedRetry(msg: String) = {
+    if (msg.startsWith("Operation will be retried after")) Some(true)
+    else if (msg.startsWith("Operation will NOT be retried")) Some(false)
+    else None
+  }
+
+  private def operationType(errorMsg: String) = {
+    if (errorMsg.contains("OperationType: Query")) Query
+    else if (errorMsg.contains("OperationType: Create")) Create
+    else if (errorMsg.contains("OperationType: Get")) Get
+    else Others
+  }
+
+  private def createToken(opType: String, retryPassed: Boolean): LogMarkerToken = {
+    val action = if (retryPassed) "success" else "failed"
+    val tags = Map("type" -> opType)
+    if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "retry", action, tags = tags)(MeasurementUnit.none)
+    else LogMarkerToken("cosmosdb", "retry", action, Some(opType))(MeasurementUnit.none)
+  }
+
+  private case class Token(success: LogMarkerToken, failed: LogMarkerToken)
+
+  private object Token {
+    def apply(opType: CosmosDBAction.Value): Token =
+      new Token(createToken(opType.toString, retryPassed = true), createToken(opType.toString, retryPassed = false))
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
index d05c607..0cd5f43 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -17,9 +17,13 @@
 
 package org.apache.openwhisk.core.database.cosmosdb
 
+import java.util.concurrent.CountDownLatch
+
+import akka.stream.scaladsl.Source
 import com.typesafe.config.ConfigFactory
 import io.netty.util.ResourceLeakDetector
 import io.netty.util.ResourceLeakDetector.Level
+import kamon.metric.LongAdderCounter
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.database.DocumentSerializer
 import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider
@@ -28,6 +32,7 @@ import org.apache.openwhisk.core.entity.WhiskQueries.TOP
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.entity.{
   DocumentReader,
+  Parameters,
   WhiskActivation,
   WhiskDocumentReader,
   WhiskEntity,
@@ -35,14 +40,11 @@ import org.apache.openwhisk.core.entity.{
   WhiskPackage
 }
 import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.FlatSpec
-import spray.json.JsString
-import org.apache.openwhisk.core.entity.size._
-import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
+import spray.json.JsString
 
+import scala.concurrent.duration._
 import scala.reflect.ClassTag
 
 @RunWith(classOf[JUnitRunner])
@@ -154,4 +156,41 @@ class CosmosDBArtifactStoreTests extends FlatSpec with CosmosDBStoreBehaviorBase
     stream.toString should include("[QueryMetricsEnabled]")
 
   }
+
+  behavior of "CosmosDB retry metrics"
+
+  it should "capture success retries" in {
+    implicit val tid: TransactionId = TransactionId.testing
+    val bigPkg = WhiskPackage(newNS(), aname(), parameters = Parameters("foo", "x" * 1024 * 1024))
+    val latch = new CountDownLatch(1)
+    val f = Source(1 to 500)
+      .mapAsync(100) { _ =>
+        latch.countDown()
+        // Fails the stream, stopping further puts, as soon as a retry has been recorded
+        require(retryCount == 0)
+        entityStore.put(bigPkg)
+      }
+      .runForeach { doc =>
+        docsToDelete += ((entityStore, doc))
+      }
+
+    // Wait until the first put has been issued before polling for the retry stats
+    latch.await()
+    retry(() => f, 500.millis)
+    retryCount should be > 0
+  }
+
+  /** Successful retries seen by [[RetryMetricsCollector]] (only `Create` ones when Kamon tags are on). */
+  private def retryCount: Int =
+    if (!TransactionId.metricsKamonTags) {
+      // Without Kamon tags the counter is a CounterMetricImpl, which offers no way to
+      // read the current count, so the collector also maintains a plain counter that
+      // is used in this mode
+      RetryMetricsCollector.retryCounter.cur.toInt
+    } else {
+      RetryMetricsCollector
+        .getCounter(CosmosDBAction.Create)
+        .collect { case c: LongAdderCounter => c.snapshot(false).value.toInt }
+        .getOrElse(0)
+    }
 }