You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by dr...@apache.org on 2019/05/21 17:08:38 UTC
[incubator-openwhisk] branch master updated: Switch to consistent
indexing policy for CosmosDB (#4484)
This is an automated email from the ASF dual-hosted git repository.
dragos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new ed08ed1 Switch to consistent indexing policy for CosmosDB (#4484)
ed08ed1 is described below
commit ed08ed164bb3eca38f5cdd3549fc6097d2532707
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Tue May 21 22:38:27 2019 +0530
Switch to consistent indexing policy for CosmosDB (#4484)
* Switch to consistent indexing policy
* Remove reference to Range index
* Tweak indexing policy comparison to only check for included and excluded path
Do not check for Index type as now there is only one which is Range for our cases
* Use implicit logger
* Excluding root path should be using `/*` instead of `/`
---
.../core/database/cosmosdb/CosmosDBSupport.scala | 25 +++++++--------
.../database/cosmosdb/CosmosDBViewMapper.scala | 26 ++++++++--------
.../core/database/cosmosdb/IndexingPolicy.scala | 27 +++++-----------
.../database/cosmosdb/IndexingPolicyTests.scala | 36 +++++-----------------
4 files changed, 41 insertions(+), 73 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
index da9614f..8032863 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import org.apache.openwhisk.common.Logging
import scala.collection.JavaConverters._
import scala.collection.immutable
@@ -29,12 +30,12 @@ private[cosmosdb] trait CosmosDBSupport extends RxObservableImplicits with Cosmo
protected def client: AsyncDocumentClient
protected def viewMapper: CosmosDBViewMapper
- def initialize(): (Database, DocumentCollection) = {
+ def initialize()(implicit logging: Logging): (Database, DocumentCollection) = {
val db = getOrCreateDatabase()
(db, getOrCreateCollection(db))
}
- private def getOrCreateDatabase(): Database = {
+ private def getOrCreateDatabase()(implicit logging: Logging): Database = {
client
.queryDatabases(querySpec(config.db), null)
.blockingOnlyResult()
@@ -43,27 +44,27 @@ private[cosmosdb] trait CosmosDBSupport extends RxObservableImplicits with Cosmo
}
}
- private def getOrCreateCollection(database: Database) = {
+ private def getOrCreateCollection(database: Database)(implicit logging: Logging) = {
client
.queryCollections(database.getSelfLink, querySpec(collName), null)
.blockingOnlyResult()
.map { coll =>
- if (matchingIndexingPolicy(coll)) {
- coll
- } else {
- //Modify the found collection with latest policy as its selfLink is set
- coll.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
- client.replaceCollection(coll, null).blockingResult()
+ val expectedIndexingPolicy = viewMapper.indexingPolicy
+ val existingIndexingPolicy = IndexingPolicy(coll.getIndexingPolicy)
+ if (!IndexingPolicy.isSame(expectedIndexingPolicy, existingIndexingPolicy)) {
+ logging.warn(
+ this,
+ s"Indexing policy for collection [$collName] found to be different." +
+ s"\nExpected - ${expectedIndexingPolicy.asJava().toJson}" +
+ s"\nExisting - ${existingIndexingPolicy.asJava().toJson}")
}
+ coll
}
.getOrElse {
client.createCollection(database.getSelfLink, newDatabaseCollection, dbOptions).blockingResult()
}
}
- private def matchingIndexingPolicy(coll: DocumentCollection): Boolean =
- IndexingPolicy.isSame(viewMapper.indexingPolicy, IndexingPolicy(coll.getIndexingPolicy))
-
private def newDatabaseCollection = {
val defn = new DocumentCollection
defn.setId(collName)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
index b77d874..ac26965 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
@@ -20,8 +20,7 @@ package org.apache.openwhisk.core.database.cosmosdb
import java.util.Collections
import com.microsoft.azure.cosmosdb.DataType.{Number, String}
-import com.microsoft.azure.cosmosdb.IndexKind.{Hash, Range}
-import com.microsoft.azure.cosmosdb.IndexingMode.Lazy
+import com.microsoft.azure.cosmosdb.IndexKind.Range
import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter, SqlParameterCollection, SqlQuerySpec}
import org.apache.openwhisk.core.database.ActivationHandler.NS_PATH
import org.apache.openwhisk.core.database.WhisksHandler.ROOT_NS
@@ -137,9 +136,9 @@ private[cosmosdb] object WhisksViewMapper extends SimpleMapper {
override def indexingPolicy: IndexingPolicy =
IndexingPolicy(
includedPaths = Set(
- IncludedPath(s"/$TYPE/?", Index(Hash, String, -1)),
- IncludedPath(s"/$NS/?", Index(Hash, String, -1)),
- IncludedPath(s"/$computed/$ROOT_NS/?", Index(Hash, String, -1)),
+ IncludedPath(s"/$TYPE/?", Index(Range, String, -1)),
+ IncludedPath(s"/$NS/?", Index(Range, String, -1)),
+ IncludedPath(s"/$computed/$ROOT_NS/?", Index(Range, String, -1)),
IncludedPath(s"/$UPDATED/?", Index(Range, Number, -1))))
override protected def where(ddoc: String,
@@ -193,10 +192,9 @@ private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
override def indexingPolicy: IndexingPolicy =
IndexingPolicy(
- mode = Lazy,
includedPaths = Set(
- IncludedPath(s"/$NS/?", Index(Hash, String, -1)),
- IncludedPath(s"/$computed/$NS_PATH/?", Index(Hash, String, -1)),
+ IncludedPath(s"/$NS/?", Index(Range, String, -1)),
+ IncludedPath(s"/$computed/$NS_PATH/?", Index(Range, String, -1)),
IncludedPath(s"/$START/?", Index(Range, Number, -1))))
override protected def where(ddoc: String,
@@ -255,12 +253,12 @@ private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
//and keys are bigger
IndexingPolicy(
includedPaths = Set(
- IncludedPath(s"/$UUID/?", Index(Hash, String, -1)),
- IncludedPath(s"/$NSS/[]/$NAME/?", Index(Hash, String, -1)),
- IncludedPath(s"/$SUBJECT/?", Index(Hash, String, -1)),
- IncludedPath(s"/$NSS/[]/$UUID/?", Index(Hash, String, -1)),
- IncludedPath(s"/$CONCURRENT_INVOCATIONS/?", Index(Hash, Number, -1)),
- IncludedPath(s"/$INVOCATIONS_PER_MIN/?", Index(Hash, Number, -1))))
+ IncludedPath(s"/$UUID/?", Index(Range, String, -1)),
+ IncludedPath(s"/$NSS/[]/$NAME/?", Index(Range, String, -1)),
+ IncludedPath(s"/$SUBJECT/?", Index(Range, String, -1)),
+ IncludedPath(s"/$NSS/[]/$UUID/?", Index(Range, String, -1)),
+ IncludedPath(s"/$CONCURRENT_INVOCATIONS/?", Index(Range, Number, -1)),
+ IncludedPath(s"/$INVOCATIONS_PER_MIN/?", Index(Range, Number, -1))))
override def prepareQuery(ddoc: String,
view: String,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
index c10dbfc..8d35f71 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
@@ -19,9 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
import com.microsoft.azure.cosmosdb.{
DataType,
- HashIndex,
IndexKind,
- IndexingMode,
RangeIndex,
ExcludedPath => JExcludedPath,
IncludedPath => JIncludedPath,
@@ -40,13 +38,11 @@ import scala.collection.JavaConverters._
* needs to be customized
*
*/
-case class IndexingPolicy(mode: IndexingMode = IndexingMode.Consistent,
- includedPaths: Set[IncludedPath],
- excludedPaths: Set[ExcludedPath] = Set(ExcludedPath("/"))) {
+case class IndexingPolicy(includedPaths: Set[IncludedPath],
+ excludedPaths: Set[ExcludedPath] = Set(ExcludedPath("/*"))) {
def asJava(): JIndexingPolicy = {
val policy = new JIndexingPolicy()
- policy.setIndexingMode(mode)
policy.setIncludedPaths(includedPaths.map(_.asJava()).asJava)
policy.setExcludedPaths(excludedPaths.map(_.asJava()).asJava)
policy
@@ -56,7 +52,6 @@ case class IndexingPolicy(mode: IndexingMode = IndexingMode.Consistent,
object IndexingPolicy {
def apply(policy: JIndexingPolicy): IndexingPolicy =
IndexingPolicy(
- policy.getIndexingMode,
policy.getIncludedPaths.asScala.map(IncludedPath(_)).toSet,
policy.getExcludedPaths.asScala.map(ExcludedPath(_)).toSet)
@@ -65,18 +60,14 @@ object IndexingPolicy {
* that at least what we expect is present
*/
def isSame(expected: IndexingPolicy, current: IndexingPolicy): Boolean = {
- expected.mode == current.mode && expected.excludedPaths == current.excludedPaths &&
- matchIncludes(expected.includedPaths, current.includedPaths)
+ epaths(expected.excludedPaths) == epaths(current.excludedPaths) &&
+ ipaths(expected.includedPaths) == ipaths(current.includedPaths)
}
- private def matchIncludes(expected: Set[IncludedPath], current: Set[IncludedPath]): Boolean = {
- expected.size == current.size && expected.forall { i =>
- current.find(_.path == i.path) match {
- case Some(x) => i.indexes.subsetOf(x.indexes)
- case None => false
- }
- }
- }
+ private def ipaths(included: Set[IncludedPath]) = included.map(_.path)
+
+ //CosmosDB seems to add _etag by default in excluded path. So explicitly ignore that in comparison
+ private def epaths(excluded: Set[ExcludedPath]) = excluded.map(_.path).filterNot(_.contains("_etag"))
}
case class IncludedPath(path: String, indexes: Set[Index]) {
@@ -108,7 +99,6 @@ object ExcludedPath {
case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
def asJava(): JIndex = kind match {
- case IndexKind.Hash => JIndex.Hash(dataType, precision)
case IndexKind.Range => JIndex.Range(dataType, precision)
case _ => throw new RuntimeException(s"Unsupported kind $kind")
}
@@ -116,7 +106,6 @@ case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
object Index {
def apply(index: JIndex): Index = index match {
- case i: HashIndex => Index(i.getKind, i.getDataType, i.getPrecision)
case i: RangeIndex => Index(i.getKind, i.getDataType, i.getPrecision)
case _ => throw new RuntimeException(s"Unsupported kind $index")
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicyTests.scala
index 2a0fb8b..d739550 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicyTests.scala
@@ -18,8 +18,7 @@
package org.apache.openwhisk.core.database.cosmosdb
import com.microsoft.azure.cosmosdb.DataType.String
-import com.microsoft.azure.cosmosdb.IndexKind.{Hash, Range}
-import com.microsoft.azure.cosmosdb.IndexingMode
+import com.microsoft.azure.cosmosdb.IndexKind.Range
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
@@ -30,39 +29,21 @@ class IndexingPolicyTests extends FlatSpec with Matchers {
it should "match same instance" in {
val policy =
- IndexingPolicy(mode = IndexingMode.Lazy, includedPaths = Set(IncludedPath("foo", Index(Hash, String, -1))))
+ IndexingPolicy(includedPaths = Set(IncludedPath("foo", Index(Range, String, -1))))
IndexingPolicy.isSame(policy, policy) shouldBe true
}
- it should "match when same path and subset of indexes" in {
- val policy =
- IndexingPolicy(
- mode = IndexingMode.Lazy,
- includedPaths = Set(IncludedPath("foo", Index(Hash, String, -1)), IncludedPath("bar", Index(Hash, String, -1))))
-
- val policy2 =
- IndexingPolicy(
- mode = IndexingMode.Lazy,
- includedPaths = Set(
- IncludedPath("foo", Index(Hash, String, -1)),
- IncludedPath("bar", Set(Index(Hash, String, -1), Index(Range, String, -1)))))
-
- IndexingPolicy.isSame(policy, policy2) shouldBe true
- IndexingPolicy.isSame(policy2, policy) shouldBe false
- }
-
it should "not match when same path are different" in {
val policy =
IndexingPolicy(
- mode = IndexingMode.Lazy,
- includedPaths = Set(IncludedPath("foo", Index(Hash, String, -1)), IncludedPath("bar", Index(Hash, String, -1))))
+ includedPaths =
+ Set(IncludedPath("foo", Index(Range, String, -1)), IncludedPath("bar", Index(Range, String, -1))))
val policy2 =
IndexingPolicy(
- mode = IndexingMode.Lazy,
includedPaths = Set(
- IncludedPath("foo2", Index(Hash, String, -1)),
- IncludedPath("bar", Set(Index(Hash, String, -1), Index(Range, String, -1)))))
+ IncludedPath("foo2", Index(Range, String, -1)),
+ IncludedPath("bar", Set(Index(Range, String, -1), Index(Range, String, -1)))))
IndexingPolicy.isSame(policy, policy2) shouldBe false
}
@@ -70,10 +51,9 @@ class IndexingPolicyTests extends FlatSpec with Matchers {
it should "convert and match java IndexingPolicy" in {
val policy =
IndexingPolicy(
- mode = IndexingMode.Lazy,
includedPaths = Set(
- IncludedPath("foo", Index(Hash, String, -1)),
- IncludedPath("bar", Set(Index(Hash, String, -1), Index(Range, String, -1)))))
+ IncludedPath("foo", Index(Range, String, -1)),
+ IncludedPath("bar", Set(Index(Range, String, -1), Index(Range, String, -1)))))
val jpolicy = policy.asJava()
val policy2 = IndexingPolicy(jpolicy)