You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/14 05:05:10 UTC

[GitHub] chetanmeh closed pull request #3453: Introduce a AttachmentStore SPI

chetanmeh closed pull request #3453: Introduce a AttachmentStore SPI
URL: https://github.com/apache/incubator-openwhisk/pull/3453
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once it has been merged, so the diff is reproduced below:

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 0e21fe3e26..a82e56b4a7 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -312,5 +312,7 @@ object LoggingMarkers {
   val DATABASE_QUERY = LogMarkerToken(database, "queryView", start)
   val DATABASE_ATT_GET = LogMarkerToken(database, "getDocumentAttachment", start)
   val DATABASE_ATT_SAVE = LogMarkerToken(database, "saveDocumentAttachment", start)
+  val DATABASE_ATT_DELETE = LogMarkerToken(database, "deleteDocumentAttachment", start)
+  val DATABASE_ATTS_DELETE = LogMarkerToken(database, "deleteDocumentAttachments", start)
   val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", count)
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
index fdcc306690..32d6150141 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
@@ -19,9 +19,10 @@ package whisk.core.database
 
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
+import com.typesafe.config.ConfigFactory
 import spray.json.RootJsonFormat
 import whisk.common.Logging
-import whisk.spi.Spi
+import whisk.spi.{Spi, SpiLoader}
 import whisk.core.entity.DocumentReader
 
 import scala.reflect.ClassTag
@@ -36,4 +37,15 @@ trait ArtifactStoreProvider extends Spi {
     actorSystem: ActorSystem,
     logging: Logging,
     materializer: ActorMaterializer): ArtifactStore[D]
+
+  protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]()(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): Option[AttachmentStore] = {
+    if (ConfigFactory.load().hasPath("whisk.spi.AttachmentStoreProvider")) {
+      Some(SpiLoader.get[AttachmentStoreProvider].makeStore[D]())
+    } else {
+      None
+    }
+  }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
index 14eb192aa1..2369b1bf26 100644
--- a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
@@ -17,7 +17,6 @@
 
 package whisk.core.database
 
-import java.security.MessageDigest
 import java.util.Base64
 
 import akka.NotUsed
@@ -46,8 +45,6 @@ case class InliningConfig(maxInlineSize: ByteSize, chunkSize: ByteSize)
  * name itself.
  */
 trait AttachmentInliner {
-  private val digestAlgo = "SHA-256"
-  private val encodedAlgoName = digestAlgo.toLowerCase.replaceAllLiterally("-", "")
 
   /** Materializer required for stream processing */
   protected[core] implicit val materializer: Materializer
@@ -94,10 +91,9 @@ trait AttachmentInliner {
   protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == MemScheme
 
   protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
-    val digester = MessageDigest.getInstance(digestAlgo)
+    val digester = StoreUtils.emptyDigest()
     digester.update(bytes.toArray)
-    val digest = digester.digest().map("%02x".format(_)).mkString
-    s"$encodedAlgoName-$digest"
+    StoreUtils.encodeDigest(digester.digest())
   }
 
   /**
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentStore.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentStore.scala
new file mode 100644
index 0000000000..e606f1db7a
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentStore.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package whisk.core.database
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.ContentType
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+import whisk.common.{Logging, TransactionId}
+import whisk.core.entity.DocId
+import whisk.spi.Spi
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.ClassTag
+
+trait AttachmentStoreProvider extends Spi {
+  def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                     logging: Logging,
+                                                     materializer: ActorMaterializer): AttachmentStore
+}
+
+case class AttachResult(digest: String, length: Long)
+
+trait AttachmentStore {
+
+  /** Identifies the store type */
+  protected[core] def scheme: String
+
+  /** Execution context for futures */
+  protected[core] implicit val executionContext: ExecutionContext
+
+  /**
+   * Attaches a "file" of type `contentType` to an existing document.
+   */
+  protected[core] def attach(doc: DocId, name: String, contentType: ContentType, docStream: Source[ByteString, _])(
+    implicit transid: TransactionId): Future[AttachResult]
+
+  /**
+   * Retrieves a saved attachment, streaming it into the provided Sink.
+   */
+  protected[core] def readAttachment[T](doc: DocId, name: String, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T]
+
+  /**
+   * Deletes all attachments linked to given document
+   */
+  protected[core] def deleteAttachments(doc: DocId)(implicit transid: TransactionId): Future[Boolean]
+
+  /**
+   * Deletes specific attachment.
+   */
+  protected[core] def deleteAttachment(doc: DocId, name: String)(implicit transid: TransactionId): Future[Boolean]
+
+  /** Shut it down. After this invocation, every other call is invalid. */
+  def shutdown(): Unit
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index d0f13c9958..beb72cfaa4 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -27,7 +27,7 @@ import spray.json._
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.database.StoreUtils._
 import whisk.core.entity.Attachments.Attached
-import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
+import whisk.core.entity.{BulkEntityResult, DocId, DocInfo, DocumentReader, UUID}
 import whisk.http.Messages
 
 import scala.concurrent.duration._
@@ -52,7 +52,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
                                                                   dbPassword: String,
                                                                   dbName: String,
                                                                   useBatching: Boolean = false,
-                                                                  val inliningConfig: InliningConfig)(
+                                                                  val inliningConfig: InliningConfig,
+                                                                  val attachmentStore: Option[AttachmentStore])(
   implicit system: ActorSystem,
   val logging: Logging,
   jsonFormat: RootJsonFormat[DocumentAbstraction],
@@ -64,7 +65,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
 
   protected[core] implicit val executionContext = system.dispatcher
 
-  val attachmentScheme: String = "couch"
+  private val couchScheme = "couch"
+  val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(couchScheme)
 
   private val client: CouchDbRestClient =
     new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName)
@@ -351,12 +353,25 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
   }
 
   override protected[database] def putAndAttach[A <: DocumentAbstraction](
-    d: A,
+    doc: A,
     update: (A, Attached) => A,
     contentType: ContentType,
     docStream: Source[ByteString, _],
     oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
 
+    attachmentStore match {
+      case Some(_) =>
+        attachToExternalStore(doc, update, contentType, docStream, oldAttachment)
+      case None =>
+        attachToCouch(doc, update, contentType, docStream)
+    }
+  }
+
+  private def attachToCouch[A <: DocumentAbstraction](
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _])(implicit transid: TransactionId) = {
     for {
       (bytes, tailSource) <- inlineAndTail(docStream)
       uri <- Future.successful(uriOf(bytes, UUID().asString))
@@ -368,13 +383,56 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
         }
         Future.successful(a)
       }
