You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/05/23 03:58:41 UTC

[incubator-openwhisk] branch master updated: Track collection usage metrics for CosmosDB (#4490)

This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e0c562  Track collection usage metrics for CosmosDB (#4490)
2e0c562 is described below

commit 2e0c5624e4e479f13f565d266625425314eff4c0
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Thu May 23 09:28:28 2019 +0530

    Track collection usage metrics for CosmosDB (#4490)
    
    Record collection usage stats for CosmosDB to enable tracking the growth of a collection in terms of storage size, document count and index size over time. It also enables tracking indexing progress whenever the index configuration is changed.
    
    Note that count stats are currently not exposed via the Azure Portal.
    
    Further, this commit also enables emitting verbose traces for queries when in debug mode. This simplifies query performance analysis.
    
    Fixes #4489
---
 common/scala/src/main/resources/application.conf   |  5 ++
 .../cosmosdb/CollectionResourceUsage.scala         | 71 ++++++++++++++++++++++
 .../database/cosmosdb/CosmosDBArtifactStore.scala  | 48 ++++++++++++++-
 .../core/database/cosmosdb/CosmosDBConfig.scala    |  3 +-
 .../cosmosdb/CollectionResourceUsageTests.scala    | 45 ++++++++++++++
 .../cosmosdb/CosmosDBArtifactStoreTests.scala      |  9 +++
 6 files changed, 178 insertions(+), 3 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 0c547c9..9d122b7 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -209,6 +209,11 @@ whisk {
         # it would be marked deleted by setting `_deleted` property to true and then actual delete
         # happens via TTL.
         # soft-delete-ttl   = 10 h
+
+        # Frequency at which collection resource usage info (collection size, document count, etc.) is recorded
+        # and exposed as metrics. If any reindexing is in progress, its progress is logged at this frequency
+        record-usage-frequency = 10 m
+
         connection-policy {
             max-pool-size = 1000
             # When the value of this property is true, the SDK will direct write operations to
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsage.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsage.scala
new file mode 100644
index 0000000..96b7fce
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsage.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+
+import org.apache.commons.io.FileUtils
+import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.SizeUnits.KB
+
+/**
+ * Resource usage values of a collection. Each value is optional and is only rendered or used
+ * in computations when it is defined.
+ */
+case class CollectionResourceUsage(documentsSize: Option[ByteSize],
+                                   collectionSize: Option[ByteSize],
+                                   documentsCount: Option[Long],
+                                   indexingProgress: Option[Int],
+                                   documentsSizeQuota: Option[ByteSize]) {
+
+  /**
+   * Collection size minus documents size. Defined only when both sizes are defined.
+   *
+   * NOTE(review): presumably collectionSize is never smaller than documentsSize - confirm how
+   * ByteSize subtraction behaves otherwise.
+   */
+  def indexSize: Option[ByteSize] =
+    documentsSize.flatMap(docs => collectionSize.map(total => total - docs))
+
+  /** Renders the defined values as comma separated `name: value` entries. */
+  def asString: String = {
+    val docsPart = documentsSize.map(size => s"documentSize: ${displaySize(size)}")
+    val indexPart = indexSize.map(size => s"indexSize: ${displaySize(size)}")
+    val countPart = documentsCount.map(count => s"documentsCount: $count")
+    val quotaPart = documentsSizeQuota.map(quota => s"collectionSizeQuota: ${displaySize(quota)}")
+    Seq(docsPart, indexPart, countPart, quotaPart).flatten.mkString(",")
+  }
+
+  private def displaySize(b: ByteSize) = FileUtils.byteCountToDisplaySize(b.toBytes)
+}
+
+object CollectionResourceUsage {
+  val quotaHeader = "x-ms-resource-quota"
+  val usageHeader = "x-ms-resource-usage"
+  val indexHeader = "x-ms-documentdb-collection-index-transformation-progress"
+
+  /**
+   * Builds the usage info from response headers.
+   *
+   * Sizes found in the headers are interpreted as KB. The size quota is taken from the
+   * `collectionSize` entry of the quota header.
+   *
+   * @param responseHeaders response headers keyed by header name
+   * @return the usage info, or `None` if either the quota or the usage header is missing. Individual values
+   *         which are absent or not numeric are reported as `None` instead of failing the whole parse
+   */
+  def apply(responseHeaders: Map[String, String]): Option[CollectionResourceUsage] = {
+    for {
+      quota <- responseHeaders.get(quotaHeader).map(headerValueToMap)
+      usage <- responseHeaders.get(usageHeader).map(headerValueToMap)
+    } yield {
+      CollectionResourceUsage(
+        usage.get("documentsSize").flatMap(toLong).map(ByteSize(_, KB)),
+        usage.get("collectionSize").flatMap(toLong).map(ByteSize(_, KB)),
+        usage.get("documentsCount").flatMap(toLong),
+        responseHeaders.get(indexHeader).flatMap(toInt),
+        quota.get("collectionSize").flatMap(toLong).map(ByteSize(_, KB)))
+    }
+  }
+
+  /** Parses a long value, yielding `None` for anything which is not a valid number. */
+  private def toLong(s: String): Option[Long] = scala.util.Try(s.trim.toLong).toOption
+
+  /** Parses an int value, yielding `None` for anything which is not a valid number. */
+  private def toInt(s: String): Option[Int] = scala.util.Try(s.trim.toInt).toOption
+
+  /** Converts a header value made of `;` separated `key=value` segments to a map. */
+  private def headerValueToMap(value: String): Map[String, String] = {
+    //storedProcedures=100;triggers=25;functions=25;documentsCount=-1;documentsSize=xxx;collectionSize=xxx
+    //Each segment is parsed on its own so that an empty value or a segment without '=' is skipped
+    //instead of failing with a MatchError or shifting all the key/value pairs which follow it
+    value
+      .split(';')
+      .map(_.split("=", 2))
+      .collect { case Array(k, v) => k.trim -> v.trim }
+      .toMap
+  }
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 16d0705..c635092 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -27,7 +27,7 @@ import com.microsoft.azure.cosmosdb._
 import com.microsoft.azure.cosmosdb.internal.Constants.Properties
 import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
 import kamon.metric.MeasurementUnit
-import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, TransactionId}
+import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, Scheduler, TransactionId}
 import org.apache.openwhisk.core.database.StoreUtils.{checkDocHasRevision, deserialize, reportFailure}
 import org.apache.openwhisk.core.database._
 import org.apache.openwhisk.core.database.cosmosdb.CosmosDBArtifactStoreProvider.DocumentClientRef
@@ -39,6 +39,7 @@ import spray.json._
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 import scala.util.Success
 
 class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected val collName: String,
@@ -70,6 +71,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
   private val getToken = createToken("get")
   private val queryToken = createToken("query")
   private val countToken = createToken("count")
+
+  private val documentsSizeToken = createUsageToken("documentsSize", MeasurementUnit.information.kilobytes)
+  private val indexSizeToken = createUsageToken("indexSize", MeasurementUnit.information.kilobytes)
+  private val documentCountToken = createUsageToken("documentCount")
+
   private val softDeleteTTL = config.softDeleteTTL.map(_.toSeconds.toInt)
 
   private val clusterIdValue = config.clusterId.map(JsString(_))
@@ -78,7 +84,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
     this,
     s"Initializing CosmosDBArtifactStore for collection [$collName]. Service endpoint [${client.getServiceEndpoint}], " +
       s"Read endpoint [${client.getReadEndpoint}], Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}], " +
-      s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId [${config.clusterId}], soft delete TTL [${config.softDeleteTTL}], Consistency Level [${config.consistencyLevel}]")
+      s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId [${config.clusterId}], soft delete TTL [${config.softDeleteTTL}], " +
+      s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}]")
+
+  //Usage recording is scheduled only when a record usage frequency is configured
+  private val usageMetricRecorder = config.recordUsageFrequency.map { frequency =>
+    Scheduler.scheduleWaitAtLeast(frequency, 10.seconds)(() => recordResourceUsage())
+  }
 
   //Clone the returned instance as these are mutable
   def documentCollection(): DocumentCollection = new DocumentCollection(collection.toJson)
@@ -314,6 +325,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
     val queryMetrics = scala.collection.mutable.Buffer[QueryMetrics]()
     if (transid.meta.extraLogging) {
       options.setPopulateQueryMetrics(true)
+      options.setEmitVerboseTracesInQuery(true)
     }
 
     def collectQueryMetrics(r: FeedResponse[Document]): Unit = {
@@ -416,10 +428,36 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
       .getOrElse(Future.successful(true)) // For CosmosDB it is expected that the entire document is deleted.
 
   override def shutdown(): Unit = {
+    //It's async, so there is a chance that the next scheduled job still triggers
+    usageMetricRecorder.foreach(system.stop)
     attachmentStore.foreach(_.shutdown())
     clientRef.close()
   }
 
+  /**
+   * Reads the collection with quota info population enabled and derives the usage info
+   * from the headers of the response.
+   *
+   * @return the usage info, or `None` when it cannot be derived from the response headers
+   */
+  def getResourceUsage(): Future[Option[CollectionResourceUsage]] = {
+    val requestOptions = new RequestOptions
+    requestOptions.setPopulateQuotaInfo(true)
+    val response = client.readCollection(collection.getSelfLink, requestOptions).head()
+    response.map { r =>
+      val headers = r.getResponseHeaders.asScala.toMap
+      CollectionResourceUsage(headers)
+    }
+  }
+
+  /**
+   * Fetches the current usage and, when available, updates the usage gauges and logs the stats.
+   * Indexing progress is logged only while it is below 100. The fetched usage is passed through unchanged.
+   */
+  private def recordResourceUsage() = {
+    getResourceUsage().map { usageOpt =>
+      for (usage <- usageOpt) {
+        for (count <- usage.documentsCount) documentCountToken.gauge.set(count)
+        for (size <- usage.documentsSize) documentsSizeToken.gauge.set(size.toKB)
+        for (size <- usage.indexSize) indexSizeToken.gauge.set(size.toKB)
+        logging.info(this, s"Collection usage stats for [$collName] are ${usage.asString}")
+        for (progress <- usage.indexingProgress if progress < 100)
+          logging.info(this, s"Indexing for collection [$collName] is at $progress%")
+      }
+      usageOpt
+    }
+  }
+
   private def isNotFound[A <: DocumentAbstraction](e: DocumentClientException) =
     e.getStatusCode == StatusCodes.NotFound.intValue
 
@@ -508,6 +546,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
     else LogMarkerToken("cosmosdb", "ru", collName, Some(action))(MeasurementUnit.none)
   }
 
+  //With Kamon tags enabled the collection name is passed as a tag, otherwise it is part of the token itself
+  private def createUsageToken(name: String, unit: MeasurementUnit = MeasurementUnit.none): LogMarkerToken =
+    if (!TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", name, collName)(unit)
+    else LogMarkerToken("cosmosdb", name, "used", tags = Map("collection" -> collName))(unit)
+
   private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true
 
   private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
index de0de9d..2dba5f8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
@@ -39,7 +39,8 @@ case class CosmosDBConfig(endpoint: String,
                           connectionPolicy: ConnectionPolicy,
                           timeToLive: Option[Duration],
                           clusterId: Option[String],
-                          softDeleteTTL: Option[FiniteDuration]) {
+                          softDeleteTTL: Option[FiniteDuration],
+                          recordUsageFrequency: Option[FiniteDuration]) {
 
   def createClient(): AsyncDocumentClient = {
     new AsyncDocumentClient.Builder()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsageTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsageTests.scala
new file mode 100644
index 0000000..dd26718
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsageTests.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+
+import org.apache.openwhisk.core.database.cosmosdb.CollectionResourceUsage.{indexHeader, quotaHeader, usageHeader}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import org.apache.openwhisk.core.entity.size._
+
+@RunWith(classOf[JUnitRunner])
+class CollectionResourceUsageTests extends FlatSpec with Matchers {
+  behavior of "CollectionResourceUsage"
+
+  it should "populate resource usage info" in {
+    //Header values are ';' separated key=value pairs
+    val headers = Map(
+      usageHeader ->
+        "storedProcedures=0;triggers=0;functions=0;documentsCount=5058;documentsSize=780;collectionSize=800",
+      quotaHeader -> "storedProcedures=100;triggers=25;functions=25;documentsCount=-1;documentsSize=335544320;collectionSize=1000",
+      indexHeader -> "42")
+
+    //Compare against the Option itself so that a failed parse shows up as a value mismatch
+    //rather than as a NoSuchElementException from Option.get
+    CollectionResourceUsage(headers) shouldBe Some(
+      CollectionResourceUsage(
+        documentsSize = Some(780.KB),
+        collectionSize = Some(800.KB),
+        documentsCount = Some(5058),
+        indexingProgress = Some(42),
+        documentsSizeQuota = Some(1000.KB)))
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
index af2e8d8..9600f60 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -117,6 +117,15 @@ class CosmosDBArtifactStoreTests extends FlatSpec with CosmosDBStoreBehaviorBase
     js.get.fields(CosmosDBConstants.clusterId) shouldBe JsString("foo")
   }
 
+  it should "fetch collection usage info" in {
+    val usage = activationStore.getResourceUsage().futureValue
+    usage shouldBe defined
+    //The usage is known to be defined at this point, so the checks below always run
+    usage.foreach { u =>
+      println(u.asString)
+      u.documentsCount shouldBe defined
+      u.documentsSize shouldBe defined
+    }
+  }
+
   behavior of "CosmosDB query debug"
 
   it should "log query metrics in debug flow" in {