You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/08 05:13:18 UTC

[GitHub] chetanmeh closed pull request #3709: Inlined attachments

chetanmeh closed pull request #3709: Inlined attachments
URL: https://github.com/apache/incubator-openwhisk/pull/3709
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index feaa96095c..dbc70c5fd3 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -114,6 +114,14 @@ whisk {
         actions-ddoc = "whisks.v2.1.0"
         activations-ddoc = "whisks.v2.1.0"
         activations-filter-ddoc = "whisks-filters.v2.1.0"
+
+        # Size limit for inlined attachments. Attachments smaller than this limit are
+        # inlined, with their content encoded in the attachment name
+        max-inline-size = 16 k
+
+        # Chunk size used when converting a source of bytes into ByteString elements
+        # as part of the attachment upload flow
+        chunk-size = 8 k
     }
 
     # CouchDB related configuration
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
index 3750398939..4a70e40c1a 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala
@@ -137,8 +137,8 @@ trait ArtifactStore[DocumentAbstraction] {
   /**
    * Retrieves a saved attachment, streaming it into the provided Sink.
    */
-  protected[core] def readAttachment[T](doc: DocInfo, name: String, sink: Sink[ByteString, Future[T]])(
-    implicit transid: TransactionId): Future[(ContentType, T)]
+  protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T]
 
   /**
    * Deletes all attachments linked to given document
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
new file mode 100644
index 0000000000..14eb192aa1
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.security.MessageDigest
+import java.util.Base64
+
+import akka.NotUsed
+import akka.http.scaladsl.model.Uri
+import akka.stream.Materializer
+import akka.stream.scaladsl.{Concat, Sink, Source}
+import akka.util.{ByteString, ByteStringBuilder}
+import whisk.core.database.AttachmentInliner.MemScheme
+import whisk.core.entity.ByteSize
+
+import scala.collection.immutable
+import scala.concurrent.Future
+
+object AttachmentInliner {
+
+  /**
+   * Scheme name for attachments which are inlined
+   */
+  val MemScheme: String = "mem"
+}
+
+case class InliningConfig(maxInlineSize: ByteSize, chunkSize: ByteSize)
+
+/**
+ * Provides support for inlining small attachments. Inlined attachment contents are encoded as part of attachment
+ * name itself.
+ */
+trait AttachmentInliner {
+  private val digestAlgo = "SHA-256"
+  private val encodedAlgoName = digestAlgo.toLowerCase.replaceAllLiterally("-", "")
+
+  /** Materializer required for stream processing */
+  protected[core] implicit val materializer: Materializer
+
+  protected[database] def inlineAndTail(
+    docStream: Source[ByteString, _]): Future[(immutable.Seq[Byte], Source[Byte, _])] = {
+    docStream
+      .mapConcat(_.seq)
+      .prefixAndTail(maxInlineSize.toBytes.toInt)
+      .runWith(Sink.head[(immutable.Seq[Byte], Source[Byte, _])])
+  }
+
+  protected[database] def uriOf(bytes: Seq[Byte], path: => String): Uri = {
+    //If fewer than maxInlineSize bytes were read, the tail source is definitely empty.
+    //If exactly maxInlineSize bytes were read, the tail may or may not be empty, so the
+    //effective max inline size is maxInlineSize - 1 bytes
+    if (bytes.size < maxInlineSize.toBytes) {
+      Uri.from(scheme = MemScheme, path = encode(bytes))
+    } else {
+      Uri.from(scheme = attachmentScheme, path = path)
+    }
+  }
+
+  /**
+   * Constructs a combined source based on attachment content read so far and rest of unread content.
+   * Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]] elements.
+   */
+  protected[database] def combinedSource(inlinedBytes: immutable.Seq[Byte],
+                                         tailSource: Source[Byte, _]): Source[ByteString, NotUsed] =
+    Source
+      .combine(Source(inlinedBytes), tailSource)(Concat[Byte])
+      .batch[ByteStringBuilder](chunkSize.toBytes, b => { val bb = new ByteStringBuilder(); bb += b })((bb, b) =>
+        bb += b)
+      .map(_.result())
+
+  /**
+   * Constructs a source from inlined attachment contents
+   */
+  protected[database] def memorySource(uri: Uri): Source[ByteString, NotUsed] = {
+    require(uri.scheme == MemScheme, s"URI $uri scheme is not $MemScheme")
+    Source.single(ByteString(decode(uri)))
+  }
+
+  protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == MemScheme
+
+  protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
+    val digester = MessageDigest.getInstance(digestAlgo)
+    digester.update(bytes.toArray)
+    val digest = digester.digest().map("%02x".format(_)).mkString
+    s"$encodedAlgoName-$digest"
+  }
+
+  /**
+   * Attachments having size less than this would be inlined
+   */
+  def maxInlineSize: ByteSize = inliningConfig.maxInlineSize
+
+  def chunkSize: ByteSize = inliningConfig.chunkSize
+
+  protected def inliningConfig: InliningConfig
+
+  protected def attachmentScheme: String
+
+  private def encode(bytes: Seq[Byte]): String = {
+    Base64.getUrlEncoder.encodeToString(bytes.toArray)
+  }
+
+  private def decode(uri: Uri): Array[Byte] = {
+    Base64.getUrlDecoder.decode(uri.path.toString())
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 8c401bdfe4..d0f13c9958 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -30,8 +30,8 @@ import whisk.core.entity.Attachments.Attached
 import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
 import whisk.http.Messages
 
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
 import scala.util.Try
 
 /**
@@ -51,18 +51,21 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
                                                                   dbUsername: String,
                                                                   dbPassword: String,
                                                                   dbName: String,
-                                                                  useBatching: Boolean = false)(
+                                                                  useBatching: Boolean = false,
+                                                                  val inliningConfig: InliningConfig)(
   implicit system: ActorSystem,
   val logging: Logging,
   jsonFormat: RootJsonFormat[DocumentAbstraction],
-  materializer: ActorMaterializer,
+  val materializer: ActorMaterializer,
   docReader: DocumentReader)
     extends ArtifactStore[DocumentAbstraction]
-    with DefaultJsonProtocol {
+    with DefaultJsonProtocol
+    with AttachmentInliner {
 
   protected[core] implicit val executionContext = system.dispatcher
 
-  private val attachmentScheme = "couch"
+  val attachmentScheme: String = "couch"
+
   private val client: CouchDbRestClient =
     new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName)
 
@@ -354,13 +357,21 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     docStream: Source[ByteString, _],
     oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
 
-    val attachmentUri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
-    val attached = Attached(attachmentUri.toString(), contentType)
-    val updatedDoc = update(d, attached)
-
     for {
-      i1 <- put(updatedDoc)
-      i2 <- attach(i1, attachmentUri.path.toString(), attached.attachmentType, docStream)
+      (bytes, tailSource) <- inlineAndTail(docStream)
+      uri <- Future.successful(uriOf(bytes, UUID().asString))
+      attached <- {
+        val a = if (isInlined(uri)) {
+          Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
+        } else {
+          Attached(uri.toString, contentType)
+        }
+        Future.successful(a)
+      }
+      i1 <- put(update(d, attached))
+      i2 <- if (isInlined(uri)) { Future.successful(i1) } else {
+        attach(i1, uri.path.toString, attached.attachmentType, combinedSource(bytes, tailSource))
+      }
     } yield (i2, attached)
   }
 
@@ -408,9 +419,10 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
           ErrorLevel))
   }
 
-  override protected[core] def readAttachment[T](doc: DocInfo, name: String, sink: Sink[ByteString, Future[T]])(
-    implicit transid: TransactionId): Future[(ContentType, T)] = {
+  override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
 
+    val name = attached.attachmentName
     val start = transid.started(
       this,
       LoggingMarkers.DATABASE_ATT_GET,
@@ -420,12 +432,14 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     require(doc.rev.rev != null, "doc revision must be specified")
 
     val attachmentUri = Uri(name)
-    val f = client.getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString(), sink)
-    val g = f.map { e =>
-      e match {
-        case Right((contentType, result)) =>
+    val g = if (isInlined(attachmentUri)) {
+      memorySource(attachmentUri).runWith(sink)
+    } else {
+      val f = client.getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString, sink)
+      f.map {
+        case Right((_, result)) =>
           transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
-          (contentType, result)
+          result
 
         case Left(StatusCodes.NotFound) =>
           transid.finished(
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
index c10436b5c8..df6a374a56 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
@@ -23,6 +23,7 @@ import spray.json.RootJsonFormat
 import whisk.common.Logging
 import whisk.core.ConfigKeys
 import whisk.core.entity.DocumentReader
+import whisk.core.entity.size._
 import pureconfig._
 
 import scala.reflect.ClassTag
@@ -58,6 +59,8 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
       dbConfig.provider == "Cloudant" || dbConfig.provider == "CouchDB",
       s"Unsupported db.provider: ${dbConfig.provider}")
 
+    val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db)
+
     new CouchDbRestStore[D](
       dbConfig.protocol,
       dbConfig.host,
@@ -65,6 +68,7 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
       dbConfig.username,
       dbConfig.password,
       dbConfig.databaseFor[D],
-      useBatching)
+      useBatching,
+      inliningConfig)
   }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index 38827b39b4..f05d39c016 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -224,7 +224,7 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
   def getAttachment[Wsuper >: W](
     db: ArtifactStore[Wsuper],
     doc: W,
-    attachmentName: String,
+    attached: Attached,
     outputStream: OutputStream,
     postProcess: Option[W => W] = None)(implicit transid: TransactionId, mw: Manifest[W]): Future[W] = {
 
@@ -242,7 +242,7 @@ trait DocumentFactory[W <: DocumentRevisionProvider] extends MultipleReadersSing
       val key = CacheKey(docInfo)
       val sink = StreamConverters.fromOutputStream(() => outputStream)
 
-      db.readAttachment[IOResult](docInfo, attachmentName, sink).map {
+      db.readAttachment[IOResult](docInfo, attached, sink).map {
         case _ =>
           val cacheDoc = postProcess map { _(doc) } getOrElse doc
 
diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
index e1275997fe..75973ffe08 100644
--- a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
@@ -17,20 +17,20 @@
 
 package whisk.core.database.memory
 
-import java.security.MessageDigest
-import java.util.Base64
-
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.{ContentType, Uri}
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{Keep, Sink, Source}
 import akka.util.{ByteString, ByteStringBuilder}
+import pureconfig.loadConfigOrThrow
 import spray.json.{DefaultJsonProtocol, DeserializationException, JsObject, JsString, RootJsonFormat}
 import whisk.common.{Logging, LoggingMarkers, TransactionId}
+import whisk.core.ConfigKeys
 import whisk.core.database.StoreUtils._
 import whisk.core.database._
 import whisk.core.entity.Attachments.Attached
 import whisk.core.entity._
+import whisk.core.entity.size._
 import whisk.http.Messages
 
 import scala.collection.concurrent.TrieMap
@@ -48,8 +48,8 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider {
 
     val classTag = implicitly[ClassTag[D]]
     val (dbName, handler, viewMapper) = handlerAndMapper(classTag)
-
-    new MemoryArtifactStore(dbName, handler, viewMapper)
+    val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db)
+    new MemoryArtifactStore(dbName, handler, viewMapper, inliningConfig)
   }
 
   private def handlerAndMapper[D](entityType: ClassTag[D])(
@@ -74,15 +74,17 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider {
  */
 class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: String,
                                                                      documentHandler: DocumentHandler,
-                                                                     viewMapper: MemoryViewMapper)(
+                                                                     viewMapper: MemoryViewMapper,
+                                                                     val inliningConfig: InliningConfig)(
   implicit system: ActorSystem,
   val logging: Logging,
   jsonFormat: RootJsonFormat[DocumentAbstraction],
-  materializer: ActorMaterializer,
+  val materializer: ActorMaterializer,
   docReader: DocumentReader)
     extends ArtifactStore[DocumentAbstraction]
     with DefaultJsonProtocol