-      i1 <- put(update(d, attached))
-      i2 <- if (isInlined(uri)) { Future.successful(i1) } else {
+      i1 <- put(update(doc, attached))
+      i2 <- if (isInlined(uri)) {
+        Future.successful(i1)
+      } else {
         attach(i1, uri.path.toString, attached.attachmentType, combinedSource(bytes, tailSource))
       }
     } yield (i2, attached)
   }
 
+  private def attachToExternalStore[A <: DocumentAbstraction](
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached])(implicit transid: TransactionId) = {
+    val as = attachmentStore.get
+    val asJson = doc.toDocumentRecord
+    val id = asJson.fields("_id").convertTo[String].trim
+
+    for {
+      (bytes, tailSource) <- inlineAndTail(docStream)
+      uri <- Future.successful(uriOf(bytes, UUID().asString))
+      attached <- {
+        // Upload if cannot be inlined
+        if (isInlined(uri)) {
+          val a = Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
+          Future.successful(a)
+        } else {
+          as.attach(DocId(id), uri.path.toString, contentType, combinedSource(bytes, tailSource))
+            .map { r =>
+              Attached(uri.toString, contentType, Some(r.length), Some(r.digest))
+            }
+        }
+      }
+      i1 <- put(update(doc, attached))
+
+      //Remove old attachment if it was part of attachmentStore
+      _ <- oldAttachment
+        .map { old =>
+          val oldUri = Uri(old.attachmentName)
+          if (oldUri.scheme == as.scheme) {
+            as.deleteAttachment(DocId(id), oldUri.path.toString)
+          } else {
+            Future.successful(true)
+          }
+        }
+        .getOrElse(Future.successful(true))
+    } yield (i1, attached)
+  }
+
   private def attach(doc: DocInfo, name: String, contentType: ContentType, docStream: Source[ByteString, _])(
     implicit transid: TransactionId): Future[DocInfo] = {
 
@@ -421,8 +479,26 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
 
   override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])(
     implicit transid: TransactionId): Future[T] = {
-
     val name = attached.attachmentName
+    val attachmentUri = Uri(name)
+    attachmentUri.scheme match {
+      case AttachmentInliner.MemScheme =>
+        memorySource(attachmentUri).runWith(sink)
+      case s if s == couchScheme || attachmentUri.isRelative =>
+        //relative case is for compatibility with earlier naming approach where attachment name would be like 'jarfile'
+        //Compared to current approach of '<scheme>:<name>'
+        readAttachmentFromCouch(doc, attachmentUri, sink)
+      case s if attachmentStore.isDefined && attachmentStore.get.scheme == s =>
+        attachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink)
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri")
+    }
+  }
+
+  private def readAttachmentFromCouch[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+
+    val name = attachmentUri.path
     val start = transid.started(
       this,
       LoggingMarkers.DATABASE_ATT_GET,
@@ -431,31 +507,28 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     require(doc != null, "doc undefined")
     require(doc.rev.rev != null, "doc revision must be specified")
 
-    val attachmentUri = Uri(name)
-    val g = if (isInlined(attachmentUri)) {
-      memorySource(attachmentUri).runWith(sink)
-    } else {
-      val f = client.getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString, sink)
-      f.map {
-        case Right((_, result)) =>
-          transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
-          result
-
-        case Left(StatusCodes.NotFound) =>
-          transid.finished(
-            this,
-            start,
-            s"[ATT_GET] '$dbName', retrieving attachment '$name' of document '$doc'; not found.")
-          throw NoDocumentException("Not found on 'readAttachment'.")
-
-        case Left(code) =>
-          transid.failed(
-            this,
-            start,
-            s"[ATT_GET] '$dbName' failed to get attachment '$name' of document '$doc'; http status: '${code}'")
-          throw new Exception("Unexpected http response code: " + code)
-      }
-    }
+    val g =
+      client
+        .getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString, sink)
+        .map {
+          case Right((_, result)) =>
+            transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
+            result
+
+          case Left(StatusCodes.NotFound) =>
+            transid.finished(
+              this,
+              start,
+              s"[ATT_GET] '$dbName', retrieving attachment '$name' of document '$doc'; not found.")
+            throw NoDocumentException("Not found on 'readAttachment'.")
+
+          case Left(code) =>
+            transid.failed(
+              this,
+              start,
+              s"[ATT_GET] '$dbName' failed to get attachment '$name' of document '$doc'; http status: '$code'")
+            throw new Exception("Unexpected http response code: " + code)
+        }
 
     reportFailure(
       g,
@@ -468,12 +541,13 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
   }
 
   override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] =
-    // NOTE: this method is not intended for standalone use for CouchDB.
-    // To delete attachments, it is expected that the entire document is deleted.
-    Future.successful(true)
+    attachmentStore
+      .map(as => as.deleteAttachments(doc.id))
+      .getOrElse(Future.successful(true)) // For CouchDB it is expected that the entire document is deleted.
 
   override def shutdown(): Unit = {
     Await.ready(client.shutdown(), 1.minute)
+    attachmentStore.foreach(_.shutdown())
   }
 
   private def processAttachments[A <: DocumentAbstraction](doc: A,
@@ -492,7 +566,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
               }
               attachmentHandler(
                 doc,
-                Attached(getAttachmentName(name), contentType, Some(length.intValue()), Some(digest)))
+                Attached(getAttachmentName(name), contentType, Some(length.longValue()), Some(digest)))
             case x =>
               throw DeserializationException("Attachment json does not have required fields" + x)
 
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
index df6a374a56..1fc11a1de9 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
@@ -53,6 +53,14 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
     docReader: DocumentReader,
     actorSystem: ActorSystem,
     logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = makeArtifactStore(useBatching, getAttachmentStore())
+
+  def makeArtifactStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean,
+                                                           attachmentStore: Option[AttachmentStore])(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
     materializer: ActorMaterializer): ArtifactStore[D] = {
     val dbConfig = loadConfigOrThrow[CouchDbConfig](ConfigKeys.couchdb)
     require(
@@ -69,6 +77,7 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
       dbConfig.password,
       dbConfig.databaseFor[D],
       useBatching,
-      inliningConfig)
+      inliningConfig,
+      attachmentStore)
   }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala b/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala
index cff9dd9f37..f6351d1326 100644
--- a/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala
@@ -17,15 +17,22 @@
 
 package whisk.core.database
 
+import java.security.MessageDigest
+
 import akka.event.Logging.ErrorLevel
