You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2019/01/16 15:19:33 UTC

[incubator-openwhisk] branch master updated: Enable configuring CosmosDB client per collection (#4198)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 408a06a  Enable configuring CosmosDB client per collection (#4198)
408a06a is described below

commit 408a06aca594520e732cff05a4ce92cd1019d80d
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Wed Jan 16 20:49:26 2019 +0530

    Enable configuring CosmosDB client per collection (#4198)
    
    * Enable entity specific config
---
 common/scala/src/main/resources/application.conf   |  49 +++++++-
 .../database/cosmosdb/CosmosDBArtifactStore.scala  |   5 +
 .../cosmosdb/CosmosDBArtifactStoreProvider.scala   |  27 +++--
 .../core/database/cosmosdb/CosmosDBConfig.scala    |  88 +++++++++++++++
 .../core/database/cosmosdb/CosmosDBUtil.scala      |  10 --
 .../database/cosmosdb/CosmosDBConfigTests.scala    | 125 +++++++++++++++++++++
 .../database/cosmosdb/CosmosDBTestSupport.scala    |   2 +-
 7 files changed, 277 insertions(+), 29 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 72f64fa..99cd688 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -154,12 +154,49 @@ whisk {
 
     # CosmosDB related configuration
     # For example:
-    # cosmosdb {
-    #     endpoint   =               # Endpoint URL like https://<account>.documents.azure.com:443/
-    #     key        =               # Access key
-    #     db         =               # Database name
-    #     throughput = 1000          # Throughput configure for each collection within this db
-    #}
+    cosmosdb {
+    #   endpoint          =               # Endpoint URL like https://<account>.documents.azure.com:443/
+    #   key               =               # Access key
+    #   db                =               # Database name
+        # Throughput configured for each collection within this db.
+        # It is applied only when a collection is created; if the collection
+        # already exists, its current throughput is kept.
+        throughput        = 1000
+        # Must be one of the consistency levels supported by the SDK, see
+        # https://azure.github.io/azure-cosmosdb-java/1.0.0/com/microsoft/azure/cosmosdb/ConsistencyLevel.html
+        consistency-level = "Session"
+        connection-policy {
+            max-pool-size = 1000
+            # When the value of this property is true, the SDK will direct write operations to
+            # available writable locations of geo-replicated database account
+            using-multiple-write-locations = false
+
+            # Sets the preferred locations for geo-replicated database accounts e.g. "East US"
+            # See names at https://azure.microsoft.com/en-in/global-infrastructure/locations/
+            preferred-locations = []
+            retry-options {
+                # Sets the maximum number of retries in the case where the request fails
+                # because the service has applied rate limiting on the client.
+                max-retry-attempts-on-throttled-requests = 9
+
+                # Sets the maximum cumulative retry wait time.
+                # If the cumulative wait time exceeds this value, the SDK stops retrying and returns the
+                # error to the application.
+                max-retry-wait-time                      = 30 s
+            }
+        }
+
+        # Entity specific overrides go below. By default all values are taken from the top level settings. To override
+        # an option for a specific entity, set it under collections.<EntityName>. For example, to enable multiple write
+        # locations only for activations:
+    #   collections {
+    #       WhiskActivation {            # Add entity specific overrides here
+    #           connection-policy {
+    #               using-multiple-write-locations = true
+    #           }
+    #       }
+    #   }
+    }
 
     # transaction ID related configuration
     transactions {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index efa25f0..88ebb36 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -74,6 +74,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
   private val countToken = createToken("count")
   private val putAttachmentToken = createToken("putAttachment", read = false)
 
+  // Report which endpoints and connection policy the client serving this collection ended up with
+  logging.info(this, s"Initializing CosmosDBArtifactStore for collection [$collName]. " +
+    s"Service endpoint [${client.getServiceEndpoint}], Read endpoint [${client.getReadEndpoint}], " +
+    s"Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}]")
+
   //Clone the returned instance as these are mutable
   def documentCollection(): DocumentCollection = new DocumentCollection(collection.toJson)
 
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
index a93a6d0..ab59e67 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
@@ -22,27 +22,24 @@ import java.io.Closeable
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
 import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
-import spray.json.RootJsonFormat
+import com.typesafe.config.ConfigFactory
 import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.database._
-import pureconfig._
 import org.apache.openwhisk.core.entity.size._
-import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.database.cosmosdb.CosmosDBUtil.createClient
 import org.apache.openwhisk.core.entity.{DocumentReader, WhiskActivation, WhiskAuth, WhiskEntity}
+import pureconfig._
+import spray.json.RootJsonFormat
 
 import scala.reflect.ClassTag
 
-case class CosmosDBConfig(endpoint: String, key: String, db: String, throughput: Int = 1000)
-
 case class ClientHolder(client: AsyncDocumentClient) extends Closeable {
   override def close(): Unit = client.close()
 }
 
 object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
   type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
-  private lazy val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)
-  private var clientRef: ReferenceCounted[ClientHolder] = _
+  private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]()
 
   override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
     implicit jsonFormat: RootJsonFormat[D],
@@ -50,6 +47,8 @@ object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
     actorSystem: ActorSystem,
     logging: Logging,
     materializer: ActorMaterializer): ArtifactStore[D] = {
+    val tag = implicitly[ClassTag[D]]
+    val config = CosmosDBConfig(ConfigFactory.load(), tag.runtimeClass.getSimpleName)
     makeStoreForClient(config, getOrCreateReference(config), getAttachmentStore())
   }
 
@@ -102,13 +101,16 @@ object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
    * Synchronization is required to ensure concurrent init of various store instances share same ref instance
    */
   private def getOrCreateReference(config: CosmosDBConfig) = synchronized {
-    if (clientRef == null || clientRef.isClosed) {
-      clientRef = createReference(config)
-    }
-    clientRef.reference()
+    // Reuse the client registered for an equal config, unless that one has been closed since
+    val liveRef = clients.get(config).filterNot(_.isClosed).getOrElse {
+      val freshRef = createReference(config)
+      clients.put(config, freshRef)
+      freshRef
+    }
+    liveRef.reference()
   }

   private def createReference(config: CosmosDBConfig) =
-    new ReferenceCounted[ClientHolder](ClientHolder(createClient(config)))
+    new ReferenceCounted[ClientHolder](ClientHolder(config.createClient()))

 }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