-    with DocumentProvider {
+    with DocumentProvider
+    with AttachmentInliner {
 
   override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
 
@@ -90,7 +92,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
 
   private val _id = "_id"
   private val _rev = "_rev"
-  private val attachmentScheme = "mem"
+  val attachmentScheme = "mems"
 
   override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
     val asJson = d.toDocumentRecord
@@ -244,23 +246,29 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     f.map(_.size)
   }
 
-  override protected[core] def readAttachment[T](doc: DocInfo, name: String, sink: Sink[ByteString, Future[T]])(
-    implicit transid: TransactionId): Future[(ContentType, T)] = {
+  override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
     //TODO Temporary implementation till MemoryAttachmentStore PR is merged
+    val name = attached.attachmentName
     val start = transid.started(
       this,
       LoggingMarkers.DATABASE_ATT_GET,
       s"[ATT_GET] '$dbName' finding attachment '$name' of document '$doc'")
 
-    val storedName = Uri(name).path.toString()
-    artifacts.get(doc.id.id) match {
-      case Some(a: Artifact) if a.attachments.contains(storedName) =>
-        val attachment = a.attachments(storedName)
-        val r = Source.single(attachment.bytes).toMat(sink)(Keep.right).run
-        transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
-        r.map(t => (attachment.contentType, t))
-      case None =>
-        Future.failed(NoDocumentException("Not found on 'readAttachment'."))
+    val attachmentUri = Uri(name)
+    if (isInlined(attachmentUri)) {
+      memorySource(attachmentUri).runWith(sink)
+    } else {
+      val storedName = attachmentUri.path.toString()
+      artifacts.get(doc.id.id) match {
+        case Some(a: Artifact) if a.attachments.contains(storedName) =>
+          val attachment = a.attachments(storedName)
+          val r = Source.single(attachment.bytes).toMat(sink)(Keep.right).run
+          transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
+          r
+        case None =>
+          Future.failed(NoDocumentException("Not found on 'readAttachment'."))
+      }
     }
   }
 
@@ -275,19 +283,28 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     docStream: Source[ByteString, _],
     oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
 
-    val attachmentUri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
-
+    //Inlining is not required for the in-memory store; it is implemented here
+    //only to exercise and validate the inlining code paths
     for {
-      bytes <- toByteString(docStream)
-      attached <- Future.successful(
-        Attached(attachmentUri.toString(), contentType, Some(bytes.size), Some(digest(bytes))))
-      updatedDoc <- Future.successful(update(d, attached))
-      i1 <- put(updatedDoc)
-      i2 <- attach(i1, attachmentUri.path.toString(), attached.attachmentType, bytes)
+      allBytes <- toByteString(docStream)
+      (bytes, tailSource) <- inlineAndTail(Source.single(allBytes))
+      uri <- Future.successful(uriOf(bytes, UUID().asString))
+      attached <- {
+        val a = if (isInlined(uri)) {
+          Attached(uri.toString(), contentType, Some(bytes.size), Some(digest(bytes)))
+        } else {
+          Attached(uri.toString(), contentType, Some(allBytes.size), Some(digest(allBytes)))
+        }
+        Future.successful(a)
+      }
+      i1 <- put(update(d, attached))
+      i2 <- if (isInlined(uri)) { Future.successful(i1) } else {
+        attach(i1, uri.path.toString(), attached.attachmentType, toByteString(combinedSource(bytes, tailSource)))
+      }
     } yield (i2, attached)
   }
 
-  private def attach(doc: DocInfo, name: String, contentType: ContentType, bytes: ByteString)(
+  private def attach(doc: DocInfo, name: String, contentType: ContentType, bytes: Future[ByteString])(
     implicit transid: TransactionId): Future[DocInfo] = {
 
     val start = transid.started(
@@ -296,11 +313,11 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
       s"[ATT_PUT] '$dbName' uploading attachment '$name' of document '$doc'")
 
     //TODO Temporary implementation till MemoryAttachmentStore PR is merged
-    val g =
+    bytes.map { b =>
       artifacts.get(doc.id.id) match {
         case Some(a) =>
           val existing = Artifact(doc, a.doc, a.computed)
-          val updated = existing.attach(name, Attachment(bytes, contentType))
+          val updated = existing.attach(name, Attachment(b, contentType))
           if (artifacts.replace(doc.id.id, existing, updated)) {
             transid
               .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$doc'")
@@ -311,7 +328,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
         case None =>
           throw DocumentConflictException("conflict on 'put'")
       }
-    Future.successful(g)
+    }
   }
 
   override def shutdown(): Unit = {
@@ -337,16 +354,9 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     reportFailure(f, start, failure => s"[GET] '$dbName' internal error, doc: '$id', failure: '${failure.getMessage}'")
   }
 
-  private def toByteString(docStream: Source[ByteString, _]) =
+  private def toByteString(docStream: Source[Traversable[Byte], _]) =
     docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact)
 
-  private def digest(bytes: ByteString) = {
-    val digestBytes = MessageDigest
-      .getInstance("MD5")
-      .digest(bytes.toArray)
-    s"md5-${Base64.getUrlEncoder.encodeToString(digestBytes)}"
-  }
-
   private def getRevision(asJson: JsObject) = {
     asJson.fields.get(_rev) match {
       case Some(JsString(r)) => r.toInt
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index a0011ff33f..ac6b2d059e 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -371,11 +371,11 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
 
     fa.flatMap { action =>
       action.exec match {
-        case exec @ CodeExecAsAttachment(_, Attached(attachmentName, _, _, _), _) =>
+        case exec @ CodeExecAsAttachment(_, attached: Attached, _) =>
           val boas = new ByteArrayOutputStream()
           val b64s = Base64.getEncoder().wrap(boas)
 
-          getAttachment[A](db, action, attachmentName, b64s, Some { a: WhiskAction =>
+          getAttachment[A](db, action, attached, b64s, Some { a: WhiskAction =>
             b64s.close()
             val newAction = a.copy(exec = exec.inline(boas.toByteArray))
             newAction.revision(a.rev)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index c8d360e827..4842a11f01 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -781,7 +781,11 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
 
   it should "put and then get an action with attachment from cache" in {
     val action =
-      WhiskAction(namespace, aname(), javaDefault("ZHViZWU=", Some("hello")), annotations = Parameters("exec", "java"))
+      WhiskAction(
+        namespace,
+        aname(),
+        javaDefault(nonInlinedCode(entityStore), Some("hello")),
+        annotations = Parameters("exec", "java"))
     val content = WhiskActionPut(
       Some(action.exec),
       Some(action.parameters),
@@ -854,9 +858,85 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     stream.reset()
   }
 
+  it should "put and then get an action with inlined attachment" in {
+    val action =
+      WhiskAction(
+        namespace,
+        aname(),
+        javaDefault(encodedRandomBytes(inlinedAttachmentSize(entityStore)), Some("hello")),
+        annotations = Parameters("exec", "java"))
+    val content = WhiskActionPut(
+      Some(action.exec),
+      Some(action.parameters),
+      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+    val name = action.name
+    val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
+    val notExpectedGetLog = Seq(
+      s"finding document: 'id: ${action.namespace}/${action.name}",
+      s"finding attachment '[\\w-/:]+' of document 'id: ${action.namespace}/${action.name}").mkString("(?s).*")
+
+    // first request invalidates any previous entries and caches new result
+    Put(s"$collectionPath/$name", content) ~> Route.seal(routes(creds)(transid())) ~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should not include (s"invalidating ${CacheKey(action)} on delete")
+    stream.toString should not include ("uploading attachment")
+    stream.reset()
+
+    // second request should fetch from cache
+    Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+
+    stream.toString should include(s"serving from cache: ${CacheKey(action)}")
+    stream.toString should not include regex(notExpectedGetLog)
+    stream.reset()
+
+    // delete should invalidate cache
+    Delete(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check {
+      status should be(OK)
+      val response = responseAs[WhiskAction]
+      response should be(
+        WhiskAction(
+          action.namespace,
+          action.name,
+          action.exec,
+          action.parameters,
+          action.limits,
+          action.version,
+          action.publish,
+          action.annotations ++ Parameters(WhiskAction.execFieldName, JAVA_DEFAULT)))
+    }
+    stream.toString should include(s"invalidating ${CacheKey(action)}")
+    stream.reset()
+  }
+
   it should "get an action with attachment that is not cached" in {
     implicit val tid = transid()
-    val code = "ZHViZWU="
+    val code = nonInlinedCode(entityStore)
     val action =
       WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec", "java"))
     val content = WhiskActionPut(
@@ -907,7 +987,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
 
   it should "update an existing action with attachment that is not cached" in {
     implicit val tid = transid()
-    val code = "ZHViZWU="
+    val code = nonInlinedCode(entityStore)
     val action =
       WhiskAction(namespace, aname(), javaDefault(code, Some("hello")), annotations = Parameters("exec", "java"))
     val content = WhiskActionPut(
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
new file mode 100644
index 0000000000..783d57e348
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.test
+
+import akka.http.scaladsl.model.Uri
+import akka.stream.scaladsl.Source
+import akka.stream.{ActorMaterializer, Materializer}
+import akka.util.{ByteStringBuilder, CompactByteString}
+import common.WskActorSystem
+import org.junit.runner.RunWith
+import whisk.core.entity.size._
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import whisk.core.database.{AttachmentInliner, InliningConfig}
+
+@RunWith(classOf[JUnitRunner])
+class AttachmentInlinerTests extends FlatSpec with Matchers with ScalaFutures with WskActorSystem {
+
+  behavior of "Attachment inlining"
+
+  implicit val materializer: Materializer = ActorMaterializer()
+
+  it should "not inline if maxInlineSize set to zero" in {
+    val inliner = new TestInliner(InliningConfig(maxInlineSize = 0.KB, chunkSize = 8.KB))
+    val bs = CompactByteString("hello world")
+
+    val (head, tail) = inliner.inlineAndTail(Source.single(bs)).futureValue
+    val uri = inliner.uriOf(head, "foo")
+
+    uri shouldBe Uri("test:foo")
+
+    val bsResult = toByteString(inliner.combinedSource(head, tail)).futureValue
+    bsResult shouldBe bs
+  }
+
+  private def toByteString(docStream: Source[Traversable[Byte], _]) =
+    docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact)
+
+  class TestInliner(val inliningConfig: InliningConfig) extends AttachmentInliner {
+    override protected[core] implicit val materializer: Materializer = ActorMaterializer()
+    override protected def attachmentScheme: String = "test"
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentTests.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentTests.scala
deleted file mode 100644
index bde3ee57f3..0000000000
--- a/tests/src/test/scala/whisk/core/database/test/AttachmentTests.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.database.test
-
-import java.util.Base64
-
-import akka.http.scaladsl.model.Uri
-import akka.stream.ActorMaterializer
-import common.{StreamLogging, WskActorSystem}
-import org.junit.runner.RunWith
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec}
-import whisk.common.TransactionId
-import whisk.core.controller.test.WhiskAuthHelpers
-import whisk.core.database.CacheChangeNotification
-import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
-import whisk.core.entity._
-import whisk.core.entity.test.ExecHelpers
-
-import scala.util.Random
-
-@RunWith(classOf[JUnitRunner])
-class AttachmentTests
-    extends FlatSpec
-    with BeforeAndAfterEach
-    with BeforeAndAfterAll
-    with WskActorSystem
-    with DbUtils
-    with ExecHelpers
-    with ScalaFutures
-    with StreamLogging {
-
-  implicit val materializer = ActorMaterializer()
-  private val namespace = EntityPath(WhiskAuthHelpers.newIdentity().subject.asString)
-  private val datastore = WhiskEntityStore.datastore()
-  private val attachmentHandler = Some(WhiskAction.attachmentHandler _)
-
-  implicit val cacheUpdateNotifier: Option[CacheChangeNotification] = None
-  implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
-
-  override def afterEach() = {
-    cleanup()
-  }
-
-  override def afterAll() = {
-    datastore.shutdown()
-    super.afterAll()
-  }
-
-  behavior of "Datastore"
-
-  it should "generate different attachment name on update" in {
-    implicit val tid: TransactionId = transid()
-    val exec = javaDefault("ZHViZWU=", Some("hello"))
-    val javaAction =
-      WhiskAction(namespace, EntityName("attachment_unique"), exec)
-
-    val i1 = WhiskAction.put(datastore, javaAction, old = None).futureValue
-    val action2 = datastore.get[WhiskAction](i1, attachmentHandler).futureValue
-
-    //Change attachment to inline one otherwise WhiskAction would not go for putAndAttach
-    val action2Updated = action2.copy(exec = exec).revision[WhiskAction](i1.rev)
-    val i2 = WhiskAction.put(datastore, action2Updated, old = Some(action2)).futureValue
-    val action3 = datastore.get[WhiskAction](i2, attachmentHandler).futureValue
-
-    docsToDelete += ((datastore, i2))
-
-    attached(action2).attachmentName should not be attached(action3).attachmentName
-
-    //Check that attachment name is actually a uri
-    val attachmentUri = Uri(attached(action2).attachmentName)
-    attachmentUri.isAbsolute shouldBe true
-  }
-
-  it should "put and read same attachment" in {
-    implicit val tid: TransactionId = transid()
-    val size = 4000
-    val bytes = randomBytes(size)
-    val base64 = Base64.getEncoder.encodeToString(bytes)
-
-    val exec = javaDefault(base64, Some("hello"))
-    val javaAction =
-      WhiskAction(namespace, EntityName("attachment_unique"), exec)
-
-    val i1 = WhiskAction.put(datastore, javaAction, old = None).futureValue
-    val action2 = datastore.get[WhiskAction](i1, attachmentHandler).futureValue
-    val action3 = WhiskAction.get(datastore, i1.id, i1.rev).futureValue
-
-    docsToDelete += ((datastore, i1))
-
-    attached(action2).attachmentType shouldBe ExecManifest.runtimesManifest
-      .resolveDefaultRuntime(JAVA_DEFAULT)
-      .get
-      .attached
-      .get
-      .attachmentType
-    attached(action2).length shouldBe Some(size)
-    attached(action2).digest should not be empty
-
-    action3.exec shouldBe exec
-    inlined(action3).value shouldBe base64
-  }
-
-  private def attached(a: WhiskAction): Attached =
-    a.exec.asInstanceOf[CodeExec[Attachment[Nothing]]].code.asInstanceOf[Attached]
-
-  private def inlined(a: WhiskAction): Inline[String] =
-    a.exec.asInstanceOf[CodeExec[Attachment[String]]].code.asInstanceOf[Inline[String]]
-
-  private def randomBytes(size: Int): Array[Byte] = {
-    val arr = new Array[Byte](size)
-    Random.nextBytes(arr)
-    arr
-  }
-}
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 9e2f875aa1..39ddad3b80 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -17,6 +17,7 @@
 
 package whisk.core.database.test
 
+import java.util.Base64
 import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -27,9 +28,7 @@ import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
+import scala.util.{Failure, Random, Success, Try}
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionId
@@ -284,6 +283,47 @@ trait DbUtils {
     docsToDelete.clear()
   }
 
+  /**
+   * Base64-encoded random code large enough that `db` stores it as a regular attachment
+   * instead of encoding it into the attachment name.
+   */
+  def nonInlinedCode(db: ArtifactStore[_]): String =
+    encodedRandomBytes(size = nonInlinedAttachmentSize(db))
+
+  /**
+   * Size in bytes for attachments which would always be inlined, i.e. one byte below `maxInlineSize`.
+   */
+  def inlinedAttachmentSize(db: ArtifactStore[_]): Int = db match {
+    case inliner: AttachmentInliner if inliner.maxInlineSize.toBytes > 0 =>
+      inliner.maxInlineSize.toBytes.toInt - 1
+    case _: AttachmentInliner => // maxInlineSize 0 disables inlining; `- 1` would yield a negative size
+      throw new IllegalStateException(s"ArtifactStore has attachment inlining disabled $db")
+    case _ =>
+      throw new IllegalStateException(s"ArtifactStore does not support attachment inlining $db")
+  }
+
+  /**
+   * Size in bytes for attachments which would never be inlined.
+   */
+  def nonInlinedAttachmentSize(db: ArtifactStore[_]): Int = db match {
+    case inliner: AttachmentInliner =>
+      // Twice the larger of the inline limit and the upload chunk size, so the payload is
+      // above the inline threshold and spans more than one upload chunk.
+      val largerLimit = inliner.maxInlineSize.toBytes.toInt max inliner.chunkSize.toBytes.toInt
+      largerLimit * 2
+    case _ =>
+      // stores that do not mix in AttachmentInliner never inline, so any size works
+      42
+  }
+
+  /** Base64 encoding of `size` freshly generated random bytes. */
+  protected def encodedRandomBytes(size: Int): String = {
+    val raw = randomBytes(size)
+    Base64.getEncoder.encodeToString(raw)
+  }
+
   def isMemoryStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[MemoryArtifactStore[_]]
   def isCouchStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[CouchDbRestStore[_]]
+
+  /** Allocates `size` bytes and fills them from the shared `scala.util.Random` generator. */
+  private def randomBytes(size: Int): Array[Byte] = {
+    val buffer = Array.ofDim[Byte](size)
+    Random.nextBytes(buffer)
+    buffer
+  }
 }
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
index 62ae0005c9..18083c3cc9 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -17,16 +17,16 @@
 
 package whisk.core.database.test.behavior
 
-import java.util.Base64
+import java.io.ByteArrayOutputStream
 
-import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.{ContentTypes, Uri}
+import akka.stream.IOResult
+import akka.stream.scaladsl.StreamConverters
 import whisk.common.TransactionId
-import whisk.core.database.CacheChangeNotification
+import whisk.core.database.{AttachmentInliner, CacheChangeNotification, NoDocumentException}
 import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
 import whisk.core.entity.test.ExecHelpers
-import whisk.core.entity.{CodeExec, EntityName, ExecManifest, WhiskAction}
-
-import scala.util.Random
+import whisk.core.entity.{CodeExec, DocInfo, EntityName, ExecManifest, WhiskAction}
 
 trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with ExecHelpers {
   behavior of "Attachments"
@@ -37,7 +37,7 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
 
   it should "generate different attachment name on update" in {
     implicit val tid: TransactionId = transid()
-    val exec = javaDefault("ZHViZWU=", Some("hello"))
+    val exec = javaDefault(nonInlinedCode(entityStore), Some("hello"))
     val javaAction =
       WhiskAction(namespace, EntityName("attachment_unique"), exec)
 
@@ -60,9 +60,8 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
 
   it should "put and read same attachment" in {
     implicit val tid: TransactionId = transid()
-    val size = 4000
-    val bytes = randomBytes(size)
-    val base64 = Base64.getEncoder.encodeToString(bytes)
+    val size = nonInlinedAttachmentSize(entityStore)
+    val base64 = encodedRandomBytes(size)
 
     val exec = javaDefault(base64, Some("hello"))
     val javaAction =
@@ -87,15 +86,47 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
     inlined(action3).value shouldBe base64
   }
 
+  it should "inline small attachments" in {
+    implicit val tid: TransactionId = transid()
+    val attachmentSize = inlinedAttachmentSize(entityStore) // largest size that is still inlined
+    val base64 = encodedRandomBytes(attachmentSize)
+
+    val exec = javaDefault(base64, Some("hello"))
+    val javaAction = WhiskAction(namespace, EntityName("attachment_inline"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+    val action3 = WhiskAction.get(entityStore, i1.id, i1.rev).futureValue
+
+    docsToDelete += ((entityStore, i1))
+
+    action3.exec shouldBe exec
+    inlined(action3).value shouldBe base64
+
+    val a = attached(action2) // action2 still holds the Attached reference to the stored code
+
+    val attachmentUri = Uri(a.attachmentName)
+    attachmentUri.scheme shouldBe AttachmentInliner.MemScheme // content is carried in the name itself
+    a.length shouldBe Some(attachmentSize)
+    a.digest should not be empty
+  }
+
+  it should "throw NoDocumentException for non existing attachment" in {
+    implicit val tid: TransactionId = transid()
+
+    // Reading an attachment of a document that was never stored must fail
+    // with NoDocumentException.
+    val sink = StreamConverters.fromOutputStream(() => new ByteArrayOutputStream())
+    val missingDoc = DocInfo ! ("non-existing-doc", "42")
+    val attachment = Attached("foo", ContentTypes.`application/octet-stream`)
+
+    val readResult = entityStore.readAttachment[IOResult](missingDoc, attachment, sink)
+    readResult.failed.futureValue shouldBe a[NoDocumentException]
+  }
+
   private def attached(a: WhiskAction): Attached =
     a.exec.asInstanceOf[CodeExec[Attachment[Nothing]]].code.asInstanceOf[Attached]
 
   private def inlined(a: WhiskAction): Inline[String] =
     a.exec.asInstanceOf[CodeExec[Attachment[String]]].code.asInstanceOf[Inline[String]]
-
-  private def randomBytes(size: Int): Array[Byte] = {
-    val arr = new Array[Byte](size)
-    Random.nextBytes(arr)
-    arr
-  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services