You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/05/25 12:01:01 UTC

[incubator-openwhisk] branch master updated: Immutable attachments via putAndAttach (#3502)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b392330  Immutable attachments via putAndAttach (#3502)
b392330 is described below

commit b3923300638d64b375d8b79dbc268e0ae50a6bf8
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Fri May 25 17:30:57 2018 +0530

    Immutable attachments via putAndAttach (#3502)
    
    Introduces a new method in ArtifactStore "putAndAttach" to manage document attachments for different backing object stores.
    
    The method accepts two new parameters:
       "attachedUpdater" - Method which would update the "DocumentAbstraction" with new attachment details.
       "oldAttached" - Details about old attachment.
    
    For "CouchDBRestStore"
        1. Generate a random name
        2. Update the DocumentAbstraction and save it
        3. Now attach the passed source with generated name
        4. No need for deleting old attachment as post document update CouchDB would not remember it anyway
    
    For "CouchDBRestStore" + "S3AttachmentStore"
        1. Generate a random name and then upload to S3 with generated name
        2. Update the DocumentAbstraction and save it
        3. Delete the old attachment
    
    This restricts the attachment support to 1-1 mapping i.e. one attachment per document, but this is exactly the same behavior to date, while enabling the "ArtifactStore" to handle attachment in efficient way as per its capabilities.
    
    The "attachmentName" stored in document would be a URI consisting of scheme and path, <scheme>:<attachment name/path>, where:
        scheme - Indicates where the attachment is stored or attachment store type. It can have values such as
            mem - If attachment content is inlined in name itself (TBD)
            couch - If attachment is stored in CouchDB
            s3 - If attachment is stored in S3
        attachment name/path - Is the actual attachment name in the storage
    
    The "Attached" metadata includes 2 new attributes:
        length - Captures the length of attachment
        digest - Digest of attachment with format "-" e.g. "md5-6PkVAZ00zzZKPoquAJj5Lw=="
    
    The commit also adapts the cache interface to directly accept a future, preserving existing behavior but all allowing futures to be directly cached.
---
 .../scala/whisk/core/database/ArtifactStore.scala  |  21 +++-
 .../whisk/core/database/CouchDbRestStore.scala     |  64 ++++++++--
 .../whisk/core/database/DocumentFactory.scala      |  44 ++++---
 .../MultipleReadersSingleWriterCache.scala         |  12 +-
 .../core/database/memory/MemoryArtifactStore.scala |  85 ++++++++-----
 .../main/scala/whisk/core/entity/Attachments.scala |   8 +-
 .../src/main/scala/whisk/core/entity/Exec.scala    |   6 +-
 .../main/scala/whisk/core/entity/WhiskAction.scala |  48 +++++---
 .../scala/whisk/core/entity/WhiskActivation.scala  |  12 +-
 .../scala/whisk/core/controller/ApiUtils.scala     |  15 +--
 .../main/scala/whisk/core/controller/Rules.scala   |  10 +-
 .../core/loadBalancer/InvokerSupervision.scala     |   4 +-
 .../core/controller/test/ActionsApiTests.scala     |  46 ++++----
 .../whisk/core/database/test/AttachmentTests.scala | 131 +++++++++++++++++++++
 .../scala/whisk/core/database/test/DbUtils.scala   |  13 +-
 .../ArtifactStoreAttachmentBehaviors.scala         | 101 ++++++++++++++++
 .../test/behavior/ArtifactStoreBehavior.scala      |   1 +
 .../whisk/core/entity/test/DatastoreTests.scala    |   2 +-
 18 files changed, 492 insertions(+), 131 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
index 631f60a..3750398 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
@@ -26,6 +26,7 @@ import akka.util.ByteString
 import spray.json.JsObject
 import whisk.common.Logging
 import whisk.common.TransactionId
+import whisk.core.entity.Attachments.Attached
 import whisk.core.entity.DocInfo
 
 abstract class StaleParameter(val value: Option[String])
@@ -69,16 +70,18 @@ trait ArtifactStore[DocumentAbstraction] {
    * If the operation is successful, the future completes with the requested document if it exists.
    *
    * @param doc the document info for the record to get (must contain valid id and rev)
+   * @param attachmentHandler function to update the attachment details in document
    * @param transid the transaction id for logging
    * @param ma manifest for A to determine its runtime type, required by some db APIs
    * @return a future that completes either with DocumentAbstraction if the document exists and is deserializable into desired type
    */
-  protected[database] def get[A <: DocumentAbstraction](doc: DocInfo)(implicit transid: TransactionId,
-                                                                      ma: Manifest[A]): Future[A]
+  protected[database] def get[A <: DocumentAbstraction](
+    doc: DocInfo,
+    attachmentHandler: Option[(A, Attached) => A] = None)(implicit transid: TransactionId, ma: Manifest[A]): Future[A]
 
   /**
    * Gets all documents from database view that match a start key, up to an end key, using a future.
-   * If the operation is successful, the promise completes with List[View]] with zero or more documents.
+   * If the operation is successful, the promise completes with List[View] with zero or more documents.
    *
    * @param table the name of the table to query
    * @param startKey to starting key to query the view for
@@ -119,9 +122,17 @@ trait ArtifactStore[DocumentAbstraction] {
 
   /**
    * Attaches a "file" of type `contentType` to an existing document. The revision for the document must be set.
+   *
+   * @param update - function to transform the document with new attachment details
+   * @param oldAttachment Optional old document instance for the update scenario. It would be used to determine
+   *                      the existing attachment details.
    */
-  protected[core] def attach(doc: DocInfo, name: String, contentType: ContentType, docStream: Source[ByteString, _])(
-    implicit transid: TransactionId): Future[DocInfo]
+  protected[database] def putAndAttach[A <: DocumentAbstraction](
+    d: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)]
 
   /**
    * Retrieves a saved attachment, streaming it into the provided Sink.
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 0d97077..ffa2aaf 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -28,11 +28,13 @@ import akka.stream.scaladsl._
 import akka.util.ByteString
 import spray.json._
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
+import whisk.core.entity.Attachments.Attached
 import whisk.core.database.StoreUtils._
 import whisk.core.entity.BulkEntityResult
 import whisk.core.entity.DocInfo
-import whisk.http.Messages
 import whisk.core.entity.DocumentReader
+import whisk.core.entity.UUID
+import whisk.http.Messages
 
 /**
  * Basic client to put and delete artifacts in a data store.
@@ -62,6 +64,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
 
   protected[core] implicit val executionContext = system.dispatcher
 
+  private val attachmentScheme = "couch"
   private val client: CouchDbRestClient =
     new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName)
 
@@ -207,8 +210,10 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
           ErrorLevel))
   }
 
-  override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo)(implicit transid: TransactionId,
-                                                                               ma: Manifest[A]): Future[A] = {
+  override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo,
+                                                                 attachmentHandler: Option[(A, Attached) => A] = None)(
+    implicit transid: TransactionId,
+    ma: Manifest[A]): Future[A] = {
 
     val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] '$dbName' finding document: '$doc'")
 
@@ -223,7 +228,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
       e match {
         case Right(response) =>
           transid.finished(this, start, s"[GET] '$dbName' completed: found document '$doc'")
-          deserialize[A, DocumentAbstraction](doc, response)
+          val deserializedDoc = deserialize[A, DocumentAbstraction](doc, response)
+          attachmentHandler.map(processAttachments(deserializedDoc, response, _)).getOrElse(deserializedDoc)
         case Left(StatusCodes.NotFound) =>
           transid.finished(this, start, s"[GET] '$dbName', document: '${doc}'; not found.")
           // for compatibility
@@ -343,11 +349,25 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
         transid.failed(this, start, s"[COUNT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
   }
 
-  override protected[core] def attach(
-    doc: DocInfo,
-    name: String,
+  override protected[database] def putAndAttach[A <: DocumentAbstraction](
+    d: A,
+    update: (A, Attached) => A,
     contentType: ContentType,
-    docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[DocInfo] = {
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
+
+    val attachmentUri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
+    val attached = Attached(attachmentUri.toString(), contentType)
+    val updatedDoc = update(d, attached)
+
+    for {
+      i1 <- put(updatedDoc)
+      i2 <- attach(i1, attachmentUri.path.toString(), attached.attachmentType, docStream)
+    } yield (i2, attached)
+  }
+
+  private def attach(doc: DocInfo, name: String, contentType: ContentType, docStream: Source[ByteString, _])(
+    implicit transid: TransactionId): Future[DocInfo] = {
 
     val start = transid.started(
       this,
@@ -401,7 +421,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     require(doc != null, "doc undefined")
     require(doc.rev.rev != null, "doc revision must be specified")
 
-    val f = client.getAttachment[T](doc.id.id, doc.rev.rev, name, sink)
+    val attachmentUri = Uri(name)
+    val f = client.getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString(), sink)
     val g = f.map { e =>
       e match {
         case Right((contentType, result)) =>
@@ -443,6 +464,31 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     Await.ready(client.shutdown(), 1.minute)
   }
 
+  private def processAttachments[A <: DocumentAbstraction](doc: A,
+                                                           js: JsObject,
+                                                           attachmentHandler: (A, Attached) => A): A = {
+    js.fields
+      .get("_attachments")
+      .map {
+        case JsObject(fields) if fields.size == 1 =>
+          val (name, value) = fields.head
+          value.asJsObject.getFields("content_type", "digest", "length") match {
+            case Seq(JsString(contentTypeValue), JsString(digest), JsNumber(length)) =>
+              val attachmentName = Uri.from(scheme = attachmentScheme, path = name).toString()
+              val contentType = ContentType.parse(contentTypeValue) match {
+                case Right(ct) => ct
+                case Left(_)   => ContentTypes.NoContentType //Should not happen
+              }
+              attachmentHandler(doc, Attached(attachmentName, contentType, Some(length.intValue()), Some(digest)))
+            case x =>
+              throw DeserializationException("Attachment json does not have required fields" + x)
+
+          }
+        case x => throw DeserializationException("Multiple attachments found" + x)
+      }
+      .getOrElse(doc)
+  }
+
   private def reportFailure[T, U](f: Future[T], onFailure: Throwable => U): Future[T] = {
     f.onFailure({
       case _: ArtifactStoreException => // These failures are intentional and shouldn't trigger the catcher.
diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index 00f6669..38827b3 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -20,7 +20,7 @@ package whisk.core.database
 import java.io.InputStream
 import java.io.OutputStream
 
-import scala.concurrent.Future
+import scala.concurrent.{Future, Promise}
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
@@ -29,6 +29,7 @@ import akka.stream.IOResult
 import akka.stream.scaladsl.StreamConverters
 import spray.json.JsObject
 import whisk.common.TransactionId
+import whisk.core.entity.Attachments.Attached
 import whisk.core.entity.CacheKey
 import whisk.core.entity.DocId
 import whisk.core.entity.DocInfo
@@ -95,9 +96,10 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
    * @param doc the entity to store
    * @param transid the transaction id for logging
    * @param notifier an optional callback when cache changes
+   * @param old an optional old document in case of update
    * @return Future[DocInfo] with completion to DocInfo containing the save document id and revision
    */
-  def put[Wsuper >: W](db: ArtifactStore[Wsuper], doc: W)(
+  def put[Wsuper >: W](db: ArtifactStore[Wsuper], doc: W, old: Option[W])(
     implicit transid: TransactionId,
     notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
     Try {
@@ -120,12 +122,13 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
     }
   }
 
-  def attach[Wsuper >: W](db: ArtifactStore[Wsuper],
-                          doc: W,
-                          attachmentName: String,
-                          contentType: ContentType,
-                          bytes: InputStream,
-                          postProcess: Option[W => W] = None)(
+  def putAndAttach[Wsuper >: W](db: ArtifactStore[Wsuper],
+                                doc: W,
+                                update: (W, Attached) => W,
+                                contentType: ContentType,
+                                bytes: InputStream,
+                                oldAttachment: Option[Attached],
+                                postProcess: Option[W => W] = None)(
     implicit transid: TransactionId,
     notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
 
@@ -137,14 +140,18 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
       implicit val ec = db.executionContext
 
       val key = CacheKey(doc)
-      val docInfo = doc.docinfo
       val src = StreamConverters.fromInputStream(() => bytes)
-      val cacheDoc = postProcess map { _(doc) } getOrElse doc
 
-      cacheUpdate(cacheDoc, key, db.attach(docInfo, attachmentName, contentType, src) map { newDocInfo =>
-        cacheDoc.revision[W](newDocInfo.rev)
-        cacheDoc.docinfo
+      val p = Promise[W]
+      cacheUpdate(p.future, key, db.putAndAttach[W](doc, update, contentType, src, oldAttachment) map {
+        case (newDocInfo, attached) =>
+          val newDoc = update(doc, attached)
+          val cacheDoc = postProcess map { _(newDoc) } getOrElse newDoc
+          cacheDoc.revision[W](newDocInfo.rev)
+          p.success(cacheDoc)
+          newDocInfo
       })
+
     } match {
       case Success(f) => f
       case Failure(t) => Future.failed(t)
@@ -191,6 +198,15 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
     doc: DocId,
     rev: DocRevision = DocRevision.empty,
     fromCache: Boolean = cacheEnabled)(implicit transid: TransactionId, mw: Manifest[W]): Future[W] = {
+    getWithAttachment(db, doc, rev, fromCache, None)
+  }
+
+  protected def getWithAttachment[Wsuper >: W](
+    db: ArtifactStore[Wsuper],
+    doc: DocId,
+    rev: DocRevision = DocRevision.empty,
+    fromCache: Boolean,
+    attachmentHandler: Option[(W, Attached) => W])(implicit transid: TransactionId, mw: Manifest[W]): Future[W] = {
     Try {
       require(db != null, "db undefined")
     } map {
@@ -198,7 +214,7 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
       implicit val ec = db.executionContext
       val key = doc.asDocInfo(rev)
       _ =>
-        cacheLookup(CacheKey(key), db.get[W](key), fromCache)
+        cacheLookup(CacheKey(key), db.get[W](key, attachmentHandler), fromCache)
     } match {
       case Success(f) => f
       case Failure(t) => Future.failed(t)
diff --git a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index 4175180..a2f5c46 100644
--- a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++ b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -261,10 +261,18 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     } else generator // not caching
   }
 
+  protected def cacheUpdate(doc: W, key: CacheKey, generator: => Future[Winfo])(
+    implicit ec: ExecutionContext,
+    transid: TransactionId,
+    logger: Logging,
+    notifier: Option[CacheChangeNotification]): Future[Winfo] = {
+    cacheUpdate(Future.successful(doc), key, generator)
+  }
+
   /**
    * This method posts an update to the backing store, and potentially stores the result in the cache.
    */
-  protected def cacheUpdate(doc: W, key: CacheKey, generator: => Future[Winfo])(
+  protected def cacheUpdate(f: Future[W], key: CacheKey, generator: => Future[Winfo])(
     implicit ec: ExecutionContext,
     transid: TransactionId,
     logger: Logging,
@@ -272,7 +280,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     if (cacheEnabled) {
 
       // try inserting our desired entry...
-      val desiredEntry = Entry(transid, WriteInProgress, Some(Future.successful(doc)))
+      val desiredEntry = Entry(transid, WriteInProgress, Some(f))
       cache(key)(desiredEntry) flatMap { actualEntry =>
         // ... and see what we get back
 
diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
index e228652..e127599 100644
--- a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
@@ -17,8 +17,11 @@
 
 package whisk.core.database.memory
 
+import java.security.MessageDigest
+import java.util.Base64
+
 import akka.actor.ActorSystem
-import akka.http.scaladsl.model.ContentType
+import akka.http.scaladsl.model.{ContentType, Uri}
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{Keep, Sink, Source}
 import akka.util.{ByteString, ByteStringBuilder}
@@ -26,6 +29,7 @@ import spray.json.{DefaultJsonProtocol, DeserializationException, JsObject, JsSt
 import whisk.common.{Logging, LoggingMarkers, TransactionId}
 import whisk.core.database.StoreUtils._
 import whisk.core.database._
+import whisk.core.entity.Attachments.Attached
 import whisk.core.entity._
 import whisk.http.Messages
 
@@ -86,6 +90,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
 
   private val _id = "_id"
   private val _rev = "_rev"
+  private val attachmentScheme = "mem"
 
   override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
     val asJson = d.toDocumentRecord
@@ -136,7 +141,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
         true
       } else if (artifacts.contains(doc.id.id)) {
         //Indicates that document exist but revision does not match
-        transid.finished(this, start, s"[DEL] '$dbName', document: '${doc}'; conflict.")
+        transid.finished(this, start, s"[DEL] '$dbName', document: '$doc'; conflict.")
         throw DocumentConflictException("conflict on 'delete'")
       } else {
         transid.finished(this, start, s"[DEL] '$dbName', document: '$doc'; not found.")
@@ -150,8 +155,10 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     reportFailure(f, start, failure => s"[DEL] '$dbName' internal error, doc: '$doc', failure: '${failure.getMessage}'")
   }
 
-  override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo)(implicit transid: TransactionId,
-                                                                               ma: Manifest[A]): Future[A] = {
+  override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo,
+                                                                 attachmentHandler: Option[(A, Attached) => A] = None)(
+    implicit transid: TransactionId,
+    ma: Manifest[A]): Future[A] = {
     val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] '$dbName' finding document: '$doc'")
 
     require(doc != null, "doc undefined")
@@ -245,9 +252,10 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
       LoggingMarkers.DATABASE_ATT_GET,
       s"[ATT_GET] '$dbName' finding attachment '$name' of document '$doc'")
 
+    val storedName = Uri(name).path.toString()
     artifacts.get(doc.id.id) match {
-      case Some(a: Artifact) if a.attachments.contains(name) =>
-        val attachment = a.attachments(name)
+      case Some(a: Artifact) if a.attachments.contains(storedName) =>
+        val attachment = a.attachments(storedName)
         val r = Source.single(attachment.bytes).toMat(sink)(Keep.right).run
         transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
         r.map(t => (attachment.contentType, t))
@@ -260,11 +268,27 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     Future.successful(true)
   }
 
-  override protected[core] def attach(
-    doc: DocInfo,
-    name: String,
+  override protected[database] def putAndAttach[A <: DocumentAbstraction](
+    d: A,
+    update: (A, Attached) => A,
     contentType: ContentType,
-    docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[DocInfo] = {
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
+
+    val attachmentUri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
+
+    for {
+      bytes <- toByteString(docStream)
+      attached <- Future.successful(
+        Attached(attachmentUri.toString(), contentType, Some(bytes.size), Some(digest(bytes))))
+      updatedDoc <- Future.successful(update(d, attached))
+      i1 <- put(updatedDoc)
+      i2 <- attach(i1, attachmentUri.path.toString(), attached.attachmentType, bytes)
+    } yield (i2, attached)
+  }
+
+  private def attach(doc: DocInfo, name: String, contentType: ContentType, bytes: ByteString)(
+    implicit transid: TransactionId): Future[DocInfo] = {
 
     val start = transid.started(
       this,
@@ -272,25 +296,22 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
       s"[ATT_PUT] '$dbName' uploading attachment '$name' of document '$doc'")
 
     //TODO Temporary implementation till MemoryAttachmentStore PR is merged
-    val f = docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b)
-    val g = f
-      .map { b =>
-        artifacts.get(doc.id.id) match {
-          case Some(a) =>
-            val existing = Artifact(doc, a.doc, a.computed)
-            val updated = existing.attach(name, Attachment(b.result().compact, contentType))
-            if (artifacts.replace(doc.id.id, existing, updated)) {
-              transid
-                .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$doc'")
-              updated.docInfo
-            } else {
-              throw DocumentConflictException("conflict on 'put'")
-            }
-          case None =>
+    val g =
+      artifacts.get(doc.id.id) match {
+        case Some(a) =>
+          val existing = Artifact(doc, a.doc, a.computed)
+          val updated = existing.attach(name, Attachment(bytes, contentType))
+          if (artifacts.replace(doc.id.id, existing, updated)) {
+            transid
+              .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$doc'")
+            updated.docInfo
+          } else {
             throw DocumentConflictException("conflict on 'put'")
-        }
+          }
+        case None =>
+          throw DocumentConflictException("conflict on 'put'")
       }
-    g
+    Future.successful(g)
   }
 
   override def shutdown(): Unit = {
@@ -316,6 +337,16 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     reportFailure(f, start, failure => s"[GET] '$dbName' internal error, doc: '$id', failure: '${failure.getMessage}'")
   }
 
+  private def toByteString(docStream: Source[ByteString, _]) =
+    docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact)
+
+  private def digest(bytes: ByteString) = {
+    val digestBytes = MessageDigest
+      .getInstance("MD5")
+      .digest(bytes.toArray)
+    s"md5-${Base64.getUrlEncoder.encodeToString(digestBytes)}"
+  }
+
   private def getRevision(asJson: JsObject) = {
     asJson.fields.get(_rev) match {
       case Some(JsString(r)) => r.toInt
diff --git a/common/scala/src/main/scala/whisk/core/entity/Attachments.scala b/common/scala/src/main/scala/whisk/core/entity/Attachments.scala
index d7792cb..6ec9bae 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Attachments.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Attachments.scala
@@ -41,7 +41,11 @@ object Attachments {
 
   case class Inline[T](value: T) extends Attachment[T]
 
-  case class Attached(attachmentName: String, attachmentType: ContentType) extends Attachment[Nothing]
+  case class Attached(attachmentName: String,
+                      attachmentType: ContentType,
+                      length: Option[Int] = None,
+                      digest: Option[String] = None)
+      extends Attachment[Nothing]
 
   // Attachments are considered free because the name/content type are system determined
   // and a size check for the content is done during create/update
@@ -65,7 +69,7 @@ object Attachments {
           }
       }
 
-      jsonFormat2(Attached.apply)
+      jsonFormat4(Attached.apply)
     }
   }
 
diff --git a/common/scala/src/main/scala/whisk/core/entity/Exec.scala b/common/scala/src/main/scala/whisk/core/entity/Exec.scala
index 8f31812..a281144 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Exec.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Exec.scala
@@ -155,10 +155,8 @@ protected[core] case class CodeExecAsAttachment(manifest: RuntimeManifest,
     copy(code = Inline(encoded))
   }
 
-  def attach: CodeExecAsAttachment = {
-    manifest.attached.map { a =>
-      copy(code = Attached(a.attachmentName, a.attachmentType))
-    } getOrElse this
+  def attach(attached: Attached): CodeExecAsAttachment = {
+    copy(code = attached)
   }
 }
 
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index 1e380c2..2ecbd3e 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -317,7 +317,7 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
   val requireWhiskAuthHeader = "x-require-whisk-auth"
 
   // overriden to store attached code
-  override def put[A >: WhiskAction](db: ArtifactStore[A], doc: WhiskAction)(
+  override def put[A >: WhiskAction](db: ArtifactStore[A], doc: WhiskAction, old: Option[WhiskAction])(
     implicit transid: TransactionId,
     notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
 
@@ -330,25 +330,27 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
           implicit val logger = db.logging
           implicit val ec = db.executionContext
 
-          val newDoc = doc.copy(exec = exec.attach)
-          newDoc.revision(doc.rev)
-
           val stream = new ByteArrayInputStream(Base64.getDecoder().decode(code))
           val manifest = exec.manifest.attached.get
-
-          for (i1 <- super.put(db, newDoc);
-               i2 <- attach[A](
-                 db,
-                 newDoc.revision(i1.rev),
-                 manifest.attachmentName,
-                 manifest.attachmentType,
-                 stream,
-                 Some { a: WhiskAction =>
-                   a.copy(exec = exec.inline(code.getBytes("UTF-8")))
-                 })) yield i2
+          val oldAttachment = old
+            .flatMap(_.exec match {
+              case CodeExecAsAttachment(_, a: Attached, _) => Some(a)
+              case _                                       => None
+            })
+
+          super.putAndAttach(
+            db,
+            doc,
+            (d, a) => d.copy(exec = exec.attach(a)).revision[WhiskAction](d.rev),
+            manifest.attachmentType,
+            stream,
+            oldAttachment,
+            Some { a: WhiskAction =>
+              a.copy(exec = exec.inline(code.getBytes("UTF-8")))
+            })
 
         case _ =>
-          super.put(db, doc)
+          super.put(db, doc, old)
       }
     } match {
       case Success(f) => f
@@ -365,11 +367,11 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
 
     implicit val ec = db.executionContext
 
-    val fa = super.get(db, doc, rev, fromCache)
+    val fa = super.getWithAttachment(db, doc, rev, fromCache, Some(attachmentHandler _))
 
     fa.flatMap { action =>
       action.exec match {
-        case exec @ CodeExecAsAttachment(_, Attached(attachmentName, _), _) =>
+        case exec @ CodeExecAsAttachment(_, Attached(attachmentName, _, _, _), _) =>
           val boas = new ByteArrayOutputStream()
           val b64s = Base64.getEncoder().wrap(boas)
 
@@ -386,6 +388,16 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
     }
   }
 
+  def attachmentHandler(action: WhiskAction, attached: Attached): WhiskAction = {
+    val eu = action.exec match {
+      case exec @ CodeExecAsAttachment(_, Attached(attachmentName, _, _, _), _) =>
+        require(attachmentName == attached.attachmentName)
+        exec.attach(attached)
+      case exec => exec
+    }
+    action.copy(exec = eu).revision[WhiskAction](action.rev)
+  }
+
   override def del[Wsuper >: WhiskAction](db: ArtifactStore[Wsuper], doc: DocInfo)(
     implicit transid: TransactionId,
     notifier: Option[CacheChangeNotification]): Future[Boolean] = {
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
index 891aa20..899147c 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
@@ -21,15 +21,11 @@ import java.time.Instant
 
 import scala.concurrent.Future
 import scala.util.Try
-
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionId
 import whisk.core.ConfigKeys
-import whisk.core.database.ArtifactStore
-import whisk.core.database.DocumentFactory
-import whisk.core.database.StaleParameter
-
+import whisk.core.database.{ArtifactStore, CacheChangeNotification, DocumentFactory, StaleParameter}
 import pureconfig._
 
 /**
@@ -200,4 +196,10 @@ object WhiskActivation
     val endKey = List(namespace.addPath(path).asString, upto map { _.toEpochMilli } getOrElse TOP, TOP)
     query(db, filtersView, startKey, endKey, skip, limit, reduce = false, stale, convert)
   }
+
+  def put[Wsuper >: WhiskActivation](db: ArtifactStore[Wsuper], doc: WhiskActivation)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[DocInfo] =
+    //As activations are not updated we just pass None for the old document
+    super.put(db, doc, None)
 }
diff --git a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
index 87e6576..7bb0f2a 100644
--- a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
@@ -253,7 +253,7 @@ trait WriteOps extends Directives {
     onComplete(factory.get(datastore, docid) flatMap { doc =>
       if (overwrite) {
         logging.debug(this, s"[PUT] entity exists, will try to update '$doc'")
-        update(doc)
+        update(doc).map(updatedDoc => (Some(doc), updatedDoc))
       } else if (treatExistsAsConflict) {
         logging.debug(this, s"[PUT] entity exists, but overwrite is not enabled, aborting")
         Future failed RejectRequest(Conflict, "resource already exists")
@@ -263,12 +263,13 @@ trait WriteOps extends Directives {
     } recoverWith {
       case _: NoDocumentException =>
         logging.debug(this, s"[PUT] entity does not exist, will try to create it")
-        create()
-    } flatMap { a =>
-      logging.debug(this, s"[PUT] entity created/updated, writing back to datastore")
-      factory.put(datastore, a) map { _ =>
-        a
-      }
+        create().map(newDoc => (None, newDoc))
+    } flatMap {
+      case (old, a) =>
+        logging.debug(this, s"[PUT] entity created/updated, writing back to datastore")
+        factory.put(datastore, a, old) map { _ =>
+          a
+        }
     }) {
       case Success(entity) =>
         logging.debug(this, s"[PUT] entity success")
diff --git a/core/controller/src/main/scala/whisk/core/controller/Rules.scala b/core/controller/src/main/scala/whisk/core/controller/Rules.scala
index 5e99ff9..87fe771 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Rules.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Rules.scala
@@ -157,7 +157,7 @@ trait WhiskRulesApi extends WhiskCollectionAPI with ReferencedEntities {
               WhiskTrigger.get(entityStore, rule.trigger.toDocId) flatMap { trigger =>
                 val newTrigger = trigger.removeRule(ruleName)
                 val triggerLink = ReducedRule(rule.action, newStatus)
-                WhiskTrigger.put(entityStore, newTrigger.addRule(ruleName, triggerLink))
+                WhiskTrigger.put(entityStore, newTrigger.addRule(ruleName, triggerLink), Some(trigger))
               }
           }
 
@@ -208,7 +208,7 @@ trait WhiskRulesApi extends WhiskCollectionAPI with ReferencedEntities {
         } flatMap {
           case (status, triggerOpt) =>
             triggerOpt map { trigger =>
-              WhiskTrigger.put(entityStore, trigger.removeRule(ruleName)) map { _ =>
+              WhiskTrigger.put(entityStore, trigger.removeRule(ruleName), triggerOpt) map { _ =>
                 {}
               }
             } getOrElse Future.successful({})
@@ -293,7 +293,7 @@ trait WhiskRulesApi extends WhiskCollectionAPI with ReferencedEntities {
 
           val triggerLink = ReducedRule(actionName, Status.ACTIVE)
           logging.debug(this, s"about to put ${trigger.addRule(ruleName, triggerLink)}")
-          WhiskTrigger.put(entityStore, trigger.addRule(ruleName, triggerLink)) map { _ =>
+          WhiskTrigger.put(entityStore, trigger.addRule(ruleName, triggerLink), old = None) map { _ =>
             rule
           }
       }
@@ -331,11 +331,11 @@ trait WhiskRulesApi extends WhiskCollectionAPI with ReferencedEntities {
             isDifferentTrigger <- content.trigger.filter(_ => newTriggerName != oldTriggerName)
             oldTrigger <- oldTriggerOpt
           } yield {
-            WhiskTrigger.put(entityStore, oldTrigger.removeRule(ruleName))
+            WhiskTrigger.put(entityStore, oldTrigger.removeRule(ruleName), oldTriggerOpt)
           }
 
           val triggerLink = ReducedRule(actionName, status)
-          val update = WhiskTrigger.put(entityStore, newTrigger.addRule(ruleName, triggerLink))
+          val update = WhiskTrigger.put(entityStore, newTrigger.addRule(ruleName, triggerLink), oldTriggerOpt)
           Future.sequence(Seq(deleteOldLink.getOrElse(Future.successful(true)), update)).map(_ => r)
       }
     }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 1ac686d..e33845e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -177,10 +177,10 @@ object InvokerPool {
     WhiskAction
       .get(db, action.docid)
       .flatMap { oldAction =>
-        WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None)
+        WhiskAction.put(db, action.revision(oldAction.rev), Some(oldAction))(tid, notifier = None)
       }
       .recover {
-        case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None)
+        case _: NoDocumentException => WhiskAction.put(db, action, old = None)(tid, notifier = None)
       }
       .map(_ => {})
       .andThen {
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index 609e773..c8d360e 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -788,13 +788,12 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
       Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
     val name = action.name
     val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
-    val expectedPutLog = Seq(
-      s"caching $cacheKey",
-      s"uploading attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}",
-      s"caching $cacheKey").mkString("(?s).*")
+    val expectedPutLog =
+      Seq(s"uploading attachment '[\\w-]+' of document 'id: ${action.namespace}/${action.name}", s"caching $cacheKey")
+        .mkString("(?s).*")
     val notExpectedGetLog = Seq(
       s"finding document: 'id: ${action.namespace}/${action.name}",
-      s"finding attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+      s"finding attachment '[\\w-/:]+' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
 
     // first request invalidates any previous entries and caches new result
     Put(s"$collectionPath/$name", content) ~> Route.seal(routes(creds)(transid())) ~> check {
@@ -868,20 +867,20 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
     val expectedGetLog = Seq(
       s"finding document: 'id: ${action.namespace}/${action.name}",
-      s"finding attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+      s"finding attachment '[\\w-/:]+' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
 
     action.exec match {
       case exec @ CodeExecAsAttachment(_, _, _) =>
-        val newAction = action.copy(exec = exec.attach)
-        newAction.revision(action.rev)
-
-        val doc1 = put(entityStore, newAction, false)
-
         val stream = new ByteArrayInputStream(Base64.getDecoder().decode(code))
         val manifest = exec.manifest.attached.get
         val src = StreamConverters.fromInputStream(() => stream)
-
-        attach(entityStore, doc1, manifest.attachmentName, manifest.attachmentType, src)
+        putAndAttach[WhiskAction, WhiskEntity](
+          entityStore,
+          action,
+          (d, a) => d.copy(exec = exec.attach(a)).revision[WhiskAction](d.rev),
+          manifest.attachmentType,
+          src,
+          None)
 
       case _ =>
     }
@@ -917,23 +916,22 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
       Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
     val name = action.name
     val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
-    val expectedPutLog = Seq(
-      s"caching $cacheKey",
-      s"uploading attachment 'jarfile' of document 'id: ${action.namespace}/${action.name}",
-      s"caching $cacheKey").mkString("(?s).*")
+    val expectedPutLog =
+      Seq(s"uploading attachment '[\\w-/:]+' of document 'id: ${action.namespace}/${action.name}", s"caching $cacheKey")
+        .mkString("(?s).*")
 
     action.exec match {
       case exec @ CodeExecAsAttachment(_, _, _) =>
-        val newAction = action.copy(exec = exec.attach)
-        newAction.revision(action.rev)
-
-        val doc = put(entityStore, newAction)
-
         val stream = new ByteArrayInputStream(Base64.getDecoder().decode(code))
         val manifest = exec.manifest.attached.get
         val src = StreamConverters.fromInputStream(() => stream)
-
-        attach(entityStore, doc, manifest.attachmentName, manifest.attachmentType, src)
+        putAndAttach[WhiskAction, WhiskEntity](
+          entityStore,
+          action,
+          (d, a) => d.copy(exec = exec.attach(a)).revision[WhiskAction](d.rev),
+          manifest.attachmentType,
+          src,
+          None)
 
       case _ =>
     }
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentTests.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentTests.scala
new file mode 100644
index 0000000..bde3ee5
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentTests.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.test
+
+import java.util.Base64
+
+import akka.http.scaladsl.model.Uri
+import akka.stream.ActorMaterializer
+import common.{StreamLogging, WskActorSystem}
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec}
+import whisk.common.TransactionId
+import whisk.core.controller.test.WhiskAuthHelpers
+import whisk.core.database.CacheChangeNotification
+import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
+import whisk.core.entity._
+import whisk.core.entity.test.ExecHelpers
+
+import scala.util.Random
+
+@RunWith(classOf[JUnitRunner])
+class AttachmentTests
+    extends FlatSpec
+    with BeforeAndAfterEach
+    with BeforeAndAfterAll
+    with WskActorSystem
+    with DbUtils
+    with ExecHelpers
+    with ScalaFutures
+    with StreamLogging {
+
+  implicit val materializer = ActorMaterializer()
+  private val namespace = EntityPath(WhiskAuthHelpers.newIdentity().subject.asString)
+  private val datastore = WhiskEntityStore.datastore()
+  private val attachmentHandler = Some(WhiskAction.attachmentHandler _)
+
+  implicit val cacheUpdateNotifier: Option[CacheChangeNotification] = None
+  implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
+
+  override def afterEach() = {
+    cleanup()
+  }
+
+  override def afterAll() = {
+    datastore.shutdown()
+    super.afterAll()
+  }
+
+  behavior of "Datastore"
+
+  it should "generate different attachment name on update" in {
+    implicit val tid: TransactionId = transid()
+    val exec = javaDefault("ZHViZWU=", Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+
+    val i1 = WhiskAction.put(datastore, javaAction, old = None).futureValue
+    val action2 = datastore.get[WhiskAction](i1, attachmentHandler).futureValue
+
+    //Change attachment to inline one otherwise WhiskAction would not go for putAndAttach
+    val action2Updated = action2.copy(exec = exec).revision[WhiskAction](i1.rev)
+    val i2 = WhiskAction.put(datastore, action2Updated, old = Some(action2)).futureValue
+    val action3 = datastore.get[WhiskAction](i2, attachmentHandler).futureValue
+
+    docsToDelete += ((datastore, i2))
+
+    attached(action2).attachmentName should not be attached(action3).attachmentName
+
+    //Check that attachment name is actually a uri
+    val attachmentUri = Uri(attached(action2).attachmentName)
+    attachmentUri.isAbsolute shouldBe true
+  }
+
+  it should "put and read same attachment" in {
+    implicit val tid: TransactionId = transid()
+    val size = 4000
+    val bytes = randomBytes(size)
+    val base64 = Base64.getEncoder.encodeToString(bytes)
+
+    val exec = javaDefault(base64, Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+
+    val i1 = WhiskAction.put(datastore, javaAction, old = None).futureValue
+    val action2 = datastore.get[WhiskAction](i1, attachmentHandler).futureValue
+    val action3 = WhiskAction.get(datastore, i1.id, i1.rev).futureValue
+
+    docsToDelete += ((datastore, i1))
+
+    attached(action2).attachmentType shouldBe ExecManifest.runtimesManifest
+      .resolveDefaultRuntime(JAVA_DEFAULT)
+      .get
+      .attached
+      .get
+      .attachmentType
+    attached(action2).length shouldBe Some(size)
+    attached(action2).digest should not be empty
+
+    action3.exec shouldBe exec
+    inlined(action3).value shouldBe base64
+  }
+
+  private def attached(a: WhiskAction): Attached =
+    a.exec.asInstanceOf[CodeExec[Attachment[Nothing]]].code.asInstanceOf[Attached]
+
+  private def inlined(a: WhiskAction): Inline[String] =
+    a.exec.asInstanceOf[CodeExec[Attachment[String]]].code.asInstanceOf[Inline[String]]
+
+  private def randomBytes(size: Int): Array[Byte] = {
+    val arr = new Array[Byte](size)
+    Random.nextBytes(arr)
+    arr
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index fbcd1cf..9e2f875 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -38,10 +38,10 @@ import whisk.core.database.memory.MemoryArtifactStore
 import whisk.core.entity._
 import whisk.core.entity.types.AuthStore
 import whisk.core.entity.types.EntityStore
-
 import akka.http.scaladsl.model.ContentType
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
+import whisk.core.entity.Attachments.Attached
 
 /**
  * WARNING: the put/get/del operations in this trait operate directly on the datastore,
@@ -204,15 +204,16 @@ trait DbUtils {
     doc
   }
 
-  def attach[A, Au >: A](
+  def putAndAttach[A <: DocumentRevisionProvider, Au >: A](
     db: ArtifactStore[Au],
-    doc: DocInfo,
-    name: String,
+    doc: A,
+    update: (A, Attached) => A,
     contentType: ContentType,
     docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached],
     garbageCollect: Boolean = true)(implicit transid: TransactionId, timeout: Duration = 10 seconds): DocInfo = {
-    val docFuture = db.attach(doc, name, contentType, docStream)
-    val newDoc = Await.result(docFuture, timeout)
+    val docFuture = db.putAndAttach[A](doc, update, contentType, docStream, oldAttachment)
+    val newDoc = Await.result(docFuture, timeout)._1
     assert(newDoc != null)
     if (garbageCollect) docsToDelete += ((db, newDoc))
     newDoc
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
new file mode 100644
index 0000000..62ae000
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.test.behavior
+
+import java.util.Base64
+
+import akka.http.scaladsl.model.Uri
+import whisk.common.TransactionId
+import whisk.core.database.CacheChangeNotification
+import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
+import whisk.core.entity.test.ExecHelpers
+import whisk.core.entity.{CodeExec, EntityName, ExecManifest, WhiskAction}
+
+import scala.util.Random
+
+trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with ExecHelpers {
+  behavior of "Attachments"
+
+  private val namespace = newNS()
+  private val attachmentHandler = Some(WhiskAction.attachmentHandler _)
+  private implicit val cacheUpdateNotifier: Option[CacheChangeNotification] = None
+
+  it should "generate different attachment name on update" in {
+    implicit val tid: TransactionId = transid()
+    val exec = javaDefault("ZHViZWU=", Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+
+    //Change attachment to inline one otherwise WhiskAction would not go for putAndAttach
+    val action2Updated = action2.copy(exec = exec).revision[WhiskAction](i1.rev)
+    val i2 = WhiskAction.put(entityStore, action2Updated, old = Some(action2)).futureValue
+    val action3 = entityStore.get[WhiskAction](i2, attachmentHandler).futureValue
+
+    docsToDelete += ((entityStore, i2))
+
+    attached(action2).attachmentName should not be attached(action3).attachmentName
+
+    //Check that attachment name is actually a uri
+    val attachmentUri = Uri(attached(action2).attachmentName)
+    attachmentUri.isAbsolute shouldBe true
+  }
+
+  it should "put and read same attachment" in {
+    implicit val tid: TransactionId = transid()
+    val size = 4000
+    val bytes = randomBytes(size)
+    val base64 = Base64.getEncoder.encodeToString(bytes)
+
+    val exec = javaDefault(base64, Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+    val action3 = WhiskAction.get(entityStore, i1.id, i1.rev).futureValue
+
+    docsToDelete += ((entityStore, i1))
+
+    attached(action2).attachmentType shouldBe ExecManifest.runtimesManifest
+      .resolveDefaultRuntime(JAVA_DEFAULT)
+      .get
+      .attached
+      .get
+      .attachmentType
+    attached(action2).length shouldBe Some(size)
+    attached(action2).digest should not be empty
+
+    action3.exec shouldBe exec
+    inlined(action3).value shouldBe base64
+  }
+
+  private def attached(a: WhiskAction): Attached =
+    a.exec.asInstanceOf[CodeExec[Attachment[Nothing]]].code.asInstanceOf[Attached]
+
+  private def inlined(a: WhiskAction): Inline[String] =
+    a.exec.asInstanceOf[CodeExec[Attachment[String]]].code.asInstanceOf[Inline[String]]
+
+  private def randomBytes(size: Int): Array[Byte] = {
+    val arr = new Array[Byte](size)
+    Random.nextBytes(arr)
+    arr
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehavior.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehavior.scala
index e196494..add479e 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehavior.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehavior.scala
@@ -24,3 +24,4 @@ trait ArtifactStoreBehavior
     with ArtifactStoreSubjectQueryBehaviors
     with ArtifactStoreWhisksQueryBehaviors
     with ArtifactStoreActivationsQueryBehaviors
+    with ArtifactStoreAttachmentBehaviors
diff --git a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
index 4741ef3..680b9e6 100644
--- a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
@@ -352,7 +352,7 @@ class DatastoreTests
       assert(false)
     }
     intercept[IllegalArgumentException] {
-      Await.result(WhiskAction.put(null, null), dbOpTimeout)
+      Await.result(WhiskAction.put(null, null, None), dbOpTimeout)
       assert(false)
     }
   }

-- 
To stop receiving notification emails like this one, please contact
rabbah@apache.org.