You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/05/10 03:09:07 UTC

[GitHub] csantanapr closed pull request #2855: Cache database attachments

csantanapr closed pull request #2855: Cache database attachments
URL: https://github.com/apache/incubator-openwhisk/pull/2855
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index b6e346ba4b..00f6669b6c 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -120,12 +120,14 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
     }
   }
 
-  def attach[Wsuper >: W](
-    db: ArtifactStore[Wsuper],
-    doc: W,
-    attachmentName: String,
-    contentType: ContentType,
-    bytes: InputStream)(implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+  def attach[Wsuper >: W](db: ArtifactStore[Wsuper],
+                          doc: W,
+                          attachmentName: String,
+                          contentType: ContentType,
+                          bytes: InputStream,
+                          postProcess: Option[W => W] = None)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
 
     Try {
       require(db != null, "db undefined")
@@ -137,10 +139,11 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
       val key = CacheKey(doc)
       val docInfo = doc.docinfo
       val src = StreamConverters.fromInputStream(() => bytes)
+      val cacheDoc = postProcess map { _(doc) } getOrElse doc
 
-      cacheUpdate(doc, key, db.attach(docInfo, attachmentName, contentType, src) map { newDocInfo =>
-        doc.revision[W](newDocInfo.rev)
-        doc.docinfo
+      cacheUpdate(cacheDoc, key, db.attach(docInfo, attachmentName, contentType, src) map { newDocInfo =>
+        cacheDoc.revision[W](newDocInfo.rev)
+        cacheDoc.docinfo
       })
     } match {
       case Success(f) => f
@@ -202,26 +205,37 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
     }
   }
 
-  def getAttachment[Wsuper >: W](db: ArtifactStore[Wsuper],
-                                 doc: DocInfo,
-                                 attachmentName: String,
-                                 outputStream: OutputStream)(implicit transid: TransactionId): Future[Unit] = {
+  def getAttachment[Wsuper >: W](
+    db: ArtifactStore[Wsuper],
+    doc: W,
+    attachmentName: String,
+    outputStream: OutputStream,
+    postProcess: Option[W => W] = None)(implicit transid: TransactionId, mw: Manifest[W]): Future[W] = {
 
     implicit val ec = db.executionContext
+    implicit val notifier: Option[CacheChangeNotification] = None
 
     Try {
       require(db != null, "db defined")
       require(doc != null, "doc undefined")
     } map { _ =>
+      implicit val logger = db.logging
+      implicit val ec = db.executionContext
+
+      val docInfo = doc.docinfo
+      val key = CacheKey(docInfo)
       val sink = StreamConverters.fromOutputStream(() => outputStream)
-      db.readAttachment[IOResult](doc, attachmentName, sink).map {
-        case (_, r) =>
-          if (!r.wasSuccessful) {
-            // FIXME...
-            // Figure out whether OutputStreams are even a decent model.
+
+      db.readAttachment[IOResult](docInfo, attachmentName, sink).map {
+        case _ =>
+          val cacheDoc = postProcess map { _(doc) } getOrElse doc
+
+          cacheUpdate(cacheDoc, key, Future.successful(docInfo)) map { newDocInfo =>
+            cacheDoc.revision[W](newDocInfo.rev)
           }
-          ()
+          cacheDoc
       }
+
     } match {
       case Success(f) => f
       case Failure(t) => Future.failed(t)
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index 97f98ae315..1e380c2e88 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -337,8 +337,15 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
           val manifest = exec.manifest.attached.get
 
           for (i1 <- super.put(db, newDoc);
-               i2 <- attach[A](db, newDoc.revision(i1.rev), manifest.attachmentName, manifest.attachmentType, stream))
-            yield i2
+               i2 <- attach[A](
+                 db,
+                 newDoc.revision(i1.rev),
+                 manifest.attachmentName,
+                 manifest.attachmentType,
+                 stream,
+                 Some { a: WhiskAction =>
+                   a.copy(exec = exec.inline(code.getBytes("UTF-8")))
+                 })) yield i2
 
         case _ =>
           super.put(db, doc)
@@ -366,12 +373,12 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
           val boas = new ByteArrayOutputStream()
           val b64s = Base64.getEncoder().wrap(boas)
 
-          getAttachment[A](db, action.docinfo, attachmentName, b64s).map { _ =>
+          getAttachment[A](db, action, attachmentName, b64s, Some { a: WhiskAction =>
             b64s.close()
-            val newAction = action.copy(exec = exec.inline(boas.toByteArray))
-            newAction.revision(action.rev)
+            val newAction = a.copy(exec = exec.inline(boas.toByteArray))
+            newAction.revision(a.rev)
             newAction
-          }
+          })
 
         case _ =>
           Future.successful(action)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index 380c5f0b76..704659c2ab 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -40,6 +40,10 @@ import whisk.core.entitlement.Collection
 import whisk.http.ErrorResponse
 import whisk.http.Messages
 
+import java.io.ByteArrayInputStream
+import java.util.Base64
+import akka.stream.scaladsl._
+
 /**
  * Tests Actions API.
  *
@@ -742,6 +746,201 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     }
   }
 
+  it should "put and then get an action with attachment from cache" in {
+    val action =
+      WhiskAction(namespace, aname(), javaDefault("ZHViZWU=", Some("hello")), annotations = Parameters("exec", "java"))
+    val content = WhiskActionPut(
+      Some(action.exec),
+      Some(action.parameters),
+      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+    val name = action.name
+    val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+    val expectedPutLog = Seq(
+      s"caching $cacheKey",
+      s"uploading attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}",
+      s"caching $cacheKey").mkString("(?s).*")
+    val notExpectedGetLog = Seq(
+      s"finding document: 'id: ${action.namespace}/${action.name}",
+      s"finding attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+
+    // first request invalidates any previous entries and caches new result
+    Put(s"$collectionPath/$name", content) ~> Route.seal(routes(creds)(transid())) ~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should not include (s"invalidating ${CacheKey(action)} on delete")
+    stream.toString should include regex (expectedPutLog)
+    stream.reset()
+
+    // second request should fetch from cache
+    Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should include(s"serving from cache: ${CacheKey(action)}")
+    stream.toString should not include regex(notExpectedGetLog)
+    stream.reset()
+
+    // delete should invalidate cache
+    Delete(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+    stream.toString should include(s"invalidating ${CacheKey(action)}")
+    stream.reset()
+  }
+
+  it should "get an action with attachment that is not cached" in {
+    implicit val tid = transid()
+    val code = "ZHViZWU="
+    val action =
+      WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec", "java"))
+    val content = WhiskActionPut(
+      Some(action.exec),
+      Some(action.parameters),
+      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+    val name = action.name
+    val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+    val expectedGetLog = Seq(
+      s"finding document: 'id: ${action.namespace}/${action.name}",
+      s"finding attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+
+    action.exec match {
+      case exec @ CodeExecAsAttachment(_, _, _) =>
+        val newAction = action.copy(exec = exec.attach)
+        newAction.revision(action.rev)
+
+        val doc1 = put(entityStore, newAction, false)
+
+        val stream = new ByteArrayInputStream(Base64.getDecoder().decode(code))
+        val manifest = exec.manifest.attached.get
+        val src = StreamConverters.fromInputStream(() => stream)
+
+        attach(entityStore, doc1, manifest.attachmentName, manifest.attachmentType, src)
+
+      case _ =>
+    }
+
+    // the action was written directly to the datastore, so this request should miss the cache and read from the db
+    Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should include regex (expectedGetLog)
+    stream.reset()
+  }
+
+  it should "update an existing action with attachment that is not cached" in {
+    implicit val tid = transid()
+    val code = "ZHViZWU="
+    val action =
+      WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec", "java"))
+    val content = WhiskActionPut(
+      Some(action.exec),
+      Some(action.parameters),
+      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+    val name = action.name
+    val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+    val expectedPutLog = Seq(
+      s"caching $cacheKey",
+      s"uploading attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}",
+      s"caching $cacheKey").mkString("(?s).*")
+
+    action.exec match {
+      case exec @ CodeExecAsAttachment(_, _, _) =>
+        val newAction = action.copy(exec = exec.attach)
+        newAction.revision(action.rev)
+
+        val doc = put(entityStore, newAction)
+
+        val stream = new ByteArrayInputStream(Base64.getDecoder().decode(code))
+        val manifest = exec.manifest.attached.get
+        val src = StreamConverters.fromInputStream(() => stream)
+
+        attach(entityStore, doc, manifest.attachmentName, manifest.attachmentType, src)
+
+      case _ =>
+    }
+
+    Put(s"$collectionPath/$name?overwrite=true", content) ~> Route.seal(routes(creds)(transid())) ~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version.upPatch,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+    stream.toString should include regex (expectedPutLog)
+    stream.reset()
+
+    // delete should invalidate cache
+    Delete(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version.upPatch,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+    stream.toString should include(s"invalidating ${CacheKey(action)}")
+    stream.reset()
+  }
+
   it should "reject put with conflict for pre-existing action" in {
     implicit val tid = transid()
     val action = WhiskAction(namespace, aname(), jsDefault("??"), Parameters("x", "b"))
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 277e1310ac..04d82a927f 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -39,6 +39,10 @@ import whisk.core.entity._
 import whisk.core.entity.types.AuthStore
 import whisk.core.entity.types.EntityStore
 
+import akka.http.scaladsl.model.ContentType
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+
 /**
  * WARNING: the put/get/del operations in this trait operate directly on the datastore,
  * and in the presence of a cache, there will be inconsistencies if one mixes these
@@ -194,6 +198,20 @@ trait DbUtils extends TransactionCounter {
     doc
   }
 
+  def attach[A, Au >: A](
+    db: ArtifactStore[Au],
+    doc: DocInfo,
+    name: String,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    garbageCollect: Boolean = true)(implicit transid: TransactionId, timeout: Duration = 10 seconds): DocInfo = {
+    val docFuture = db.attach(doc, name, contentType, docStream)
+    val newDoc = Await.result(docFuture, timeout)
+    assert(newDoc != null)
+    if (garbageCollect) docsToDelete += ((db, newDoc))
+    newDoc
+  }
+
   /**
    * Gets document by id from datastore, and add it to gc queue to delete after the test completes.
    */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services