new file mode 100644
index 0000000..0c93917
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import com.microsoft.azure.cosmosdb.{
+  ConsistencyLevel,
+  ConnectionPolicy => JConnectionPolicy,
+  RetryOptions => JRetryOptions
+}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigUtil.joinPath
+import org.apache.openwhisk.core.ConfigKeys
+import pureconfig._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+/**
+ * Settings for connecting to a CosmosDB account and for the collections created in its database.
+ *
+ * Use `CosmosDBConfig(globalConfig, entityTypeName)` to obtain an instance where entity specific
+ * overrides have been merged over the common settings.
+ *
+ * @param endpoint         service endpoint URL like `https://<account>.documents.azure.com:443/`
+ * @param key              master key or resource token used to authenticate
+ * @param db               name of the database
+ * @param throughput       throughput configured for each collection when it is created; a collection
+ *                         that already exists keeps its current throughput
+ * @param consistencyLevel consistency level of clients built by [[createClient]]
+ * @param connectionPolicy connection settings of clients built by [[createClient]]
+ */
+case class CosmosDBConfig(endpoint: String,
+                          key: String,
+                          db: String,
+                          throughput: Int,
+                          consistencyLevel: ConsistencyLevel,
+                          connectionPolicy: ConnectionPolicy) {
+
+  /**
+   * Builds a new [[AsyncDocumentClient]] from these settings. Every call creates a separate
+   * client which the caller is responsible for closing.
+   */
+  def createClient(): AsyncDocumentClient = {
+    new AsyncDocumentClient.Builder()
+      .withServiceEndpoint(endpoint)
+      .withMasterKeyOrResourceToken(key)
+      .withConsistencyLevel(consistencyLevel)
+      .withConnectionPolicy(connectionPolicy.asJava)
+      .build()
+  }
+}
+
+/**
+ * The subset of the SDK connection policy that is configurable through `connection-policy`.
+ *
+ * @param maxPoolSize                 maximum size of the connection pool
+ * @param preferredLocations          preferred regions of a geo-replicated account, e.g. "East US"
+ * @param usingMultipleWriteLocations if true, the SDK directs writes to the available writable
+ *                                    locations of a geo-replicated account
+ * @param retryOptions                retry settings for throttled requests
+ */
+case class ConnectionPolicy(maxPoolSize: Int,
+                            preferredLocations: Seq[String],
+                            usingMultipleWriteLocations: Boolean,
+                            retryOptions: RetryOptions) {
+
+  /** Returns a new SDK policy; settings not modelled here are left as the SDK initialises them. */
+  def asJava: JConnectionPolicy = {
+    val p = new JConnectionPolicy
+    p.setMaxPoolSize(maxPoolSize)
+    p.setUsingMultipleWriteLocations(usingMultipleWriteLocations)
+    p.setPreferredLocations(preferredLocations.asJava)
+    p.setRetryOptions(retryOptions.asJava)
+    p
+  }
+}
+
+/**
+ * Retry settings used when CosmosDB throttles (rate limits) requests.
+ *
+ * @param maxRetryAttemptsOnThrottledRequests maximum number of retries of a throttled request
+ * @param maxRetryWaitTime                    maximum cumulative wait time after which the SDK
+ *                                            stops retrying and returns the error
+ */
+case class RetryOptions(maxRetryAttemptsOnThrottledRequests: Int, maxRetryWaitTime: Duration) {
+
+  /**
+   * Converts to the SDK representation. The SDK takes whole seconds, so any sub-second part of
+   * `maxRetryWaitTime` is truncated.
+   *
+   * @throws IllegalArgumentException if `maxRetryWaitTime` is infinite, negative or too large to
+   *                                  be expressed as an Int number of seconds
+   */
+  def asJava: JRetryOptions = {
+    // toSeconds throws on infinite durations and toInt would silently wrap values beyond the Int
+    // range, so validate up front and fail with a message naming the offending setting
+    require(maxRetryWaitTime.isFinite, s"max-retry-wait-time must be finite but was $maxRetryWaitTime")
+    val seconds = maxRetryWaitTime.toSeconds
+    require(
+      seconds >= 0 && seconds <= Int.MaxValue,
+      s"max-retry-wait-time must be between 0 and ${Int.MaxValue} seconds but was $maxRetryWaitTime")
+    val o = new JRetryOptions
+    o.setMaxRetryAttemptsOnThrottledRequests(maxRetryAttemptsOnThrottledRequests)
+    o.setMaxRetryWaitTimeInSeconds(seconds.toInt)
+    o
+  }
+}
+
+object CosmosDBConfig {
+
+  /** Config sub-tree, relative to the CosmosDB config root, holding entity specific overrides. */
+  val collections = "collections"
+
+  /**
+   * Loads the CosmosDB config for one entity type.
+   *
+   * Values defined under `collections.<entityTypeName>` take precedence over the common values
+   * defined directly under the CosmosDB config root (`ConfigKeys.cosmosdb`); any value that is not
+   * overridden falls back to the common one.
+   *
+   * @param globalConfig   root config to read from, e.g. `ConfigFactory.load()`
+   * @param entityTypeName simple class name of the entity, e.g. `WhiskAuth`
+   * @return the effective config for that entity type
+   */
+  def apply(globalConfig: Config, entityTypeName: String): CosmosDBConfig = {
+    val config = globalConfig.getConfig(ConfigKeys.cosmosdb)
+    val specificConfigPath = joinPath(collections, entityTypeName)
+
+    // Entity specific values win; everything else falls back to the common config
+    val entityConfig = if (config.hasPath(specificConfigPath)) {
+      config.getConfig(specificConfigPath).withFallback(config)
+    } else {
+      config
+    }
+    loadConfigOrThrow[CosmosDBConfig](entityConfig)
+  }
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
index a82e73d..c4237ba 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
@@ -17,9 +17,7 @@
 
 package org.apache.openwhisk.core.database.cosmosdb
 
-import com.microsoft.azure.cosmosdb._
 import com.microsoft.azure.cosmosdb.internal.Constants.Properties.{AGGREGATE, E_TAG, ID, SELF_LINK}
-import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
 import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
 
 import scala.collection.immutable.Iterable
@@ -81,14 +79,6 @@ private[cosmosdb] trait CosmosDBUtil {
     case x            => x.toString
   }
 
-  def createClient(config: CosmosDBConfig): AsyncDocumentClient =
-    new AsyncDocumentClient.Builder()
-      .withServiceEndpoint(config.endpoint)
-      .withMasterKeyOrResourceToken(config.key)
-      .withConnectionPolicy(ConnectionPolicy.GetDefault)
-      .withConsistencyLevel(ConsistencyLevel.Session)
-      .build
-
   /**
    * CosmosDB id considers '/', '\' , '?' and '#' as invalid. EntityNames can include '/' so
    * that need to be escaped. For that we use '|' as the replacement char
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
new file mode 100644
index 0000000..5e6e1cd
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+import com.typesafe.config.ConfigFactory
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import com.microsoft.azure.cosmosdb.{ConnectionPolicy => JConnectionPolicy}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests for reading [[CosmosDBConfig]], covering the shipped defaults, the common settings and
+ * entity specific overrides below `whisk.cosmosdb.collections`.
+ */
+@RunWith(classOf[JUnitRunner])
+class CosmosDBConfigTests extends FlatSpec with Matchers {
+  val globalConfig = ConfigFactory.defaultApplication()
+  behavior of "CosmosDB Config"
+
+  /**
+   * Loads the `WhiskAuth` config from the mandatory connection settings plus `extraSettings`, a
+   * HOCON snippet placed inside the `whisk.cosmosdb` object. Anything not set here comes from the
+   * application defaults.
+   */
+  private def authConfig(extraSettings: String = ""): CosmosDBConfig = {
+    val config = ConfigFactory
+      .parseString(s"""
+        | whisk.cosmosdb {
+        |  endpoint = "http://localhost"
+        |  key = foo
+        |  db  = openwhisk
+        |  $extraSettings
+        | }
+         """.stripMargin)
+      .withFallback(globalConfig)
+    CosmosDBConfig(config, "WhiskAuth")
+  }
+
+  it should "match SDK defaults" in {
+    val policy = authConfig().connectionPolicy.asJava
+    val sdkPolicy = JConnectionPolicy.GetDefault()
+
+    // The SDK types do not define equals, so compare the relevant settings one by one
+    policy.getConnectionMode shouldBe sdkPolicy.getConnectionMode
+    policy.getEnableEndpointDiscovery shouldBe sdkPolicy.getEnableEndpointDiscovery
+    policy.getIdleConnectionTimeoutInMillis shouldBe sdkPolicy.getIdleConnectionTimeoutInMillis
+    policy.getMaxPoolSize shouldBe sdkPolicy.getMaxPoolSize
+    policy.getPreferredLocations shouldBe sdkPolicy.getPreferredLocations
+    policy.getRequestTimeoutInMillis shouldBe sdkPolicy.getRequestTimeoutInMillis
+    policy.isUsingMultipleWriteLocations shouldBe sdkPolicy.isUsingMultipleWriteLocations
+
+    val (retry, sdkRetry) = (policy.getRetryOptions, sdkPolicy.getRetryOptions)
+    retry.getMaxRetryAttemptsOnThrottledRequests shouldBe sdkRetry.getMaxRetryAttemptsOnThrottledRequests
+    retry.getMaxRetryWaitTimeInSeconds shouldBe sdkRetry.getMaxRetryWaitTimeInSeconds
+  }
+
+  it should "work with generic config" in {
+    authConfig() should matchPattern { case CosmosDBConfig("http://localhost", "foo", "openwhisk", _, _, _) => }
+  }
+
+  it should "work with extended config" in {
+    val cosmos = authConfig("connection-policy.max-pool-size = 42")
+    cosmos should matchPattern { case CosmosDBConfig("http://localhost", "foo", "openwhisk", _, _, _) => }
+    cosmos.connectionPolicy.maxPoolSize shouldBe 42
+
+    // Settings that were not overridden must still match the SDK defaults
+    val policy = cosmos.connectionPolicy.asJava
+    val sdkPolicy = JConnectionPolicy.GetDefault()
+    policy.getConnectionMode shouldBe sdkPolicy.getConnectionMode
+    val (retry, sdkRetry) = (policy.getRetryOptions, sdkPolicy.getRetryOptions)
+    retry.getMaxRetryAttemptsOnThrottledRequests shouldBe sdkRetry.getMaxRetryAttemptsOnThrottledRequests
+    retry.getMaxRetryWaitTimeInSeconds shouldBe sdkRetry.getMaxRetryWaitTimeInSeconds
+  }
+
+  it should "work with specific extended config" in {
+    // Settings shared by all entities, followed by overrides that only apply to WhiskAuth
+    val cosmos = authConfig("""
+      |connection-policy {
+      |   max-pool-size = 42
+      |   retry-options.max-retry-wait-time = 2 m
+      |}
+      |collections.WhiskAuth.connection-policy {
+      |   using-multiple-write-locations = true
+      |   preferred-locations = [a, b]
+      |}""".stripMargin)
+    cosmos should matchPattern { case CosmosDBConfig("http://localhost", "foo", "openwhisk", _, _, _) => }
+
+    val policy = cosmos.connectionPolicy.asJava
+    policy.isUsingMultipleWriteLocations shouldBe true
+    policy.getMaxPoolSize shouldBe 42
+    policy.getPreferredLocations.asScala.toSeq should contain only ("a", "b")
+    policy.getRetryOptions.getMaxRetryWaitTimeInSeconds shouldBe 120
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala
index 87af893..4c50102 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala
@@ -30,7 +30,7 @@ trait CosmosDBTestSupport extends FlatSpec with BeforeAndAfterAll with RxObserva
   private val dbsToDelete = ListBuffer[Database]()
 
   lazy val storeConfigTry = Try { loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb) }
-  lazy val client = CosmosDBUtil.createClient(storeConfig)
+  lazy val client = storeConfig.createClient()
   val useExistingDB = java.lang.Boolean.getBoolean("whisk.cosmosdb.useExistingDB")
 
   def storeConfig = storeConfigTry.get