You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2019/01/16 15:19:33 UTC
[incubator-openwhisk] branch master updated: Enable configuring
CosmosDB client per collection (#4198)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 408a06a Enable configuring CosmosDB client per collection (#4198)
408a06a is described below
commit 408a06aca594520e732cff05a4ce92cd1019d80d
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Wed Jan 16 20:49:26 2019 +0530
Enable configuring CosmosDB client per collection (#4198)
* Enable entity specific config
---
common/scala/src/main/resources/application.conf | 49 +++++++-
.../database/cosmosdb/CosmosDBArtifactStore.scala | 5 +
.../cosmosdb/CosmosDBArtifactStoreProvider.scala | 27 +++--
.../core/database/cosmosdb/CosmosDBConfig.scala | 88 +++++++++++++++
.../core/database/cosmosdb/CosmosDBUtil.scala | 10 --
.../database/cosmosdb/CosmosDBConfigTests.scala | 125 +++++++++++++++++++++
.../database/cosmosdb/CosmosDBTestSupport.scala | 2 +-
7 files changed, 277 insertions(+), 29 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 72f64fa..99cd688 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -154,12 +154,49 @@ whisk {
# CosmosDB related configuration
# For example:
- # cosmosdb {
- # endpoint = # Endpoint URL like https://<account>.documents.azure.com:443/
- # key = # Access key
- # db = # Database name
- # throughput = 1000 # Throughput configure for each collection within this db
- #}
+ cosmosdb {
+ # endpoint = # Endpoint URL like https://<account>.documents.azure.com:443/
+ # key = # Access key
+ # db = # Database name
+ # Throughput configured for each collection within this db
+ # This is configured only if collection is created fresh. If collection
+ # already exists then existing throughput would be used
+ throughput = 1000
+ # Select from one of the supported
+ # https://azure.github.io/azure-cosmosdb-java/1.0.0/com/microsoft/azure/cosmosdb/ConsistencyLevel.html
+ consistency-level = "Session"
+ connection-policy {
+ max-pool-size = 1000
+ # When the value of this property is true, the SDK will direct write operations to
+ # available writable locations of geo-replicated database account
+ using-multiple-write-locations = false
+
+ # Sets the preferred locations for geo-replicated database accounts e.g. "East US"
+ # See names at https://azure.microsoft.com/en-in/global-infrastructure/locations/
+ preferred-locations = []
+ retry-options {
+ # Sets the maximum number of retries in the case where the request fails
+ # because the service has applied rate limiting on the client.
+ max-retry-attempts-on-throttled-requests = 9
+
+ # Sets the maximum retry time
+ # If the cumulative wait time exceeds this SDK will stop retrying and return the
+ # error to the application.
+ max-retry-wait-time = 30 s
+ }
+ }
+
+ # Specify entity specific overrides below. By default all config values would be picked from top level. To override
+ # any config option for specific entity specify them below. For example if multiple writes need to be enabled
+ # for activations then
+ # collections {
+ # WhiskActivation { # Add entity specific overrides here
+ # connection-policy {
+ # using-multiple-write-locations = true
+ # }
+ # }
+ # }
+ }
# transaction ID related configuration
transactions {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index efa25f0..88ebb36 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -74,6 +74,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
private val countToken = createToken("count")
private val putAttachmentToken = createToken("putAttachment", read = false)
+ logging.info(
+ this,
+ s"Initializing CosmosDBArtifactStore for collection [$collName]. Service endpoint [${client.getServiceEndpoint}], " +
+ s"Read endpoint [${client.getReadEndpoint}], Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}]")
+
//Clone the returned instance as these are mutable
def documentCollection(): DocumentCollection = new DocumentCollection(collection.toJson)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
index a93a6d0..ab59e67 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
@@ -22,27 +22,24 @@ import java.io.Closeable
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
-import spray.json.RootJsonFormat
+import com.typesafe.config.ConfigFactory
import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.database._
-import pureconfig._
import org.apache.openwhisk.core.entity.size._
-import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.database.cosmosdb.CosmosDBUtil.createClient
import org.apache.openwhisk.core.entity.{DocumentReader, WhiskActivation, WhiskAuth, WhiskEntity}
+import pureconfig._
+import spray.json.RootJsonFormat
import scala.reflect.ClassTag
-case class CosmosDBConfig(endpoint: String, key: String, db: String, throughput: Int = 1000)
-
case class ClientHolder(client: AsyncDocumentClient) extends Closeable {
override def close(): Unit = client.close()
}
object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
- private lazy val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)
- private var clientRef: ReferenceCounted[ClientHolder] = _
+ private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]()
override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
implicit jsonFormat: RootJsonFormat[D],
@@ -50,6 +47,8 @@ object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): ArtifactStore[D] = {
+ val tag = implicitly[ClassTag[D]]
+ val config = CosmosDBConfig(ConfigFactory.load(), tag.runtimeClass.getSimpleName)
makeStoreForClient(config, getOrCreateReference(config), getAttachmentStore())
}
@@ -102,13 +101,17 @@ object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
* Synchronization is required to ensure concurrent init of various store instances share same ref instance
*/
private def getOrCreateReference(config: CosmosDBConfig) = synchronized {
- if (clientRef == null || clientRef.isClosed) {
- clientRef = createReference(config)
+ val clientRef = clients.getOrElseUpdate(config, createReference(config))
+ if (clientRef.isClosed) {
+ val newRef = createReference(config)
+ clients.put(config, newRef)
+ newRef.reference()
+ } else {
+ clientRef.reference()
}
- clientRef.reference()
}
private def createReference(config: CosmosDBConfig) =
- new ReferenceCounted[ClientHolder](ClientHolder(createClient(config)))
+ new ReferenceCounted[ClientHolder](ClientHolder(config.createClient()))
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
new file mode 100644
index 0000000..0c93917
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import com.microsoft.azure.cosmosdb.{
+ ConsistencyLevel,
+ ConnectionPolicy => JConnectionPolicy,
+ RetryOptions => JRetryOptions
+}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigUtil.joinPath
+import org.apache.openwhisk.core.ConfigKeys
+import pureconfig._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+case class CosmosDBConfig(endpoint: String,
+ key: String,
+ db: String,
+ throughput: Int,
+ consistencyLevel: ConsistencyLevel,
+ connectionPolicy: ConnectionPolicy) {
+
+ def createClient(): AsyncDocumentClient = {
+ new AsyncDocumentClient.Builder()
+ .withServiceEndpoint(endpoint)
+ .withMasterKeyOrResourceToken(key)
+ .withConsistencyLevel(consistencyLevel)
+ .withConnectionPolicy(connectionPolicy.asJava)
+ .build()
+ }
+}
+
+case class ConnectionPolicy(maxPoolSize: Int,
+ preferredLocations: Seq[String],
+ usingMultipleWriteLocations: Boolean,
+ retryOptions: RetryOptions) {
+ def asJava: JConnectionPolicy = {
+ val p = new JConnectionPolicy
+ p.setMaxPoolSize(maxPoolSize)
+ p.setUsingMultipleWriteLocations(usingMultipleWriteLocations)
+ p.setPreferredLocations(preferredLocations.asJava)
+ p.setRetryOptions(retryOptions.asJava)
+ p
+ }
+}
+
+case class RetryOptions(maxRetryAttemptsOnThrottledRequests: Int, maxRetryWaitTime: Duration) {
+ def asJava: JRetryOptions = {
+ val o = new JRetryOptions
+ o.setMaxRetryAttemptsOnThrottledRequests(maxRetryAttemptsOnThrottledRequests)
+ o.setMaxRetryWaitTimeInSeconds(maxRetryWaitTime.toSeconds.toInt)
+ o
+ }
+}
+
+object CosmosDBConfig {
+ val collections = "collections"
+
+ def apply(globalConfig: Config, entityTypeName: String): CosmosDBConfig = {
+ val config = globalConfig.getConfig(ConfigKeys.cosmosdb)
+ val specificConfigPath = joinPath(collections, entityTypeName)
+
+ //Merge config specific to entity with common config
+ val entityConfig = if (config.hasPath(specificConfigPath)) {
+ config.getConfig(specificConfigPath).withFallback(config)
+ } else {
+ config
+ }
+ loadConfigOrThrow[CosmosDBConfig](entityConfig)
+ }
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
index a82e73d..c4237ba 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
@@ -17,9 +17,7 @@
package org.apache.openwhisk.core.database.cosmosdb
-import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.internal.Constants.Properties.{AGGREGATE, E_TAG, ID, SELF_LINK}
-import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
import scala.collection.immutable.Iterable
@@ -81,14 +79,6 @@ private[cosmosdb] trait CosmosDBUtil {
case x => x.toString
}
- def createClient(config: CosmosDBConfig): AsyncDocumentClient =
- new AsyncDocumentClient.Builder()
- .withServiceEndpoint(config.endpoint)
- .withMasterKeyOrResourceToken(config.key)
- .withConnectionPolicy(ConnectionPolicy.GetDefault)
- .withConsistencyLevel(ConsistencyLevel.Session)
- .build
-
/**
* CosmosDB id considers '/', '\' , '?' and '#' as invalid. EntityNames can include '/' so
* that need to be escaped. For that we use '|' as the replacement char
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
new file mode 100644
index 0000000..5e6e1cd
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+import com.typesafe.config.ConfigFactory
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import com.microsoft.azure.cosmosdb.{ConnectionPolicy => JConnectionPolicy}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[JUnitRunner])
+class CosmosDBConfigTests extends FlatSpec with Matchers {
+ val globalConfig = ConfigFactory.defaultApplication()
+ behavior of "CosmosDB Config"
+
+ it should "match SDK defaults" in {
+ val config = ConfigFactory.parseString(s"""
+ | whisk.cosmosdb {
+ | endpoint = "http://localhost"
+ | key = foo
+ | db = openwhisk
+ | }
+ """.stripMargin).withFallback(globalConfig)
+ val cosmos = CosmosDBConfig(config, "WhiskAuth")
+
+ //Cosmos SDK does not have equals defined so match them explicitly
+ val policy = cosmos.connectionPolicy.asJava
+ val defaultPolicy = JConnectionPolicy.GetDefault()
+ policy.getConnectionMode shouldBe defaultPolicy.getConnectionMode
+ policy.getEnableEndpointDiscovery shouldBe defaultPolicy.getEnableEndpointDiscovery
+ policy.getIdleConnectionTimeoutInMillis shouldBe defaultPolicy.getIdleConnectionTimeoutInMillis
+ policy.getMaxPoolSize shouldBe defaultPolicy.getMaxPoolSize
+ policy.getPreferredLocations shouldBe defaultPolicy.getPreferredLocations
+ policy.getRequestTimeoutInMillis shouldBe defaultPolicy.getRequestTimeoutInMillis
+ policy.isUsingMultipleWriteLocations shouldBe defaultPolicy.isUsingMultipleWriteLocations
+
+ val retryOpts = policy.getRetryOptions
+ val defaultOpts = defaultPolicy.getRetryOptions
+
+ retryOpts.getMaxRetryAttemptsOnThrottledRequests shouldBe defaultOpts.getMaxRetryAttemptsOnThrottledRequests
+ retryOpts.getMaxRetryWaitTimeInSeconds shouldBe defaultOpts.getMaxRetryWaitTimeInSeconds
+ }
+
+ it should "work with generic config" in {
+ val config = ConfigFactory.parseString(s"""
+ | whisk.cosmosdb {
+ | endpoint = "http://localhost"
+ | key = foo
+ | db = openwhisk
+ | }
+ """.stripMargin).withFallback(globalConfig)
+ val cosmos = CosmosDBConfig(config, "WhiskAuth")
+ cosmos should matchPattern { case CosmosDBConfig("http://localhost", "foo", "openwhisk", _, _, _) => }
+ }
+
+ it should "work with extended config" in {
+ val config = ConfigFactory.parseString(s"""
+ | whisk.cosmosdb {
+ | endpoint = "http://localhost"
+ | key = foo
+ | db = openwhisk
+ | connection-policy {
+ | max-pool-size = 42
+ | }
+ | }
+ """.stripMargin).withFallback(globalConfig)
+ val cosmos = CosmosDBConfig(config, "WhiskAuth")
+ cosmos should matchPattern { case CosmosDBConfig("http://localhost", "foo", "openwhisk", _, _, _) => }
+
+ cosmos.connectionPolicy.maxPoolSize shouldBe 42
+ val policy = cosmos.connectionPolicy.asJava
+ val defaultPolicy = JConnectionPolicy.GetDefault()
+ policy.getConnectionMode shouldBe defaultPolicy.getConnectionMode
+ policy.getRetryOptions.getMaxRetryAttemptsOnThrottledRequests shouldBe defaultPolicy.getRetryOptions.getMaxRetryAttemptsOnThrottledRequests
+ policy.getRetryOptions.getMaxRetryWaitTimeInSeconds shouldBe defaultPolicy.getRetryOptions.getMaxRetryWaitTimeInSeconds
+ }
+
+ it should "work with specific extended config" in {
+ val config = ConfigFactory.parseString(s"""
+ | whisk.cosmosdb {
+ | endpoint = "http://localhost"
+ | key = foo
+ | db = openwhisk
+ | connection-policy {
+ | max-pool-size = 42
+ | retry-options {
+ | max-retry-wait-time = 2 m
+ | }
+ | }
+ | collections {
+ | WhiskAuth = {
+ | connection-policy {
+ | using-multiple-write-locations = true
+ | preferred-locations = [a, b]
+ | }
+ | }
+ | }
+ | }
+ """.stripMargin).withFallback(globalConfig)
+ val cosmos = CosmosDBConfig(config, "WhiskAuth")
+ cosmos should matchPattern { case CosmosDBConfig("http://localhost", "foo", "openwhisk", _, _, _) => }
+
+ val policy = cosmos.connectionPolicy.asJava
+ policy.isUsingMultipleWriteLocations shouldBe true
+ policy.getMaxPoolSize shouldBe 42
+ policy.getPreferredLocations.asScala.toSeq should contain only ("a", "b")
+ policy.getRetryOptions.getMaxRetryWaitTimeInSeconds shouldBe 120
+ }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala
index 87af893..4c50102 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala
@@ -30,7 +30,7 @@ trait CosmosDBTestSupport extends FlatSpec with BeforeAndAfterAll with RxObserva
private val dbsToDelete = ListBuffer[Database]()
lazy val storeConfigTry = Try { loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb) }
- lazy val client = CosmosDBUtil.createClient(storeConfig)
+ lazy val client = storeConfig.createClient()
val useExistingDB = java.lang.Boolean.getBoolean("whisk.cosmosdb.useExistingDB")
def storeConfig = storeConfigTry.get