You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/10/01 17:13:46 UTC
[openwhisk] branch master updated: Capture document size, RU usage,
retry stats for get and put in CosmosDB (#4652)
This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 3f9a780 Capture document size, RU usage, retry stats for get and put in CosmosDB (#4652)
3f9a780 is described below
commit 3f9a78035d7c3be08d3e3e3c2e783ab5d3530ec7
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Tue Oct 1 22:43:34 2019 +0530
Capture document size, RU usage, retry stats for get and put in CosmosDB (#4652)
* Enable collection of retry stats
* Log document size and RU used for get and put operation
* Include extra logs for transaction with debug mode enabled
* Histogram metric for document size
* Add custom histogram buckets for retry stats
---
common/scala/src/main/resources/application.conf | 14 +++
.../database/cosmosdb/CosmosDBArtifactStore.scala | 61 ++++++---
.../cosmosdb/CosmosDBArtifactStoreProvider.scala | 2 +
.../database/cosmosdb/RetryMetricsCollector.scala | 137 +++++++++++++++++++++
.../cosmosdb/CosmosDBArtifactStoreTests.scala | 49 +++++++-
5 files changed, 242 insertions(+), 21 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index cfa7845..6b2647b 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -70,6 +70,14 @@ kamon {
prometheus {
# We expose the metrics endpoint over akka http. So default server is disabled
start-embedded-http-server = no
+
+ buckets {
+ custom {
+ //By default retries are configured up to 9. However, for certain setups we may increase
+ //it to higher values
+ "histogram.cosmosdb_retry_success" = [1, 2, 3, 5, 7, 10, 12, 15, 20]
+ }
+ }
}
reporters = [
@@ -244,6 +252,10 @@ whisk {
# and exposed as metrics. If any reindexing is in progress then its progress would be logged with this frequency
record-usage-frequency = 10 m
+ # Flag to enable collection of retry stats. This feature works by registering with Logback to intercept
+ # log messages and based on that collect stats
+ retry-stats-enabled = true
+
connection-policy {
max-pool-size = 1000
# When the value of this property is true, the SDK will direct write operations to
@@ -260,6 +272,8 @@ whisk {
retry-options {
# Sets the maximum number of retries in the case where the request fails
# because the service has applied rate limiting on the client.
+
+ # If this value is changed then adjust the buckets under `kamon.prometheus`
max-retry-attempts-on-throttled-requests = 9
# Sets the maximum retry time
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 1ddc8b8..1ff638f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
import _root_.rx.RxReactiveStreams
import akka.actor.ActorSystem
+import akka.event.Logging.InfoLevel
import akka.http.scaladsl.model.{ContentType, StatusCodes, Uri}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
@@ -71,6 +72,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
private val getToken = createToken("get")
private val queryToken = createToken("query")
private val countToken = createToken("count")
+ private val docSizeToken = createDocSizeToken()
private val documentsSizeToken = createUsageToken("documentsSize", MeasurementUnit.information.kilobytes)
private val indexSizeToken = createUsageToken("indexSize", MeasurementUnit.information.kilobytes)
@@ -99,7 +101,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
val asJson = d.toDocumentRecord
- val doc = toCosmosDoc(asJson)
+ val (doc, docSize) = toCosmosDoc(asJson)
val id = doc.getId
val docinfoStr = s"id: $id, rev: ${doc.getETag}"
val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'")
@@ -136,7 +138,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
}
.transform(
{ r =>
- transid.finished(this, start, s"[PUT] '$collName' completed document: '$docinfoStr'")
+ docSizeToken.histogram.record(docSize)
+ transid.finished(
+ this,
+ start,
+ s"[PUT] '$collName' completed document: '$docinfoStr', size=$docSize, ru=${r.getRequestCharge}${extraLogs(r)}",
+ InfoLevel)
collectMetrics(putToken, r.getRequestCharge)
toDocInfo(r.getResource)
}, {
@@ -158,8 +165,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
}
val g = f
.transform(
- { _ =>
- transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'")
+ { r =>
+ transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'${extraLogs(r)}", InfoLevel)
true
}, {
case e: DocumentClientException if isNotFound(e) =>
@@ -194,7 +201,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
private def softDeletePut(docInfo: DocInfo, js: JsObject)(implicit transid: TransactionId) = {
val deletedJs = transform(js, Seq((deleted, Some(JsTrue))))
- val doc = toCosmosDoc(deletedJs)
+ val (doc, _) = toCosmosDoc(deletedJs)
softDeleteTTL.foreach(doc.setTimeToLive(_))
val f = client.replaceDocument(doc, matchRevOption(docInfo)).head()
f.foreach(r => collectMetrics(putToken, r.getRequestCharge))
@@ -220,8 +227,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
// for compatibility
throw NoDocumentException("not found on 'get'")
} else {
- val js = getResultToWhiskJsonDoc(rr.getResource)
- transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'")
+ val (js, docSize) = getResultToWhiskJsonDoc(rr.getResource)
+ transid
+ .finished(
+ this,
+ start,
+ s"[GET] '$collName' completed: found document '$doc',size=$docSize, ru=${rr.getRequestCharge}${extraLogs(rr)}",
+ InfoLevel)
deserialize[A, DocumentAbstraction](doc, js)
}
}, {
@@ -254,7 +266,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: '$id' not found")
None
} else {
- val js = getResultToWhiskJsonDoc(rr.getResource)
+ val (js, _) = getResultToWhiskJsonDoc(rr.getResource)
transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: found document '$id'")
Some(js)
}
@@ -292,7 +304,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.readDocument(selfLinkOf(id), newRequestOption(id))
.head()
.map { rr =>
- val js = getResultToWhiskJsonDoc(rr.getResource)
+ val (js, _) = getResultToWhiskJsonDoc(rr.getResource)
collectMetrics(getToken, rr.getRequestCharge)
js
}
@@ -358,7 +370,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
this,
s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
}
- transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}")
+ transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}", InfoLevel)
}
reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
}
@@ -465,7 +477,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
e.getStatusCode == StatusCodes.Conflict.intValue || e.getStatusCode == StatusCodes.PreconditionFailed.intValue
}
- private def toCosmosDoc(json: JsObject): Document = {
+ private def toCosmosDoc(json: JsObject): (Document, Int) = {
val computedJs = documentHandler.computedFields(json)
val computedOpt = if (computedJs.fields.nonEmpty) Some(computedJs) else None
val fieldsToAdd =
@@ -476,10 +488,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
(clusterId, clusterIdValue))
val fieldsToRemove = Seq(_id, _rev)
val mapped = transform(json, fieldsToAdd, fieldsToRemove)
- val doc = new Document(mapped.compactPrint)
+ val jsonString = mapped.compactPrint
+ val doc = new Document(jsonString)
doc.set(selfLink, createSelfLink(doc.getId))
doc.setTimeToLive(null) //Disable any TTL if in effect for earlier revision
- doc
+ (doc, jsonString.length)
}
private def queryResultToWhiskJsonDoc(doc: Document): JsObject = {
@@ -490,10 +503,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
toWhiskJsonDoc(js, id, None)
}
- private def getResultToWhiskJsonDoc(doc: Document): JsObject = {
+ private def getResultToWhiskJsonDoc(doc: Document): (JsObject, Int) = {
checkDoc(doc)
- val js = doc.toJson.parseJson.asJsObject
- toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag)))
+ val jsString = doc.toJson
+ val js = jsString.parseJson.asJsObject
+ val whiskDoc = toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag)))
+ (whiskDoc, jsString.length)
}
private def toDocInfo[T <: Resource](doc: T) = {
@@ -552,9 +567,23 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
else LogMarkerToken("cosmosdb", name, collName)(unit)
}
+  //Token for the document size histogram; the collection is attached as a tag only when Kamon tags are enabled
+  private def createDocSizeToken(): LogMarkerToken = {
+    val sizeUnit = MeasurementUnit.information.bytes
+    if (TransactionId.metricsKamonTags)
+      LogMarkerToken("cosmosdb", "doc", "size", tags = Map("collection" -> collName))(sizeUnit)
+    else LogMarkerToken("cosmosdb", "doc", collName)(sizeUnit)
+  }
+
private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true
private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue)
private def isNewDocument(doc: Document) = doc.getETag == null
+
+  //Log suffix with the request diagnostics; empty unless the transaction has extra logging enabled
+  private def extraLogs(r: ResourceResponse[_])(implicit tid: TransactionId): String = {
+    val diagnosticsWanted = tid.meta.extraLogging
+    if (diagnosticsWanted) s" ${r.getRequestDiagnosticsString}" else ""
+  }
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
index 72f9f10..7d3ac3e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
@@ -41,6 +41,8 @@ object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]()
+ RetryMetricsCollector.registerIfEnabled()
+
override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/RetryMetricsCollector.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/RetryMetricsCollector.scala
new file mode 100644
index 0000000..d5f3d4d
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/RetryMetricsCollector.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+
+import akka.event.slf4j.SLF4JLogging
+import ch.qos.logback.classic.LoggerContext
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.AppenderBase
+import com.microsoft.azure.cosmosdb.rx.internal.ResourceThrottleRetryPolicy
+import org.apache.openwhisk.common.{Counter => WhiskCounter}
+import kamon.metric.{Counter, MeasurementUnit}
+import org.apache.openwhisk.common.{LogMarkerToken, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
+import org.slf4j.LoggerFactory
+import pureconfig._
+
+import scala.util.Try
+
+object CosmosDBAction extends Enumeration { //Operation categories by which retry metrics are keyed
+ val Create, Query, Get, Others = Value
+}
+
+object RetryMetricsCollector extends AppenderBase[ILoggingEvent] with SLF4JLogging {
+  import CosmosDBAction._
+
+  /** Success/failed marker tokens for every [[CosmosDBAction]] value. */
+  private val tokens: Map[CosmosDBAction.Value, Token] =
+    Map(Create -> Token(Create), Query -> Token(Query), Get -> Token(Get), Others -> Token(Others))
+
+  /** Fallback count of scheduled retries; only incremented when Kamon tags are disabled (see `append`). */
+  val retryCounter = new WhiskCounter
+
+  /** Registers this appender if `retry-stats-enabled` is set in the CosmosDB config. */
+  private[cosmosdb] def registerIfEnabled(): Unit = {
+    val enabled = loadConfigOrThrow[Boolean](s"${ConfigKeys.cosmosdb}.retry-stats-enabled")
+    if (enabled) {
+      log.info("Enabling retry metrics collector")
+      register()
+    }
+  }
+
+  /**
+   * Updates the retry metrics from a log event emitted by the CosmosDB retry policy, which logs
+   * {{{
+   * logger.warn(
+   *   "Operation will be retried after {} milliseconds. Current attempt {}, Cumulative delay {}",
+   *   retryDelay.toMillis(), this.currentAttemptCount, this.cumulativeRetryDelay, exception);
+   * }}}
+   * Events whose message matches neither known prefix, or is null, are ignored.
+   */
+  override def append(e: ILoggingEvent): Unit =
+    for {
+      msg <- Option(e.getMessage)
+      success <- isSuccessOrFailedRetry(msg)
+      //Operation type is looked up in the exception text when available, else in the log message itself
+      errorMsg = Option(e.getThrowableProxy).flatMap(p => Option(p.getMessage)).getOrElse(msg)
+      token <- tokens.get(operationType(errorMsg))
+    } {
+      if (success) {
+        token.success.counter.increment()
+        //Element 1 of the arguments is the current attempt count
+        token.success.histogram.record(getRetryAttempt(e.getArgumentArray, 1))
+        //Used mostly for test mode where tags may be disabled
+        //and test need to determine if count is increased
+        if (!TransactionId.metricsKamonTags) retryCounter.next()
+      } else {
+        token.failed.counter.increment()
+      }
+    }
+
+  /** Kamon counter for `opType`: retries that were scheduled (`retryPassed`, the default) or given up on. */
+  def getCounter(opType: CosmosDBAction.Value, retryPassed: Boolean = true): Option[Counter] =
+    tokens.get(opType).map(t => if (retryPassed) t.success else t.failed).map(_.counter)
+
+  /** Reads `args(index)` as an Int; yields 0 when `args` is null, too short, or the element is not a number. */
+  private def getRetryAttempt(args: Array[AnyRef], index: Int): Int =
+    Try {
+      //Short-circuit `&&` so that a null array is never dereferenced
+      if (args != null && args.length > index) {
+        args(index) match {
+          case n: Number => n.intValue()
+          case _ => 0
+        }
+      } else 0
+    }.getOrElse(0)
+
+  /** Starts the appender and attaches it to the logger of [[ResourceThrottleRetryPolicy]]. */
+  private def register(): Unit = {
+    val logCtx = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]
+    val retryLogger = logCtx.getLogger(classOf[ResourceThrottleRetryPolicy].getName)
+    start()
+    retryLogger.addAppender(this)
+  }
+
+  /** Some(true) if a retry was scheduled, Some(false) if retries were given up, None for any other message. */
+  private def isSuccessOrFailedRetry(msg: String): Option[Boolean] =
+    if (msg.startsWith("Operation will be retried after")) Some(true)
+    else if (msg.startsWith("Operation will NOT be retried")) Some(false)
+    else None
+
+  /** Maps the `OperationType: ...` fragment of the message to an action, defaulting to `Others`. */
+  private def operationType(errorMsg: String): CosmosDBAction.Value =
+    if (errorMsg.contains("OperationType: Query")) Query
+    else if (errorMsg.contains("OperationType: Create")) Create
+    else if (errorMsg.contains("OperationType: Get")) Get
+    else Others
+
+  /** Builds the `cosmosdb`/`retry` token; the op type is a tag only when Kamon tags are enabled. */
+  private def createToken(opType: String, retryPassed: Boolean): LogMarkerToken = {
+    val action = if (retryPassed) "success" else "failed"
+    val tags = Map("type" -> opType)
+    if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "retry", action, tags = tags)(MeasurementUnit.none)
+    else LogMarkerToken("cosmosdb", "retry", action, Some(opType))(MeasurementUnit.none)
+  }
+
+  private case class Token(success: LogMarkerToken, failed: LogMarkerToken)
+
+  private object Token {
+    def apply(opType: CosmosDBAction.Value): Token =
+      new Token(createToken(opType.toString, retryPassed = true), createToken(opType.toString, retryPassed = false))
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
index d05c607..0cd5f43 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -17,9 +17,13 @@
package org.apache.openwhisk.core.database.cosmosdb
+import java.util.concurrent.CountDownLatch
+
+import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import io.netty.util.ResourceLeakDetector
import io.netty.util.ResourceLeakDetector.Level
+import kamon.metric.LongAdderCounter
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.DocumentSerializer
import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider
@@ -28,6 +32,7 @@ import org.apache.openwhisk.core.entity.WhiskQueries.TOP
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.{
DocumentReader,
+ Parameters,
WhiskActivation,
WhiskDocumentReader,
WhiskEntity,
@@ -35,14 +40,11 @@ import org.apache.openwhisk.core.entity.{
WhiskPackage
}
import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.FlatSpec
-import spray.json.JsString
-import org.apache.openwhisk.core.entity.size._
-import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
+import spray.json.JsString
+import scala.concurrent.duration._
import scala.reflect.ClassTag
@RunWith(classOf[JUnitRunner])
@@ -154,4 +156,41 @@ class CosmosDBArtifactStoreTests extends FlatSpec with CosmosDBStoreBehaviorBase
stream.toString should include("[QueryMetricsEnabled]")
}
+
+ behavior of "CosmosDB retry metrics"
+
+ it should "capture success retries" in { //Issues many large puts and expects at least one retry to be recorded
+ implicit val tid: TransactionId = TransactionId.testing
+ val bigPkg = WhiskPackage(newNS(), aname(), parameters = Parameters("foo", "x" * 1024 * 1024))
+ val latch = new CountDownLatch(1) //released as soon as the first element reaches mapAsync
+ val f = Source(1 to 500)
+ .mapAsync(100) { i =>
+ latch.countDown()
+ if (i % 5 == 0) println(i)
+ require(retryCount == 0) //throws once a retry has been counted - presumably to stop issuing puts; verify
+ entityStore.put(bigPkg)
+ }
+ .runForeach { doc =>
+ docsToDelete += ((entityStore, doc))
+ }
+
+ //Block until the first element has entered the stream (latch released) before checking for stats
+ latch.await()
+ retry(() => f, 500.millis)
+ retryCount should be > 0
+ }
+
+  /**
+   * Number of scheduled retries recorded so far by [[RetryMetricsCollector]].
+   *
+   * If KamonTags are disabled then Kamon uses CounterMetricImpl which does not provide any way of
+   * determining the current count, so in that case the collector's own fallback counter is read.
+   */
+  private def retryCount: Int =
+    if (!TransactionId.metricsKamonTags) RetryMetricsCollector.retryCounter.cur.toInt
+    else
+      RetryMetricsCollector.getCounter(CosmosDBAction.Create) match {
+        case Some(c: LongAdderCounter) => c.snapshot(false).value.toInt
+        case _ => 0
+      }
}