+import akka.stream.SinkShape
+import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Sink}
+import akka.util.ByteString
+import spray.json.DefaultJsonProtocol._
 import spray.json.{JsObject, RootJsonFormat}
 import whisk.common.{Logging, StartMarker, TransactionId}
-import spray.json.DefaultJsonProtocol._
 import whisk.core.entity.{DocInfo, DocRevision, DocumentReader, WhiskDocument}
 
 import scala.concurrent.{ExecutionContext, Future}
 
 private[database] object StoreUtils {
+  private val digestAlgo = "SHA-256"
+  private val encodedAlgoName = digestAlgo.toLowerCase.replaceAllLiterally("-", "")
 
   def reportFailure[T](f: Future[T], start: StartMarker, failureMessage: Throwable => String)(
     implicit transid: TransactionId,
@@ -65,4 +72,49 @@ private[database] object StoreUtils {
     // FIXME remove mutability from appropriate classes now that it is no longer required by GSON.
     deserialized.asInstanceOf[WhiskDocument].revision(DocRevision(responseRev))
   }
+
+  def combinedSink[T](dest: Sink[ByteString, Future[T]])(
+    implicit ec: ExecutionContext): Sink[ByteString, Future[AttachmentUploadResult[T]]] = {
+    Sink.fromGraph(GraphDSL.create(digestSink(), lengthSink(), dest)(combineResult) {
+      implicit builder => (dgs, ls, dests) =>
+        import GraphDSL.Implicits._
+
+        val bcast = builder.add(Broadcast[ByteString](3))
+
+        bcast ~> dgs.in
+        bcast ~> ls.in
+        bcast ~> dests.in
+
+        SinkShape(bcast.in)
+    })
+  }
+
+  def emptyDigest(): MessageDigest = MessageDigest.getInstance(digestAlgo)
+
+  def encodeDigest(bytes: Array[Byte]): String = {
+    val digest = bytes.map("%02x".format(_)).mkString
+    s"$encodedAlgoName-$digest"
+  }
+
+  private def combineResult[T](digest: Future[String], length: Future[Long], upload: Future[T])(
+    implicit ec: ExecutionContext) = {
+    for {
+      d <- digest
+      l <- length
+      u <- upload
+    } yield AttachmentUploadResult(d, l, u)
+  }
+
+  case class AttachmentUploadResult[T](digest: String, length: Long, uploadResult: T)
+
+  private def digestSink(): Sink[ByteString, Future[String]] = {
+    Flow[ByteString]
+      .fold(emptyDigest())((digest, bytes) => { digest.update(bytes.toArray); digest })
+      .map(md => encodeDigest(md.digest()))
+      .toMat(Sink.head)(Keep.right)
+  }
+
+  private def lengthSink(): Sink[ByteString, Future[Long]] = {
+    Sink.fold[Long, ByteString](0)((length, bytes) => length + bytes.size)
+  }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
index 75973ffe08..140b854810 100644
--- a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
@@ -20,8 +20,8 @@ package whisk.core.database.memory
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.{ContentType, Uri}
 import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{Keep, Sink, Source}
-import akka.util.{ByteString, ByteStringBuilder}
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
 import pureconfig.loadConfigOrThrow
 import spray.json.{DefaultJsonProtocol, DeserializationException, JsObject, JsString, RootJsonFormat}
 import whisk.common.{Logging, LoggingMarkers, TransactionId}
@@ -45,11 +45,20 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider {
     actorSystem: ActorSystem,
     logging: Logging,
     materializer: ActorMaterializer): ArtifactStore[D] = {
+    makeArtifactStore(MemoryAttachmentStoreProvider.makeStore())
+  }
+
+  def makeArtifactStore[D <: DocumentSerializer: ClassTag](attachmentStore: AttachmentStore)(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
 
     val classTag = implicitly[ClassTag[D]]
     val (dbName, handler, viewMapper) = handlerAndMapper(classTag)
     val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db)
-    new MemoryArtifactStore(dbName, handler, viewMapper, inliningConfig)
+    new MemoryArtifactStore(dbName, handler, viewMapper, inliningConfig, attachmentStore)
   }
 
   private def handlerAndMapper[D](entityType: ClassTag[D])(
@@ -75,7 +84,8 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider {
 class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: String,
                                                                      documentHandler: DocumentHandler,
                                                                      viewMapper: MemoryViewMapper,
-                                                                     val inliningConfig: InliningConfig)(
+                                                                     val inliningConfig: InliningConfig,
+                                                                     val attachmentStore: AttachmentStore)(
   implicit system: ActorSystem,
   val logging: Logging,
   jsonFormat: RootJsonFormat[DocumentAbstraction],
@@ -92,7 +102,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
 
   private val _id = "_id"
   private val _rev = "_rev"
-  val attachmentScheme = "mems"
+  val attachmentScheme: String = attachmentStore.scheme
 
   override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
     val asJson = d.toDocumentRecord
@@ -248,7 +258,6 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
 
   override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])(
     implicit transid: TransactionId): Future[T] = {
-    //TODO Temporary implementation till MemoryAttachmentStore PR is merged
     val name = attached.attachmentName
     val start = transid.started(
       this,
@@ -260,20 +269,17 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
       memorySource(attachmentUri).runWith(sink)
     } else {
       val storedName = attachmentUri.path.toString()
-      artifacts.get(doc.id.id) match {
-        case Some(a: Artifact) if a.attachments.contains(storedName) =>
-          val attachment = a.attachments(storedName)
-          val r = Source.single(attachment.bytes).toMat(sink)(Keep.right).run
+      val f = attachmentStore.readAttachment(doc.id, storedName, sink)
+      f.onSuccess {
+        case _ =>
           transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
-          r
-        case None =>
-          Future.failed(NoDocumentException("Not found on 'readAttachment'."))
       }
+      f
     }
   }
 
   override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = {
-    Future.successful(true)
+    attachmentStore.deleteAttachments(doc.id)
   }
 
   override protected[database] def putAndAttach[A <: DocumentAbstraction](
@@ -283,56 +289,42 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     docStream: Source[ByteString, _],
     oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
 
+    val asJson = d.toDocumentRecord
+    val id = asJson.fields(_id).convertTo[String].trim
     //Inlined attachment with Memory storage is not required. However to validate the constructs
     //inlined support is implemented
     for {
-      allBytes <- toByteString(docStream)
-      (bytes, tailSource) <- inlineAndTail(Source.single(allBytes))
+      (bytes, tailSource) <- inlineAndTail(docStream)
       uri <- Future.successful(uriOf(bytes, UUID().asString))
       attached <- {
-        val a = if (isInlined(uri)) {
-          Attached(uri.toString(), contentType, Some(bytes.size), Some(digest(bytes)))
+        if (isInlined(uri)) {
+          val a = Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
+          Future.successful(a)
         } else {
-          Attached(uri.toString(), contentType, Some(allBytes.size), Some(digest(allBytes)))
+          attachmentStore
+            .attach(DocId(id), uri.path.toString, contentType, combinedSource(bytes, tailSource))
+            .map { r =>
+              Attached(uri.toString, contentType, Some(r.length), Some(r.digest))
+            }
         }
-        Future.successful(a)
       }
       i1 <- put(update(d, attached))
-      i2 <- if (isInlined(uri)) { Future.successful(i1) } else {
-        attach(i1, uri.path.toString(), attached.attachmentType, toByteString(combinedSource(bytes, tailSource)))
-      }
-    } yield (i2, attached)
-  }
-
-  private def attach(doc: DocInfo, name: String, contentType: ContentType, bytes: Future[ByteString])(
-    implicit transid: TransactionId): Future[DocInfo] = {
-
-    val start = transid.started(
-      this,
-      LoggingMarkers.DATABASE_ATT_SAVE,
-      s"[ATT_PUT] '$dbName' uploading attachment '$name' of document '$doc'")
-
-    //TODO Temporary implementation till MemoryAttachmentStore PR is merged
-    bytes.map { b =>
-      artifacts.get(doc.id.id) match {
-        case Some(a) =>
-          val existing = Artifact(doc, a.doc, a.computed)
-          val updated = existing.attach(name, Attachment(b, contentType))
-          if (artifacts.replace(doc.id.id, existing, updated)) {
-            transid
-              .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$doc'")
-            updated.docInfo
+      _ <- oldAttachment
+        .map { old =>
+          val oldUri = Uri(old.attachmentName)
+          if (oldUri.scheme == attachmentStore.scheme) {
+            attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString)
           } else {
-            throw DocumentConflictException("conflict on 'put'")
+            Future.successful(true)
           }
-        case None =>
-          throw DocumentConflictException("conflict on 'put'")
-      }
-    }
+        }
+        .getOrElse(Future.successful(true))
+    } yield (i1, attached)
   }
 
   override def shutdown(): Unit = {
     artifacts.clear()
+    attachmentStore.shutdown()
   }
 
   override protected[database] def get(id: DocId)(implicit transid: TransactionId): Future[Option[JsObject]] = {
@@ -354,9 +346,6 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     reportFailure(f, start, failure => s"[GET] '$dbName' internal error, doc: '$id', failure: '${failure.getMessage}'")
   }
 
-  private def toByteString(docStream: Source[Traversable[Byte], _]) =
-    docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact)
-
   private def getRevision(asJson: JsObject) = {
     asJson.fields.get(_rev) match {
       case Some(JsString(r)) => r.toInt
@@ -367,21 +356,14 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
   //Use curried case class to allow equals support only for id and rev
   //This allows us to implement atomic replace and remove which check
   //for id,rev equality only
-  private case class Artifact(id: String, rev: Int)(val doc: JsObject,
-                                                    val computed: JsObject,
-                                                    val attachments: Map[String, Attachment] = Map.empty) {
+  private case class Artifact(id: String, rev: Int)(val doc: JsObject, val computed: JsObject) {
     def incrementRev(): Artifact = {
       val (newRev, updatedDoc) = incrementAndGet()
-      copy(rev = newRev)(updatedDoc, computed, Map.empty) //With Couch attachments are lost post update
+      copy(rev = newRev)(updatedDoc, computed) //With Couch attachments are lost post update
     }
 
     def docInfo = DocInfo(DocId(id), DocRevision(rev.toString))
 
-    def attach(name: String, attachment: Attachment): Artifact = {
-      val (newRev, updatedDoc) = incrementAndGet()
-      copy(rev = newRev)(updatedDoc, computed, attachments + (name -> attachment))
-    }
-
     private def incrementAndGet() = {
       val newRev = rev + 1
       val updatedDoc = JsObject(doc.fields + (_rev -> JsString(newRev.toString)))
@@ -389,8 +371,6 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     }
   }
 
-  private case class Attachment(bytes: ByteString, contentType: ContentType)
-
   private object Artifact {
     def apply(id: String, rev: Int, doc: JsObject): Artifact = {
       Artifact(id, rev)(doc, documentHandler.computedFields(doc))
@@ -399,10 +379,5 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     def apply(info: DocInfo): Artifact = {
       Artifact(info.id.id, info.rev.rev.toInt)(JsObject.empty, JsObject.empty)
     }
-
-    def apply(info: DocInfo, doc: JsObject, c: JsObject): Artifact = {
-      Artifact(info.id.id, info.rev.rev.toInt)(doc, c)
-    }
   }
-
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
new file mode 100644
index 0000000000..face871081
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.memory
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.ContentType
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Keep, Sink, Source}
+import akka.util.{ByteString, ByteStringBuilder}
+import whisk.common.LoggingMarkers.{DATABASE_ATTS_DELETE, DATABASE_ATT_DELETE, DATABASE_ATT_GET, DATABASE_ATT_SAVE}
+import whisk.common.{Logging, TransactionId}
+import whisk.core.database.StoreUtils._
+import whisk.core.database._
+import whisk.core.entity.DocId
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.ClassTag
+
+object MemoryAttachmentStoreProvider extends AttachmentStoreProvider {
+  override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                              logging: Logging,
+                                                              materializer: ActorMaterializer): AttachmentStore =
+    new MemoryAttachmentStore(implicitly[ClassTag[D]].runtimeClass.getSimpleName.toLowerCase)
+}
+
+/**
+ * Basic in-memory AttachmentStore implementation. Useful for testing.
+ */
+class MemoryAttachmentStore(dbName: String)(implicit system: ActorSystem,
+                                            logging: Logging,
+                                            materializer: ActorMaterializer)
+    extends AttachmentStore {
+
+  override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
+
+  private case class Attachment(bytes: ByteString)
+
+  private val attachments = new TrieMap[String, Attachment]
+  private var closed = false
+
+  override val scheme = "mems"
+
+  override protected[core] def attach(
+    docId: DocId,
+    name: String,
+    contentType: ContentType,
+    docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
+    require(name != null, "name undefined")
+    val start = transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document '$docId'")
+
+    val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
+
+    val f = docStream.runWith(combinedSink(uploadSink))
+
+    val g = f.map { r =>
+      attachments += (attachmentKey(docId, name) -> Attachment(r.uploadResult.result().compact))
+      transid
+        .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$docId'")
+      AttachResult(r.digest, r.length)
+    }
+
+    reportFailure(
+      g,
+      start,
+      failure => s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
+  }
+
+  /**
+   * Retrieves a saved attachment, streaming it into the provided Sink.
+   */
+  override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+
+    val start =
+      transid.started(this, DATABASE_ATT_GET, s"[ATT_GET] '$dbName' finding attachment '$name' of document '$docId'")
+
+    val f = attachments.get(attachmentKey(docId, name)) match {
+      case Some(Attachment(bytes)) =>
+        val r = Source.single(bytes).toMat(sink)(Keep.right).run
+        r.map(t => {
+          transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$docId'")
+          t
+        })
+      case None =>
+        transid.finished(
+          this,
+          start,
+          s"[ATT_GET] '$dbName', retrieving attachment '$name' of document '$docId'; not found.")
+        Future.failed(NoDocumentException("Not found on 'readAttachment'."))
+    }
+    reportFailure(
+      f,
+      start,
+      failure => s"[ATT_GET] '$dbName' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
+    val start = transid.started(this, DATABASE_ATTS_DELETE, s"[ATTS_DELETE] deleting attachments of document '$docId'")
+    // Removes every attachment of docId; the prefix is built like attachmentKey so the two cannot drift apart
+    val prefix = s"${docId.id}/"
+    attachments --= attachments.keySet.filter(_.startsWith(prefix))
+    transid.finished(this, start, s"[ATTS_DELETE] completed: delete attachment of document '$docId'")
+    Future.successful(true) // reported as success even when no key matched the prefix
+  }
+
+  override protected[core] def deleteAttachment(docId: DocId, name: String)(
+    implicit transid: TransactionId): Future[Boolean] = {
+    val start = transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment of document '$docId'")
+    attachments.remove(attachmentKey(docId, name)) // drops the single entry keyed by document id and name
+    transid.finished(this, start, s"[ATT_DELETE] completed: delete attachment of document '$docId'")
+    Future.successful(true) // reported as success whether or not an entry was actually removed
+  }
+
+  def attachmentCount: Int = attachments.size
+
+  def isClosed = closed
+
+  override def shutdown(): Unit = {
+    closed = true
+  }
+
+  private def attachmentKey(docId: DocId, name: String) = s"${docId.id}/$name"
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/Attachments.scala b/common/scala/src/main/scala/whisk/core/entity/Attachments.scala
index 6ec9bae7fc..2d00a03deb 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Attachments.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Attachments.scala
@@ -17,15 +17,13 @@
 
 package whisk.core.entity
 
-import scala.util.Try
-
 import akka.http.scaladsl.model.ContentType
-
-import spray.json._
 import spray.json.DefaultJsonProtocol._
-
+import spray.json._
 import whisk.core.entity.size._
 
+import scala.util.Try
+
 object Attachments {
 
   /**
@@ -43,7 +41,7 @@ object Attachments {
 
   case class Attached(attachmentName: String,
                       attachmentType: ContentType,
-                      length: Option[Int] = None,
+                      length: Option[Long] = None,
                       digest: Option[String] = None)
       extends Attachment[Nothing]
 
diff --git a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala
index a36fd30d38..159ac5b580 100644
--- a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala
@@ -21,30 +21,6 @@ import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 import whisk.core.database.test.behavior.ArtifactStoreBehavior
-import whisk.core.entity._
-
-import scala.reflect.classTag
 
 @RunWith(classOf[JUnitRunner])
-class CouchDBArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
-  override def storeType = "CouchDB"
-
-  override val authStore = {
-    implicit val docReader: DocumentReader = WhiskDocumentReader
-    CouchDbStoreProvider.makeStore[WhiskAuth]()
-  }
-
-  override val entityStore =
-    CouchDbStoreProvider.makeStore[WhiskEntity]()(
-      classTag[WhiskEntity],
-      WhiskEntityJsonFormat,
-      WhiskDocumentReader,
-      actorSystem,
-      logging,
-      materializer)
-
-  override val activationStore = {
-    implicit val docReader: DocumentReader = WhiskDocumentReader
-    CouchDbStoreProvider.makeStore[WhiskActivation]()
-  }
-}
+class CouchDBArtifactStoreTests extends FlatSpec with CouchDBStoreBehaviorBase with ArtifactStoreBehavior {}
diff --git a/tests/src/test/scala/whisk/core/database/CouchDBAttachmentStoreTests.scala b/tests/src/test/scala/whisk/core/database/CouchDBAttachmentStoreTests.scala
new file mode 100644
index 0000000000..e0d79867b1
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/CouchDBAttachmentStoreTests.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import whisk.core.database.memory.MemoryAttachmentStoreProvider
+import whisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors
+
+import scala.reflect.ClassTag
+
+@RunWith(classOf[JUnitRunner])
+class CouchDBAttachmentStoreTests extends FlatSpec with CouchDBStoreBehaviorBase with ArtifactStoreAttachmentBehaviors {
+  override protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]() =
+    Some(MemoryAttachmentStoreProvider.makeStore[D]())
+}
diff --git a/tests/src/test/scala/whisk/core/database/CouchDBStoreBehaviorBase.scala b/tests/src/test/scala/whisk/core/database/CouchDBStoreBehaviorBase.scala
new file mode 100644
index 0000000000..0a13075614
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/CouchDBStoreBehaviorBase.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import org.scalatest.FlatSpec
+import whisk.core.database.test.behavior.ArtifactStoreBehaviorBase
+import whisk.core.entity.{
+  DocumentReader,
+  WhiskActivation,
+  WhiskAuth,
+  WhiskDocumentReader,
+  WhiskEntity,
+  WhiskEntityJsonFormat
+}
+
+import scala.reflect.{classTag, ClassTag}
+
+trait CouchDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase {
+  override def storeType = "CouchDB"
+
+  override val authStore = {
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    CouchDbStoreProvider.makeArtifactStore[WhiskAuth](useBatching = false, getAttachmentStore[WhiskAuth]())
+  }
+
+  override val entityStore =
+    CouchDbStoreProvider.makeArtifactStore[WhiskEntity](useBatching = false, getAttachmentStore[WhiskEntity]())(
+      classTag[WhiskEntity],
+      WhiskEntityJsonFormat,
+      WhiskDocumentReader,
+      actorSystem,
+      logging,
+      materializer)
+
+  override val activationStore = {
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    CouchDbStoreProvider.makeArtifactStore[WhiskActivation](useBatching = true, getAttachmentStore[WhiskActivation]())
+  }
+
+  override protected def getAttachmentStore(store: ArtifactStore[_]) =
+    store.asInstanceOf[CouchDbRestStore[_]].attachmentStore
+
+  protected def getAttachmentStore[D <: DocumentSerializer: ClassTag](): Option[AttachmentStore] = None
+}
diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
index 9d4643f874..c47daaacea 100644
--- a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
@@ -20,6 +20,7 @@ package whisk.core.database.memory
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
+import whisk.core.database.ArtifactStore
 import whisk.core.database.test.behavior.ArtifactStoreBehavior
 import whisk.core.entity._
 
@@ -47,4 +48,7 @@ class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
     implicit val docReader: DocumentReader = WhiskDocumentReader
     MemoryArtifactStoreProvider.makeStore[WhiskActivation]()
   }
+
+  override protected def getAttachmentStore(store: ArtifactStore[_]) =
+    Some(store.asInstanceOf[MemoryArtifactStore[_]].attachmentStore)
 }
diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryAttachmentStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryAttachmentStoreTests.scala
new file mode 100644
index 0000000000..9d845bcd63
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/memory/MemoryAttachmentStoreTests.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.memory
+
+import common.WskActorSystem
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import whisk.core.database.AttachmentStore
+import whisk.core.database.test.AttachmentStoreBehaviors
+import whisk.core.entity.WhiskEntity
+
+@RunWith(classOf[JUnitRunner])
+class MemoryAttachmentStoreTests extends FlatSpec with AttachmentStoreBehaviors with WskActorSystem {
+
+  override val store: AttachmentStore = MemoryAttachmentStoreProvider.makeStore[WhiskEntity]()
+
+  override def storeType: String = "Memory"
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    val count = store.asInstanceOf[MemoryAttachmentStore].attachmentCount
+    require(count == 0, s"AttachmentStore not empty after all runs - $count")
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala
index 6550cc069e..dc8969f50b 100644
--- a/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala
@@ -33,7 +33,8 @@ import pureconfig.loadConfigOrThrow
 import spray.json.DefaultJsonProtocol
 import whisk.common.TransactionId
 import whisk.core.ConfigKeys
-import whisk.core.database.{CouchDbConfig, CouchDbRestClient, NoDocumentException}
+import whisk.core.database.memory.MemoryAttachmentStoreProvider
+import whisk.core.database.{CouchDbConfig, CouchDbRestClient, CouchDbStoreProvider, NoDocumentException}
 import whisk.core.entity.Attachments.Inline
 import whisk.core.entity.test.ExecHelpers
 import whisk.core.entity.{
@@ -42,11 +43,14 @@ import whisk.core.entity.{
   EntityName,
   EntityPath,
   WhiskAction,
+  WhiskDocumentReader,
   WhiskEntity,
+  WhiskEntityJsonFormat,
   WhiskEntityStore
 }
 
 import scala.concurrent.Future
+import scala.reflect.classTag
 
 @RunWith(classOf[JUnitRunner])
 class AttachmentCompatibilityTests
@@ -93,6 +97,28 @@ class AttachmentCompatibilityTests
     val doc =
       WhiskAction(namespace, EntityName("attachment_unique"), exec)
 
+    createAction(doc)
+
+    val doc2 = WhiskAction.get(entityStore, doc.docid).futureValue
+    doc2.exec shouldBe exec
+  }
+
+  it should "read attachments created using old scheme with AttachmentStore" in {
+    implicit val tid: TransactionId = transid()
+    val namespace = EntityPath("attachment-compat-test2")
+    val exec = javaDefault("ZHViZWU=", Some("hello"))
+    val doc =
+      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+
+    createAction(doc)
+
+    val entityStore2 = createEntityStore()
+    val doc2 = WhiskAction.get(entityStore2, doc.docid).futureValue
+    doc2.exec shouldBe exec
+  }
+
+  private def createAction(doc: WhiskAction) = {
+    implicit val tid: TransactionId = transid()
     doc.exec match {
       case exec @ CodeExecAsAttachment(_, Inline(code), _) =>
         val attached = exec.manifest.attached.get
@@ -109,9 +135,6 @@ class AttachmentCompatibilityTests
       case _ =>
         fail("Exec must be code attachment")
     }
-
-    val doc2 = WhiskAction.get(entityStore, doc.docid).futureValue
-    doc2.exec shouldBe exec
   }
 
   private def attach(doc: DocInfo,
@@ -131,4 +154,17 @@ class AttachmentCompatibilityTests
         throw new Exception("Unexpected http response code: " + code)
     }
   }
+
+  private def createEntityStore() =
+    CouchDbStoreProvider
+      .makeArtifactStore[WhiskEntity](
+        useBatching = false,
+        Some(MemoryAttachmentStoreProvider.makeStore[WhiskEntity]()))(
+        classTag[WhiskEntity],
+        WhiskEntityJsonFormat,
+        WhiskDocumentReader,
+        actorSystem,
+        logging,
+        materializer)
+
 }
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentStoreBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentStoreBehaviors.scala
new file mode 100644
index 0000000000..0b5a879650
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentStoreBehaviors.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.test
+
+import java.io.ByteArrayInputStream
+
+import akka.http.scaladsl.model.ContentTypes
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Sink, Source, StreamConverters}
+import akka.util.{ByteString, ByteStringBuilder}
+import common.{StreamLogging, WskActorSystem}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import whisk.common.TransactionId
+import whisk.core.database.{AttachmentStore, NoDocumentException}
+import whisk.core.entity.DocId
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.util.Random
+
+trait AttachmentStoreBehaviors
+    extends ScalaFutures
+    with DbUtils
+    with Matchers
+    with StreamLogging
+    with WskActorSystem
+    with BeforeAndAfterAll {
+  this: FlatSpec =>
+
+  //Bring in sync the timeout used by ScalaFutures and DBUtils
+  implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
+
+  protected implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  protected val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}"
+
+  private val attachmentsToDelete = ListBuffer[String]()
+
+  def store: AttachmentStore
+
+  def storeType: String
+
+  def garbageCollectAttachments: Boolean = true
+
+  behavior of s"$storeType AttachmentStore"
+
+  it should "add and read attachment" in {
+    implicit val tid: TransactionId = transid()
+    val bytes = randomBytes(16023)
+
+    val docId = newDocId()
+    val result = store.attach(docId, "code", ContentTypes.`application/octet-stream`, chunkedSource(bytes)).futureValue
+
+    result.length shouldBe 16023
+
+    val byteBuilder = store.readAttachment(docId, "code", byteStringSink()).futureValue
+
+    byteBuilder.result() shouldBe ByteString(bytes)
+    garbageCollect(docId)
+  }
+
+  it should "add and delete attachments" in {
+    implicit val tid: TransactionId = transid()
+    val b1 = randomBytes(1000)
+    val b2 = randomBytes(2000)
+    val b3 = randomBytes(3000)
+
+    val docId = newDocId()
+    val r1 = store.attach(docId, "c1", ContentTypes.`application/octet-stream`, chunkedSource(b1)).futureValue
+    val r2 = store.attach(docId, "c2", ContentTypes.`application/json`, chunkedSource(b2)).futureValue
+    val r3 = store.attach(docId, "c3", ContentTypes.`application/json`, chunkedSource(b3)).futureValue
+
+    r1.length shouldBe 1000
+    r2.length shouldBe 2000
+    r3.length shouldBe 3000
+
+    attachmentBytes(docId, "c1").futureValue.result() shouldBe ByteString(b1)
+    attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
+    attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)
+
+    //Delete single attachment
+    store.deleteAttachment(docId, "c1").futureValue shouldBe true
+
+    //Non deleted attachments related to same docId must still be accessible
+    attachmentBytes(docId, "c1").failed.futureValue shouldBe a[NoDocumentException]
+    attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
+    attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)
+
+    //Delete all attachments
+    store.deleteAttachments(docId).futureValue shouldBe true
+
+    attachmentBytes(docId, "c2").failed.futureValue shouldBe a[NoDocumentException]
+    attachmentBytes(docId, "c3").failed.futureValue shouldBe a[NoDocumentException]
+  }
+
+  it should "throw NoDocumentException on reading non existing attachment" in {
+    implicit val tid: TransactionId = transid()
+
+    val docId = DocId("no-existing-id")
+    val f = store.readAttachment(docId, "code", byteStringSink())
+
+    f.failed.futureValue shouldBe a[NoDocumentException]
+  }
+
+  it should "not write an attachment when there is error in Source" in {
+    implicit val tid: TransactionId = transid()
+
+    val docId = newDocId()
+    val error = new Error("boom!")
+    val faultySource = Source(1 to 10)
+      .map { n ⇒
+        if (n == 7) throw error
+        n
+      }
+      .map(ByteString(_))
+    val writeResult = store.attach(docId, "code", ContentTypes.`application/octet-stream`, faultySource)
+    writeResult.failed.futureValue.getCause should be theSameInstanceAs error
+
+    val readResult = store.readAttachment(docId, "code", byteStringSink())
+    readResult.failed.futureValue shouldBe a[NoDocumentException]
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      if (garbageCollectAttachments) {
+        implicit val tid: TransactionId = transid()
+        val f = Source(attachmentsToDelete.toList) // every id handed out by newDocId
+          .mapAsync(2)(id => store.deleteAttachments(DocId(id)))
+          .runWith(Sink.ignore)
+        Await.result(f, 1.minute)
+      }
+    } finally super.afterAll() // parent teardown must run even if the cleanup above fails or times out
+  }
+
+  protected def garbageCollect(docId: DocId): Unit = {}
+
+  protected def newDocId(): DocId = {
+    //Builds a fresh id from the random per-instance prefix and a running counter.
+    //The id is also recorded in attachmentsToDelete, which afterAll walks to delete
+    //the attachments stored under it (when garbageCollectAttachments is true)
+    counter = counter + 1
+    val docId = s"${prefix}_$counter"
+    attachmentsToDelete += docId
+    DocId(docId)
+  }
+
+  @volatile var counter = 0
+
+  private def attachmentBytes(id: DocId, name: String) = {
+    implicit val tid: TransactionId = transid()
+    store.readAttachment(id, name, byteStringSink())
+  }
+
+  private def chunkedSource(bytes: Array[Byte]): Source[ByteString, _] = {
+    StreamConverters.fromInputStream(() => new ByteArrayInputStream(bytes), 42)
+  }
+
+  private def byteStringSink() = {
+    Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 39ddad3b80..2b39fd311b 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -21,26 +21,23 @@ import java.util.Base64
 import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
-import scala.util.{Failure, Random, Success, Try}
-import spray.json._
+import akka.http.scaladsl.model.ContentType
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
 import spray.json.DefaultJsonProtocol._
+import spray.json._
 import whisk.common.TransactionId
 import whisk.core.database._
 import whisk.core.database.memory.MemoryArtifactStore
-import whisk.core.entity._
-import whisk.core.entity.types.AuthStore
-import whisk.core.entity.types.EntityStore
-import akka.http.scaladsl.model.ContentType
-import akka.stream.scaladsl.Source
-import akka.util.ByteString
 import whisk.core.entity.Attachments.Attached
+import whisk.core.entity._
+import whisk.core.entity.types.{AuthStore, EntityStore}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration.{Duration, DurationInt}
+import scala.language.postfixOps
+import scala.util.{Failure, Random, Success, Try}
 
 /**
  * WARNING: the put/get/del operations in this trait operate directly on the datastore,
@@ -278,7 +275,10 @@ trait DbUtils {
    */
   def cleanup()(implicit timeout: Duration = 10 seconds) = {
     docsToDelete.map { e =>
-      Try(Await.result(e._1.del(e._2)(TransactionId.testing), timeout))
+      // Best effort: the two deletions are attempted independently, so a failed or
+      // timed-out document delete does not leave that document's attachments behind
+      Try(Await.result(e._1.del(e._2)(TransactionId.testing), timeout))
+      Try(Await.result(e._1.deleteAttachments(e._2)(TransactionId.testing), timeout))
     }
     docsToDelete.clear()
   }
@@ -321,7 +321,7 @@ trait DbUtils {
   def isMemoryStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[MemoryArtifactStore[_]]
   def isCouchStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[CouchDbRestStore[_]]
 
-  private def randomBytes(size: Int): Array[Byte] = {
+  protected def randomBytes(size: Int): Array[Byte] = {
     val arr = new Array[Byte](size)
     Random.nextBytes(arr)
     arr
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
index 18083c3cc9..de7a7a860f 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -18,10 +18,12 @@
 package whisk.core.database.test.behavior
 
 import java.io.ByteArrayOutputStream
+import java.util.Base64
 
 import akka.http.scaladsl.model.{ContentTypes, Uri}
 import akka.stream.IOResult
-import akka.stream.scaladsl.StreamConverters
+import akka.stream.scaladsl.{Sink, StreamConverters}
+import akka.util.{ByteString, ByteStringBuilder}
 import whisk.common.TransactionId
 import whisk.core.database.{AttachmentInliner, CacheChangeNotification, NoDocumentException}
 import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
@@ -29,7 +31,7 @@ import whisk.core.entity.test.ExecHelpers
 import whisk.core.entity.{CodeExec, DocInfo, EntityName, ExecManifest, WhiskAction}
 
 trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with ExecHelpers {
-  behavior of "Attachments"
+  behavior of s"${storeType}ArtifactStore attachments"
 
   private val namespace = newNS()
   private val attachmentHandler = Some(WhiskAction.attachmentHandler _)
@@ -58,6 +60,57 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
     attachmentUri.isAbsolute shouldBe true
   }
 
+  /**
+   * This test asserts that old attachments are deleted and cannot be read again
+   */
+  it should "fail on reading with old non inlined attachment" in {
+    implicit val tid: TransactionId = transid()
+    val code1 = nonInlinedCode(entityStore)
+    val exec = javaDefault(code1, Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_update_2"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+    val code2 = nonInlinedCode(entityStore)
+    val exec2 = javaDefault(code2, Some("hello"))
+    val action2Updated = action2.copy(exec = exec2).revision[WhiskAction](i1.rev)
+
+    val i2 = WhiskAction.put(entityStore, action2Updated, old = Some(action2)).futureValue
+    val action3 = entityStore.get[WhiskAction](i2, attachmentHandler).futureValue
+
+    docsToDelete += ((entityStore, i2))
+    getAttachmentBytes(i2, attached(action3)).futureValue.result() shouldBe decode(code2)
+    getAttachmentBytes(i1, attached(action2)).failed.futureValue shouldBe a[NoDocumentException]
+  }
+
+  /**
+   * Variant of previous test where read with old attachment should still work
+   * if attachment is inlined
+   */
+  it should "work on reading with old inlined attachment" in {
+    implicit val tid: TransactionId = transid()
+    val code1 = encodedRandomBytes(inlinedAttachmentSize(entityStore))
+    val exec = javaDefault(code1, Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_update_2"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+    val code2 = nonInlinedCode(entityStore)
+    val exec2 = javaDefault(code2, Some("hello"))
+    val action2Updated = action2.copy(exec = exec2).revision[WhiskAction](i1.rev)
+
+    val i2 = WhiskAction.put(entityStore, action2Updated, old = Some(action2)).futureValue
+    val action3 = entityStore.get[WhiskAction](i2, attachmentHandler).futureValue
+
+    docsToDelete += ((entityStore, i2))
+    getAttachmentBytes(i2, attached(action3)).futureValue.result() shouldBe decode(code2)
+    getAttachmentBytes(i2, attached(action2)).futureValue.result() shouldBe decode(code1)
+  }
+
   it should "put and read same attachment" in {
     implicit val tid: TransactionId = transid()
     val size = nonInlinedAttachmentSize(entityStore)
@@ -124,9 +177,44 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
       .futureValue shouldBe a[NoDocumentException]
   }
 
+  it should "delete attachment on document delete" in {
+    val attachmentStore = getAttachmentStore(entityStore)
+    assume(attachmentStore.isDefined, "ArtifactStore does not have attachmentStore configured")
+
+    implicit val tid: TransactionId = transid()
+    val size = nonInlinedAttachmentSize(entityStore)
+    val base64 = encodedRandomBytes(size)
+
+    val exec = javaDefault(base64, Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+
+    WhiskAction.del(entityStore, i1).futureValue shouldBe true
+
+    val attachmentName = Uri(attached(action2).attachmentName).path.toString
+    attachmentStore.get
+      .readAttachment(i1.id, attachmentName, byteStringSink())
+      .failed
+      .futureValue shouldBe a[NoDocumentException]
+  }
+
   private def attached(a: WhiskAction): Attached =
     a.exec.asInstanceOf[CodeExec[Attachment[Nothing]]].code.asInstanceOf[Attached]
 
   private def inlined(a: WhiskAction): Inline[String] =
     a.exec.asInstanceOf[CodeExec[Attachment[String]]].code.asInstanceOf[Inline[String]]
+
+  private def getAttachmentBytes(docInfo: DocInfo, attached: Attached) = {
+    implicit val tid: TransactionId = transid()
+    entityStore.readAttachment(docInfo, attached, byteStringSink())
+  }
+
+  private def byteStringSink() = {
+    Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
+  }
+
+  private def decode(s: String): ByteString = ByteString(Base64.getDecoder.decode(s))
 }
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
index e22063d6c2..239bbfc923 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
@@ -25,8 +25,9 @@ import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
 import spray.json.{JsObject, JsValue}
 import whisk.common.TransactionId
+import whisk.core.database.memory.MemoryAttachmentStore
 import whisk.core.database.test.DbUtils
-import whisk.core.database.{ArtifactStore, StaleParameter}
+import whisk.core.database.{ArtifactStore, AttachmentStore, StaleParameter}
 import whisk.core.entity._
 import whisk.utils.JsHelpers
 
@@ -61,11 +62,13 @@ trait ArtifactStoreBehaviorBase
   }
 
   override def afterAll(): Unit = {
+    assertAttachmentStoreIsEmpty()
     println("Shutting down store connections")
     authStore.shutdown()
     entityStore.shutdown()
     activationStore.shutdown()
     super.afterAll()
+    assertAttachmentStoresAreClosed()
   }
 
   //~----------------------------------------< utility methods >
@@ -137,4 +140,30 @@ trait ArtifactStoreBehaviorBase
   protected def getJsField(js: JsObject, subObject: String, fieldName: String): JsValue = {
     js.fields(subObject).asJsObject().fields(fieldName)
   }
+
+  protected def getAttachmentStore(store: ArtifactStore[_]): Option[AttachmentStore]
+
+  protected def getAttachmentCount(store: AttachmentStore): Option[Int] = store match {
+    case s: MemoryAttachmentStore => Some(s.attachmentCount)
+    case _                        => None
+  }
+
+  private def assertAttachmentStoreIsEmpty(): Unit = {
+    Seq(authStore, entityStore, activationStore).foreach { s =>
+      for {
+        as <- getAttachmentStore(s)
+        count <- getAttachmentCount(as)
+      } require(count == 0, s"AttachmentStore not empty after all runs - $count")
+    }
+  }
+
+  private def assertAttachmentStoresAreClosed(): Unit = {
+    Seq(authStore, entityStore, activationStore).foreach { s =>
+      getAttachmentStore(s).foreach {
+        case s: MemoryAttachmentStore => require(s.isClosed, "AttachmentStore was not closed")
+        case _                        =>
+      }
+    }
+  }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services