You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/12/06 19:57:08 UTC

[GitHub] csantanapr closed pull request #2832: Do not cache invalidate when adding a DB attachment

csantanapr closed pull request #2832: Do not cache invalidate when adding a DB attachment
URL: https://github.com/apache/incubator-openwhisk/pull/2832
 
 
   

This pull request was merged from a forked repository.
Because GitHub does not display the original diff of a pull request
from a fork once it has been merged, the diff is reproduced below for
the sake of provenance:

diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index 6aa951552d..59e04879fb 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -24,7 +24,6 @@ import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
 import akka.http.scaladsl.model.ContentType
 import akka.stream.IOResult
 import akka.stream.scaladsl.StreamConverters
@@ -35,49 +34,6 @@ import whisk.core.entity.DocId
 import whisk.core.entity.DocInfo
 import whisk.core.entity.DocRevision
 
-/**
- * A common trait for all records that are stored in the datastore requiring an _id field,
- * the unique document identifier. The _id field on a document must be defined (not null,
- * not empty) before the document is added to the datastore, otherwise the operation will
- * reject the document.
- *
- * The field is writable because a document retrieved from the datastore will write this
- * field. Reading from the datastore requires a nullary constructor and hence a field from
- * the datastore is declared as a var for that purpose.
- */
-trait Document {
-
-  /** The document id, this is the primary key for the document and must be unique. */
-  protected var _id: String = null
-
-  /** The document revision as determined by the datastore; an opaque value. */
-  protected[database] var _rev: String = null
-
-  /** Gets the document id and revision as an instance of DocInfo. */
-  protected[database] def docinfo: DocInfo
-
-  /**
-   * Checks if the document has a valid revision set, in which case
-   * this is an update operation.
-   *
-   * @return true iff document has a valid revision
-   */
-  protected[database] final def update: Boolean = _rev != null
-
-  /**
-   * Confirms the document has a valid id set.
-   *
-   * @return true iff document has a valid id
-   * @throws IllegalArgumentException iff document does not have a valid id
-   */
-  @throws[IllegalArgumentException]
-  protected[database] final def confirmId: Boolean = {
-    require(_id != null, "document id undefined")
-    require(_id.trim.nonEmpty, "document id undefined")
-    true
-  }
-}
-
 /**
  * An interface for modifying the revision number on a document. Hides the details of
  * the revision to some extent while providing a marker interface for operations that
@@ -85,6 +41,9 @@ trait Document {
  */
 protected[core] trait DocumentRevisionProvider {
 
+  /** Gets the document id and revision as an instance of DocInfo. */
+  protected[database] def docinfo: DocInfo
+
   /**
    * Sets the revision number when a document is deserialized from datastore. The
    * _rev is an opaque value, needed to update the record in the datastore. It is
@@ -123,7 +82,7 @@ trait DocumentSerializer {
  * but the get permits a datastore of its super type so that a single datastore client
  * may be used for multiple types (because the types are stored in the same database for example).
  */
-trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
+trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSingleWriterCache[W, DocInfo] {
 
   /**
    * Puts a record of type W in the datastore.
@@ -149,15 +108,12 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
       implicit val ec = db.executionContext
 
       val key = CacheKey(doc)
+      val docInfo = doc.docinfo
 
-      cacheUpdate(doc, key, db.put(doc) map { docinfo =>
-        doc match {
-          // if doc has a revision id, update it with new version
-          case w: DocumentRevisionProvider => w.revision[W](docinfo.rev)
-        }
-        docinfo
+      cacheUpdate(doc, key, db.put(doc) map { newDocInfo =>
+        doc.revision[W](newDocInfo.rev)
+        doc.docinfo
       })
-
     } match {
       case Success(f) => f
       case Failure(t) => Future.failed(t)
@@ -166,7 +122,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
 
   def attach[Wsuper >: W](
     db: ArtifactStore[Wsuper],
-    doc: DocInfo,
+    doc: W,
     attachmentName: String,
     contentType: ContentType,
     bytes: InputStream)(implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
@@ -178,12 +134,13 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
       implicit val logger = db.logging
       implicit val ec = db.executionContext
 
-      val key = CacheKey(doc.id.asDocInfo)
-      // invalidate the key because attachments update the revision;
-      // do not cache the new attachment (controller does not need it)
-      cacheInvalidate(key, {
-        val src = StreamConverters.fromInputStream(() => bytes)
-        db.attach(doc, attachmentName, contentType, src)
+      val key = CacheKey(doc)
+      val docInfo = doc.docinfo
+      val src = StreamConverters.fromInputStream(() => bytes)
+
+      cacheUpdate(doc, key, db.attach(docInfo, attachmentName, contentType, src) map { newDocInfo =>
+        doc.revision[W](newDocInfo.rev)
+        doc.docinfo
       })
     } match {
       case Success(f) => f
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index 41af1aa1da..afd85cc395 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -315,7 +315,8 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
           val manifest = exec.manifest.attached.get
 
           for (i1 <- super.put(db, newDoc);
-               i2 <- attach[A](db, i1, manifest.attachmentName, manifest.attachmentType, stream)) yield i2
+               i2 <- attach[A](db, newDoc.revision(i1.rev), manifest.attachmentName, manifest.attachmentType, stream))
+            yield i2
 
         case _ =>
           super.put(db, doc)
diff --git a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
index c97ae76f73..4b44ddf66a 100644
--- a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
@@ -21,7 +21,6 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Success
-
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.StatusCode
 import akka.http.scaladsl.model.StatusCodes.Conflict
@@ -39,13 +38,7 @@ import spray.json.RootJsonFormat
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.controller.PostProcess.PostProcessEntity
-import whisk.core.database.ArtifactStore
-import whisk.core.database.ArtifactStoreException
-import whisk.core.database.DocumentConflictException
-import whisk.core.database.DocumentFactory
-import whisk.core.database.DocumentTypeMismatchException
-import whisk.core.database.CacheChangeNotification
-import whisk.core.database.NoDocumentException
+import whisk.core.database._
 import whisk.core.entity.DocId
 import whisk.core.entity.WhiskDocument
 import whisk.core.entity.WhiskEntity
@@ -158,12 +151,13 @@ trait ReadOps extends Directives {
    * - 404 Not Found
    * - 500 Internal Server Error
    */
-  protected def getEntity[A, Au >: A](factory: DocumentFactory[A],
-                                      datastore: ArtifactStore[Au],
-                                      docid: DocId,
-                                      postProcess: Option[PostProcessEntity[A]] = None)(implicit transid: TransactionId,
-                                                                                        format: RootJsonFormat[A],
-                                                                                        ma: Manifest[A]) = {
+  protected def getEntity[A <: DocumentRevisionProvider, Au >: A](factory: DocumentFactory[A],
+                                                                  datastore: ArtifactStore[Au],
+                                                                  docid: DocId,
+                                                                  postProcess: Option[PostProcessEntity[A]] = None)(
+    implicit transid: TransactionId,
+    format: RootJsonFormat[A],
+    ma: Manifest[A]) = {
     onComplete(factory.get(datastore, docid)) {
       case Success(entity) =>
         logging.info(this, s"[GET] entity success")
@@ -196,7 +190,7 @@ trait ReadOps extends Directives {
    * - 404 Not Found
    * - 500 Internal Server Error
    */
-  protected def getEntityAndProject[A, Au >: A](
+  protected def getEntityAndProject[A <: DocumentRevisionProvider, Au >: A](
     factory: DocumentFactory[A],
     datastore: ArtifactStore[Au],
     docid: DocId,
@@ -259,14 +253,14 @@ trait WriteOps extends Directives {
    * - 409 Conflict
    * - 500 Internal Server Error
    */
-  protected def putEntity[A, Au >: A](factory: DocumentFactory[A],
-                                      datastore: ArtifactStore[Au],
-                                      docid: DocId,
-                                      overwrite: Boolean,
-                                      update: A => Future[A],
-                                      create: () => Future[A],
-                                      treatExistsAsConflict: Boolean = true,
-                                      postProcess: Option[PostProcessEntity[A]] = None)(
+  protected def putEntity[A <: DocumentRevisionProvider, Au >: A](factory: DocumentFactory[A],
+                                                                  datastore: ArtifactStore[Au],
+                                                                  docid: DocId,
+                                                                  overwrite: Boolean,
+                                                                  update: A => Future[A],
+                                                                  create: () => Future[A],
+                                                                  treatExistsAsConflict: Boolean = true,
+                                                                  postProcess: Option[PostProcessEntity[A]] = None)(
     implicit transid: TransactionId,
     format: RootJsonFormat[A],
     notifier: Option[CacheChangeNotification],
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index aa60a25072..c841545177 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -503,68 +503,95 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     }
   }
 
-  it should "put and then get action from cache" in {
-    val action = WhiskAction(namespace, aname(), jsDefault("??"), Parameters("x", "b"))
-    val content = WhiskActionPut(
-      Some(action.exec),
-      Some(action.parameters),
-      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
-    val name = action.name
-
-    // first request invalidates any previous entries and caches new result
-    Put(s"$collectionPath/$name", content) ~> Route.seal(routes(creds)(transid())) ~> check {
-      status should be(OK)
-      val response = responseAs[WhiskAction]
-      response should be(
-        WhiskAction(
-          action.namespace,
-          action.name,
-          action.exec,
-          action.parameters,
-          action.limits,
-          action.version,
-          action.publish,
-          action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6)))
-    }
-    stream.toString should include(s"caching ${CacheKey(action)}")
-    stream.reset()
-
-    // second request should fetch from cache
-    Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
-      status should be(OK)
-      val response = responseAs[WhiskAction]
-      response should be(
-        WhiskAction(
-          action.namespace,
-          action.name,
-          action.exec,
-          action.parameters,
-          action.limits,
-          action.version,
-          action.publish,
-          action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6)))
-    }
-
-    stream.toString should include(s"serving from cache: ${CacheKey(action)}")
-    stream.reset()
-
-    // delete should invalidate cache
-    Delete(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
-      status should be(OK)
-      val response = responseAs[WhiskAction]
-      response should be(
-        WhiskAction(
-          action.namespace,
-          action.name,
-          action.exec,
-          action.parameters,
-          action.limits,
-          action.version,
-          action.publish,
-          action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6)))
+  it should "put and then get an action from cache" in {
+    val javaAction =
+      WhiskAction(namespace, aname(), javaDefault("ZHViZWU=", Some("hello")), annotations = Parameters("exec", "java"))
+    val nodeAction = WhiskAction(namespace, aname(), jsDefault("??"), Parameters("x", "b"))
+    val actions = Seq((javaAction, JAVA_DEFAULT), (nodeAction, NODEJS6))
+
+    actions.foreach {
+      case (action, kind) =>
+        val content = WhiskActionPut(
+          Some(action.exec),
+          Some(action.parameters),
+          Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+
+        // first request invalidates any previous entries and caches new result
+        Put(s"$collectionPath/${action.name}", content) ~> Route.seal(routes(creds)(transid())) ~> check {
+          status should be(OK)
+          val response = responseAs[WhiskAction]
+          response should be(
+            WhiskAction(
+              action.namespace,
+              action.name,
+              action.exec,
+              action.parameters,
+              action.limits,
+              action.version,
+              action.publish,
+              action.annotations ++ Parameters(WhiskAction.execFieldName, kind)))
+        }
+        stream.toString should include(s"caching ${CacheKey(action)}")
+        stream.toString should not include (s"invalidating ${CacheKey(action)} on delete")
+        stream.reset()
+
+        // second request should fetch from cache
+        Get(s"$collectionPath/${action.name}") ~> Route.seal(routes(creds)(transid())) ~> check {
+          status should be(OK)
+          val response = responseAs[WhiskAction]
+          response should be(
+            WhiskAction(
+              action.namespace,
+              action.name,
+              action.exec,
+              action.parameters,
+              action.limits,
+              action.version,
+              action.publish,
+              action.annotations ++ Parameters(WhiskAction.execFieldName, kind)))
+        }
+        stream.toString should include(s"serving from cache: ${CacheKey(action)}")
+        stream.reset()
+
+        // update should invalidate cache
+        Put(s"$collectionPath/${action.name}?overwrite=true", content) ~> Route.seal(routes(creds)(transid())) ~> check {
+          status should be(OK)
+          val response = responseAs[WhiskAction]
+          response should be {
+            WhiskAction(
+              action.namespace,
+              action.name,
+              action.exec,
+              action.parameters,
+              action.limits,
+              action.version.upPatch,
+              action.publish,
+              action.annotations ++ Parameters(WhiskAction.execFieldName, kind))
+          }
+        }
+        stream.toString should include(s"entity exists, will try to update '$action'")
+        stream.toString should include(s"invalidating ${CacheKey(action)}")
+        stream.toString should include(s"caching ${CacheKey(action)}")
+        stream.reset()
+
+        // delete should invalidate cache
+        Delete(s"$collectionPath/${action.name}") ~> Route.seal(routes(creds)(transid())) ~> check {
+          status should be(OK)
+          val response = responseAs[WhiskAction]
+          response should be(
+            WhiskAction(
+              action.namespace,
+              action.name,
+              action.exec,
+              action.parameters,
+              action.limits,
+              action.version.upPatch,
+              action.publish,
+              action.annotations ++ Parameters(WhiskAction.execFieldName, kind)))
+        }
+        stream.toString should include(s"invalidating ${CacheKey(action)}")
+        stream.reset()
     }
-    stream.toString should include(s"invalidating ${CacheKey(action)}")
-    stream.reset()
   }
 
   it should "reject put with conflict for pre-existing action" in {
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 3c77f4e204..6c87faf927 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -29,16 +29,11 @@ import scala.language.postfixOps
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionCounter
 import whisk.common.TransactionId
-import whisk.core.database.ArtifactStore
-import whisk.core.database.CouchDbRestClient
-import whisk.core.database.DocumentFactory
-import whisk.core.database.NoDocumentException
-import whisk.core.database.StaleParameter
+import whisk.core.database._
 import whisk.core.entity._
 import whisk.core.entity.types.AuthStore
 import whisk.core.entity.types.EntityStore
@@ -180,10 +175,12 @@ trait DbUtils extends TransactionCounter {
   /**
    * Gets document by id from datastore, and add it to gc queue to delete after the test completes.
    */
-  def get[A, Au >: A](db: ArtifactStore[Au], docid: DocId, factory: DocumentFactory[A], garbageCollect: Boolean = true)(
-    implicit transid: TransactionId,
-    timeout: Duration = 10 seconds,
-    ma: Manifest[A]): A = {
+  def get[A <: DocumentRevisionProvider, Au >: A](db: ArtifactStore[Au],
+                                                  docid: DocId,
+                                                  factory: DocumentFactory[A],
+                                                  garbageCollect: Boolean = true)(implicit transid: TransactionId,
+                                                                                  timeout: Duration = 10 seconds,
+                                                                                  ma: Manifest[A]): A = {
     val docFuture = factory.get(db, docid)
     val doc = Await.result(docFuture, timeout)
     assert(doc != null)
@@ -215,10 +212,12 @@ trait DbUtils extends TransactionCounter {
   /**
    * Puts a document 'entity' into the datastore, then do a get to retrieve it and confirm the identity.
    */
-  def putGetCheck[A, Au >: A](db: ArtifactStore[Au], entity: A, factory: DocumentFactory[A], gc: Boolean = true)(
-    implicit transid: TransactionId,
-    timeout: Duration = 10 seconds,
-    ma: Manifest[A]): (DocInfo, A) = {
+  def putGetCheck[A <: DocumentRevisionProvider, Au >: A](db: ArtifactStore[Au],
+                                                          entity: A,
+                                                          factory: DocumentFactory[A],
+                                                          gc: Boolean = true)(implicit transid: TransactionId,
+                                                                              timeout: Duration = 10 seconds,
+                                                                              ma: Manifest[A]): (DocInfo, A) = {
     val doc = put(db, entity, gc)
     assert(doc != null && doc.id.asString != null && doc.rev.asString != null)
     val future = factory.get(db, doc.id, doc.rev)
diff --git a/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala b/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala
index 79eb18d949..2114157138 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala
@@ -27,6 +27,9 @@ import whisk.core.entity._
 import whisk.core.entity.ArgNormalizer.trim
 import whisk.core.entity.ExecManifest._
 
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+
 trait ExecHelpers extends Matchers with WskActorSystem with StreamLogging {
   self: Suite =>
 
@@ -38,6 +41,9 @@ trait ExecHelpers extends Matchers with WskActorSystem with StreamLogging {
   protected val SWIFT = "swift"
   protected val SWIFT3 = "swift:3.1.1"
   protected val SWIFT3_IMAGE = "action-swift-v3.1.1"
+  protected val JAVA_DEFAULT = "java"
+
+  private def attFmt[T: JsonFormat] = Attachments.serdes[T]
 
   protected def imagename(name: String) = {
     var image = s"${name}action".replace(":", "")
@@ -71,6 +77,13 @@ trait ExecHelpers extends Matchers with WskActorSystem with StreamLogging {
     js6MetaData(code, main)
   }
 
+  protected def javaDefault(code: String, main: Option[String] = None) = {
+    val attachment = attFmt[String].read(code.trim.toJson)
+    val manifest = ExecManifest.runtimesManifest.resolveDefaultRuntime(JAVA_DEFAULT).get
+
+    CodeExecAsAttachment(manifest, attachment, main.map(_.trim))
+  }
+
   protected def swift(code: String, main: Option[String] = None) = {
     CodeExecAsString(RuntimeManifest(SWIFT, imagename(SWIFT), deprecated = Some(true)), trim(code), main.map(_.trim))
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services