You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by dr...@apache.org on 2019/05/21 17:08:38 UTC

[incubator-openwhisk] branch master updated: Switch to consistent indexing policy for CosmosDB (#4484)

This is an automated email from the ASF dual-hosted git repository.

dragos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ed08ed1  Switch to consistent indexing policy for CosmosDB (#4484)
ed08ed1 is described below

commit ed08ed164bb3eca38f5cdd3549fc6097d2532707
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Tue May 21 22:38:27 2019 +0530

    Switch to consistent indexing policy for CosmosDB (#4484)
    
    * Switch to consistent indexing policy
    * Remove reference to Range index
    * Tweak indexing policy comparison to only check the included and excluded paths
    Do not check the index type, since Range is now the only index kind used in our cases
    * Use implicit logger
    * Exclude the root path using `/*` instead of `/`
---
 .../core/database/cosmosdb/CosmosDBSupport.scala   | 25 +++++++--------
 .../database/cosmosdb/CosmosDBViewMapper.scala     | 26 ++++++++--------
 .../core/database/cosmosdb/IndexingPolicy.scala    | 27 +++++-----------
 .../database/cosmosdb/IndexingPolicyTests.scala    | 36 +++++-----------------
 4 files changed, 41 insertions(+), 73 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
index da9614f..8032863 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
 
 import com.microsoft.azure.cosmosdb._
 import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import org.apache.openwhisk.common.Logging
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable
@@ -29,12 +30,12 @@ private[cosmosdb] trait CosmosDBSupport extends RxObservableImplicits with Cosmo
   protected def client: AsyncDocumentClient
   protected def viewMapper: CosmosDBViewMapper
 
-  def initialize(): (Database, DocumentCollection) = {
+  def initialize()(implicit logging: Logging): (Database, DocumentCollection) = {
     val db = getOrCreateDatabase()
     (db, getOrCreateCollection(db))
   }
 
-  private def getOrCreateDatabase(): Database = {
+  private def getOrCreateDatabase()(implicit logging: Logging): Database = {
     client
       .queryDatabases(querySpec(config.db), null)
       .blockingOnlyResult()
@@ -43,27 +44,27 @@ private[cosmosdb] trait CosmosDBSupport extends RxObservableImplicits with Cosmo
       }
   }
 
-  private def getOrCreateCollection(database: Database) = {
+  private def getOrCreateCollection(database: Database)(implicit logging: Logging) = {
     client
       .queryCollections(database.getSelfLink, querySpec(collName), null)
       .blockingOnlyResult()
       .map { coll =>
-        if (matchingIndexingPolicy(coll)) {
-          coll
-        } else {
-          //Modify the found collection with latest policy as its selfLink is set
-          coll.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
-          client.replaceCollection(coll, null).blockingResult()
+        val expectedIndexingPolicy = viewMapper.indexingPolicy
+        val existingIndexingPolicy = IndexingPolicy(coll.getIndexingPolicy)
+        if (!IndexingPolicy.isSame(expectedIndexingPolicy, existingIndexingPolicy)) {
+          logging.warn(
+            this,
+            s"Indexing policy for collection [$collName] found to be different." +
+              s"\nExpected - ${expectedIndexingPolicy.asJava().toJson}" +
+              s"\nExisting - ${existingIndexingPolicy.asJava().toJson}")
         }
+        coll
       }
       .getOrElse {
         client.createCollection(database.getSelfLink, newDatabaseCollection, dbOptions).blockingResult()
       }
   }
 
-  private def matchingIndexingPolicy(coll: DocumentCollection): Boolean =
-    IndexingPolicy.isSame(viewMapper.indexingPolicy, IndexingPolicy(coll.getIndexingPolicy))
-
   private def newDatabaseCollection = {
     val defn = new DocumentCollection
     defn.setId(collName)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
index b77d874..ac26965 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
@@ -20,8 +20,7 @@ package org.apache.openwhisk.core.database.cosmosdb
 import java.util.Collections
 
 import com.microsoft.azure.cosmosdb.DataType.{Number, String}
-import com.microsoft.azure.cosmosdb.IndexKind.{Hash, Range}
-import com.microsoft.azure.cosmosdb.IndexingMode.Lazy
+import com.microsoft.azure.cosmosdb.IndexKind.Range
 import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter, SqlParameterCollection, SqlQuerySpec}
 import org.apache.openwhisk.core.database.ActivationHandler.NS_PATH
 import org.apache.openwhisk.core.database.WhisksHandler.ROOT_NS
@@ -137,9 +136,9 @@ private[cosmosdb] object WhisksViewMapper extends SimpleMapper {
   override def indexingPolicy: IndexingPolicy =
     IndexingPolicy(
       includedPaths = Set(
-        IncludedPath(s"/$TYPE/?", Index(Hash, String, -1)),
-        IncludedPath(s"/$NS/?", Index(Hash, String, -1)),
-        IncludedPath(s"/$computed/$ROOT_NS/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$TYPE/?", Index(Range, String, -1)),
+        IncludedPath(s"/$NS/?", Index(Range, String, -1)),
+        IncludedPath(s"/$computed/$ROOT_NS/?", Index(Range, String, -1)),
         IncludedPath(s"/$UPDATED/?", Index(Range, Number, -1))))
 
   override protected def where(ddoc: String,
@@ -193,10 +192,9 @@ private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
 
   override def indexingPolicy: IndexingPolicy =
     IndexingPolicy(
-      mode = Lazy,
       includedPaths = Set(
-        IncludedPath(s"/$NS/?", Index(Hash, String, -1)),
-        IncludedPath(s"/$computed/$NS_PATH/?", Index(Hash, String, -1)),
+        IncludedPath(s"/$NS/?", Index(Range, String, -1)),
+        IncludedPath(s"/$computed/$NS_PATH/?", Index(Range, String, -1)),
         IncludedPath(s"/$START/?", Index(Range, Number, -1))))
 
   override protected def where(ddoc: String,
@@ -255,12 +253,12 @@ private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
     //and keys are bigger
     IndexingPolicy(
       includedPaths = Set(
-        IncludedPath(s"/$UUID/?", Index(Hash, String, -1)),
-        IncludedPath(s"/$NSS/[]/$NAME/?", Index(Hash, String, -1)),
-        IncludedPath(s"/$SUBJECT/?", Index(Hash, String, -1)),
-        IncludedPath(s"/$NSS/[]/$UUID/?", Index(Hash, String, -1)),
-        IncludedPath(s"/$CONCURRENT_INVOCATIONS/?", Index(Hash, Number, -1)),
-        IncludedPath(s"/$INVOCATIONS_PER_MIN/?", Index(Hash, Number, -1))))
+        IncludedPath(s"/$UUID/?", Index(Range, String, -1)),
+        IncludedPath(s"/$NSS/[]/$NAME/?", Index(Range, String, -1)),
+        IncludedPath(s"/$SUBJECT/?", Index(Range, String, -1)),
+        IncludedPath(s"/$NSS/[]/$UUID/?", Index(Range, String, -1)),
+        IncludedPath(s"/$CONCURRENT_INVOCATIONS/?", Index(Range, Number, -1)),
+        IncludedPath(s"/$INVOCATIONS_PER_MIN/?", Index(Range, Number, -1))))
 
   override def prepareQuery(ddoc: String,
                             view: String,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
index c10dbfc..8d35f71 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
@@ -19,9 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
 
 import com.microsoft.azure.cosmosdb.{
   DataType,
-  HashIndex,
   IndexKind,
-  IndexingMode,
   RangeIndex,
   ExcludedPath => JExcludedPath,
   IncludedPath => JIncludedPath,
@@ -40,13 +38,11 @@ import scala.collection.JavaConverters._
  *    needs to be customized
  *
  */
-case class IndexingPolicy(mode: IndexingMode = IndexingMode.Consistent,
-                          includedPaths: Set[IncludedPath],
-                          excludedPaths: Set[ExcludedPath] = Set(ExcludedPath("/"))) {
+case class IndexingPolicy(includedPaths: Set[IncludedPath],
+                          excludedPaths: Set[ExcludedPath] = Set(ExcludedPath("/*"))) {
 
   def asJava(): JIndexingPolicy = {
     val policy = new JIndexingPolicy()
-    policy.setIndexingMode(mode)
     policy.setIncludedPaths(includedPaths.map(_.asJava()).asJava)
     policy.setExcludedPaths(excludedPaths.map(_.asJava()).asJava)
     policy
@@ -56,7 +52,6 @@ case class IndexingPolicy(mode: IndexingMode = IndexingMode.Consistent,
 object IndexingPolicy {
   def apply(policy: JIndexingPolicy): IndexingPolicy =
     IndexingPolicy(
-      policy.getIndexingMode,
       policy.getIncludedPaths.asScala.map(IncludedPath(_)).toSet,
       policy.getExcludedPaths.asScala.map(ExcludedPath(_)).toSet)
 
@@ -65,18 +60,14 @@ object IndexingPolicy {
    * that at least what we expect is present
    */
   def isSame(expected: IndexingPolicy, current: IndexingPolicy): Boolean = {
-    expected.mode == current.mode && expected.excludedPaths == current.excludedPaths &&
-    matchIncludes(expected.includedPaths, current.includedPaths)
+    epaths(expected.excludedPaths) == epaths(current.excludedPaths) &&
+    ipaths(expected.includedPaths) == ipaths(current.includedPaths)
   }
 
-  private def matchIncludes(expected: Set[IncludedPath], current: Set[IncludedPath]): Boolean = {
-    expected.size == current.size && expected.forall { i =>
-      current.find(_.path == i.path) match {
-        case Some(x) => i.indexes.subsetOf(x.indexes)
-        case None    => false
-      }
-    }
-  }
+  private def ipaths(included: Set[IncludedPath]) = included.map(_.path)
+
+  //CosmosDB seems to add _etag by default in excluded path. So explicitly ignore that in comparison
+  private def epaths(excluded: Set[ExcludedPath]) = excluded.map(_.path).filterNot(_.contains("_etag"))
 }
 
 case class IncludedPath(path: String, indexes: Set[Index]) {
@@ -108,7 +99,6 @@ object ExcludedPath {
 
 case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
   def asJava(): JIndex = kind match {
-    case IndexKind.Hash  => JIndex.Hash(dataType, precision)
     case IndexKind.Range => JIndex.Range(dataType, precision)
     case _               => throw new RuntimeException(s"Unsupported kind $kind")
   }
@@ -116,7 +106,6 @@ case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
 
 object Index {
   def apply(index: JIndex): Index = index match {
-    case i: HashIndex  => Index(i.getKind, i.getDataType, i.getPrecision)
     case i: RangeIndex => Index(i.getKind, i.getDataType, i.getPrecision)
     case _             => throw new RuntimeException(s"Unsupported kind $index")
   }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicyTests.scala
index 2a0fb8b..d739550 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicyTests.scala
@@ -18,8 +18,7 @@
 package org.apache.openwhisk.core.database.cosmosdb
 
 import com.microsoft.azure.cosmosdb.DataType.String
-import com.microsoft.azure.cosmosdb.IndexKind.{Hash, Range}
-import com.microsoft.azure.cosmosdb.IndexingMode
+import com.microsoft.azure.cosmosdb.IndexKind.Range
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpec, Matchers}
@@ -30,39 +29,21 @@ class IndexingPolicyTests extends FlatSpec with Matchers {
 
   it should "match same instance" in {
     val policy =
-      IndexingPolicy(mode = IndexingMode.Lazy, includedPaths = Set(IncludedPath("foo", Index(Hash, String, -1))))
+      IndexingPolicy(includedPaths = Set(IncludedPath("foo", Index(Range, String, -1))))
     IndexingPolicy.isSame(policy, policy) shouldBe true
   }
 
-  it should "match when same path and subset of indexes" in {
-    val policy =
-      IndexingPolicy(
-        mode = IndexingMode.Lazy,
-        includedPaths = Set(IncludedPath("foo", Index(Hash, String, -1)), IncludedPath("bar", Index(Hash, String, -1))))
-
-    val policy2 =
-      IndexingPolicy(
-        mode = IndexingMode.Lazy,
-        includedPaths = Set(
-          IncludedPath("foo", Index(Hash, String, -1)),
-          IncludedPath("bar", Set(Index(Hash, String, -1), Index(Range, String, -1)))))
-
-    IndexingPolicy.isSame(policy, policy2) shouldBe true
-    IndexingPolicy.isSame(policy2, policy) shouldBe false
-  }
-
   it should "not match when same path are different" in {
     val policy =
       IndexingPolicy(
-        mode = IndexingMode.Lazy,
-        includedPaths = Set(IncludedPath("foo", Index(Hash, String, -1)), IncludedPath("bar", Index(Hash, String, -1))))
+        includedPaths =
+          Set(IncludedPath("foo", Index(Range, String, -1)), IncludedPath("bar", Index(Range, String, -1))))
 
     val policy2 =
       IndexingPolicy(
-        mode = IndexingMode.Lazy,
         includedPaths = Set(
-          IncludedPath("foo2", Index(Hash, String, -1)),
-          IncludedPath("bar", Set(Index(Hash, String, -1), Index(Range, String, -1)))))
+          IncludedPath("foo2", Index(Range, String, -1)),
+          IncludedPath("bar", Set(Index(Range, String, -1), Index(Range, String, -1)))))
 
     IndexingPolicy.isSame(policy, policy2) shouldBe false
   }
@@ -70,10 +51,9 @@ class IndexingPolicyTests extends FlatSpec with Matchers {
   it should "convert and match java IndexingPolicy" in {
     val policy =
       IndexingPolicy(
-        mode = IndexingMode.Lazy,
         includedPaths = Set(
-          IncludedPath("foo", Index(Hash, String, -1)),
-          IncludedPath("bar", Set(Index(Hash, String, -1), Index(Range, String, -1)))))
+          IncludedPath("foo", Index(Range, String, -1)),
+          IncludedPath("bar", Set(Index(Range, String, -1), Index(Range, String, -1)))))
 
     val jpolicy = policy.asJava()
     val policy2 = IndexingPolicy(jpolicy)