You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/21 01:04:45 UTC

[GitHub] rabbah closed pull request #3777: Avoid converting ByteString to bytes in attachment inlining flow

rabbah closed pull request #3777: Avoid converting ByteString to bytes in attachment inlining flow
URL: https://github.com/apache/incubator-openwhisk/pull/3777
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 6aa1bbd890..3ceaab8b77 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -117,11 +117,7 @@ whisk {
 
         # Size limit for inlined attachments. Attachments having size less than this would
         # be inlined with there content encoded in attachmentName
-        max-inline-size = 0 k
-
-        # Chunk sized for converting source of bytes to ByteString as part of attachment
-        # upload flow
-        chunk-size = 8 k
+        max-inline-size = 16 k
     }
 
     # CouchDB related configuration
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
deleted file mode 100644
index 2369b1bf26..0000000000
--- a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.database
-
-import java.util.Base64
-
-import akka.NotUsed
-import akka.http.scaladsl.model.Uri
-import akka.stream.Materializer
-import akka.stream.scaladsl.{Concat, Sink, Source}
-import akka.util.{ByteString, ByteStringBuilder}
-import whisk.core.database.AttachmentInliner.MemScheme
-import whisk.core.entity.ByteSize
-
-import scala.collection.immutable
-import scala.concurrent.Future
-
-object AttachmentInliner {
-
-  /**
-   * Scheme name for attachments which are inlined
-   */
-  val MemScheme: String = "mem"
-}
-
-case class InliningConfig(maxInlineSize: ByteSize, chunkSize: ByteSize)
-
-/**
- * Provides support for inlining small attachments. Inlined attachment contents are encoded as part of attachment
- * name itself.
- */
-trait AttachmentInliner {
-
-  /** Materializer required for stream processing */
-  protected[core] implicit val materializer: Materializer
-
-  protected[database] def inlineAndTail(
-    docStream: Source[ByteString, _]): Future[(immutable.Seq[Byte], Source[Byte, _])] = {
-    docStream
-      .mapConcat(_.seq)
-      .prefixAndTail(maxInlineSize.toBytes.toInt)
-      .runWith(Sink.head[(immutable.Seq[Byte], Source[Byte, _])])
-  }
-
-  protected[database] def uriOf(bytes: Seq[Byte], path: => String): Uri = {
-    //For less than case its definitive that tail source would be empty
-    //for equal case it cannot be determined if tail source is empty. Actual max inline size
-    //would be inlineSize - 1
-    if (bytes.size < maxInlineSize.toBytes) {
-      Uri.from(scheme = MemScheme, path = encode(bytes))
-    } else {
-      Uri.from(scheme = attachmentScheme, path = path)
-    }
-  }
-
-  /**
-   * Constructs a combined source based on attachment content read so far and rest of unread content.
-   * Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]] elements.
-   */
-  protected[database] def combinedSource(inlinedBytes: immutable.Seq[Byte],
-                                         tailSource: Source[Byte, _]): Source[ByteString, NotUsed] =
-    Source
-      .combine(Source(inlinedBytes), tailSource)(Concat[Byte])
-      .batch[ByteStringBuilder](chunkSize.toBytes, b => { val bb = new ByteStringBuilder(); bb += b })((bb, b) =>
-        bb += b)
-      .map(_.result())
-
-  /**
-   * Constructs a source from inlined attachment contents
-   */
-  protected[database] def memorySource(uri: Uri): Source[ByteString, NotUsed] = {
-    require(uri.scheme == MemScheme, s"URI $uri scheme is not $MemScheme")
-    Source.single(ByteString(decode(uri)))
-  }
-
-  protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == MemScheme
-
-  protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
-    val digester = StoreUtils.emptyDigest()
-    digester.update(bytes.toArray)
-    StoreUtils.encodeDigest(digester.digest())
-  }
-
-  /**
-   * Attachments having size less than this would be inlined
-   */
-  def maxInlineSize: ByteSize = inliningConfig.maxInlineSize
-
-  def chunkSize: ByteSize = inliningConfig.chunkSize
-
-  protected def inliningConfig: InliningConfig
-
-  protected def attachmentScheme: String
-
-  private def encode(bytes: Seq[Byte]): String = {
-    Base64.getUrlEncoder.encodeToString(bytes.toArray)
-  }
-
-  private def decode(uri: Uri): Array[Byte] = {
-    Base64.getUrlDecoder.decode(uri.path.toString())
-  }
-}
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
new file mode 100644
index 0000000000..72f7754d38
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.util.Base64
+
+import akka.NotUsed
+import akka.http.scaladsl.model.{ContentType, Uri}
+import akka.stream.Materializer
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+import spray.json.DefaultJsonProtocol
+import whisk.common.TransactionId
+import whisk.core.database.AttachmentSupport.MemScheme
+import whisk.core.entity.Attachments.Attached
+import whisk.core.entity.{ByteSize, DocId, DocInfo, UUID}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object AttachmentSupport {
+
+  /**
+   * Scheme name for attachments which are inlined
+   */
+  val MemScheme: String = "mem"
+}
+
+case class InliningConfig(maxInlineSize: ByteSize)
+
+/**
+ * Provides support for inlining small attachments. Inlined attachment contents are encoded as part of attachment
+ * name itself.
+ */
+trait AttachmentSupport[DocumentAbstraction <: DocumentSerializer] extends DefaultJsonProtocol {
+
+  /** Materializer required for stream processing */
+  protected[core] implicit val materializer: Materializer
+
+  protected def executionContext: ExecutionContext
+
+  /**
+   * Attachment scheme name to use for non inlined attachments
+   */
+  protected def attachmentScheme: String
+
+  protected def inliningConfig: InliningConfig
+
+  /**
+   * Attachments smaller than this size are inlined
+   */
+  def maxInlineSize: ByteSize = inliningConfig.maxInlineSize
+
+  /**
+   * See {{ ArtifactStore#put }}
+   */
+  protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo]
+
+  /**
+   * Given a ByteString source, determines whether the source can be inlined by returning an Either:
+   * Left(byteString) containing all the bytes when the total size is below `maxInlineSize`, otherwise
+   * Right(Source[ByteString, _]) which replays the complete content, including the bytes already read
+   */
+  protected[database] def inlineOrAttach(
+    docStream: Source[ByteString, _],
+    previousPrefix: ByteString = ByteString.empty): Future[Either[ByteString, Source[ByteString, _]]] = {
+    implicit val ec = executionContext
+    docStream.prefixAndTail(1).runWith(Sink.head).flatMap {
+      case (Nil, _) =>
+        Future.successful(Left(previousPrefix))
+      case (Seq(prefix), tail) =>
+        val completePrefix = previousPrefix ++ prefix
+        if (completePrefix.size < maxInlineSize.toBytes) {
+          inlineOrAttach(tail, completePrefix)
+        } else {
+          Future.successful(Right(tail.prepend(Source.single(completePrefix))))
+        }
+    }
+  }
+
+  /**
+   * Constructs a URI for the attachment
+   *
+   * @param bytesOrSource either byteString or byteString source
+   * @param path function to generate the attachment name for non inlined case
+   * @return constructed uri. In case of inlined attachment the uri contains base64 encoded inlined attachment content
+   */
+  protected[database] def uriOf(bytesOrSource: Either[ByteString, Source[ByteString, _]], path: => String): Uri = {
+    bytesOrSource match {
+      case Left(bytes) => Uri.from(scheme = MemScheme, path = encode(bytes))
+      case Right(_)    => Uri.from(scheme = attachmentScheme, path = path)
+    }
+  }
+
+  /**
+   * Constructs a source from inlined attachment contents
+   */
+  protected[database] def memorySource(uri: Uri): Source[ByteString, NotUsed] = {
+    require(uri.scheme == MemScheme, s"URI $uri scheme is not $MemScheme")
+    Source.single(ByteString(decode(uri)))
+  }
+
+  protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == MemScheme
+
+  /**
+   * Computes digest for passed bytes as hex encoded string
+   */
+  protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
+    val digester = StoreUtils.emptyDigest()
+    digester.update(bytes.toArray)
+    StoreUtils.encodeDigest(digester.digest())
+  }
+
+  /**
+   * Attaches the passed source content to an {{ AttachmentStore }}, unless it is small enough to be inlined
+   *
+   * @param doc document with attachment
+   * @param update function to update the `Attached` state with attachment metadata
+   * @param contentType contentType of the attachment
+   * @param docStream attachment source
+   * @param oldAttachment old attachment in case of update. Required for deleting the old attachment
+   * @param attachmentStore attachmentStore where attachment needs to be stored
+   *
+   * @return a tuple of updated document info and attachment metadata
+   */
+  protected[database] def attachToExternalStore[A <: DocumentAbstraction](
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached],
+    attachmentStore: AttachmentStore)(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
+
+    val asJson = doc.toDocumentRecord
+    val id = asJson.fields("_id").convertTo[String].trim
+
+    implicit val ec = executionContext
+
+    for {
+      bytesOrSource <- inlineOrAttach(docStream)
+      uri = uriOf(bytesOrSource, UUID().asString)
+      attached <- {
+        // Upload if cannot be inlined
+        bytesOrSource match {
+          case Left(bytes) =>
+            Future.successful(Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes))))
+          case Right(source) =>
+            attachmentStore
+              .attach(DocId(id), uri.path.toString, contentType, source)
+              .map(r => Attached(uri.toString, contentType, Some(r.length), Some(r.digest)))
+        }
+      }
+      i1 <- put(update(doc, attached))
+
+      //Remove old attachment if it was part of attachmentStore
+      _ <- oldAttachment
+        .map { old =>
+          val oldUri = Uri(old.attachmentName)
+          if (oldUri.scheme == attachmentStore.scheme) {
+            attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString)
+          } else {
+            Future.successful(true)
+          }
+        }
+        .getOrElse(Future.successful(true))
+    } yield (i1, attached)
+  }
+
+  private def encode(bytes: Seq[Byte]): String = {
+    Base64.getUrlEncoder.encodeToString(bytes.toArray)
+  }
+
+  private def decode(uri: Uri): Array[Byte] = {
+    Base64.getUrlDecoder.decode(uri.path.toString())
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 853adea9c6..fba0def425 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -27,7 +27,7 @@ import spray.json._
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.database.StoreUtils._
 import whisk.core.entity.Attachments.Attached
-import whisk.core.entity.{BulkEntityResult, DocId, DocInfo, DocumentReader, UUID}
+import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
 import whisk.http.Messages
 
 import scala.concurrent.duration._
@@ -61,7 +61,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
   docReader: DocumentReader)
     extends ArtifactStore[DocumentAbstraction]
     with DefaultJsonProtocol
-    with AttachmentInliner {
+    with AttachmentSupport[DocumentAbstraction] {
 
   protected[core] implicit val executionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
 
@@ -360,8 +360,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
 
     attachmentStore match {
-      case Some(_) =>
-        attachToExternalStore(doc, update, contentType, docStream, oldAttachment)
+      case Some(as) =>
+        attachToExternalStore(doc, update, contentType, docStream, oldAttachment, as)
       case None =>
         attachToCouch(doc, update, contentType, docStream)
     }
@@ -371,7 +371,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     doc: A,
     update: (A, Attached) => A,
     contentType: ContentType,
-    docStream: Source[ByteString, _])(implicit transid: TransactionId) = {
+    docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
 
     if (maxInlineSize.toBytes == 0) {
       val uri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
@@ -382,67 +382,24 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
       } yield (i2, attached)
     } else {
       for {
-        (bytes, tailSource) <- inlineAndTail(docStream)
-        uri <- Future.successful(uriOf(bytes, UUID().asString))
+        bytesOrSource <- inlineOrAttach(docStream)
+        uri = uriOf(bytesOrSource, UUID().asString)
         attached <- {
-          val a = if (isInlined(uri)) {
-            Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
-          } else {
-            Attached(uri.toString, contentType)
+          val a = bytesOrSource match {
+            case Left(bytes) => Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
+            case Right(_)    => Attached(uri.toString, contentType)
           }
           Future.successful(a)
         }
         i1 <- put(update(doc, attached))
-        i2 <- if (isInlined(uri)) {
-          Future.successful(i1)
-        } else {
-          attach(i1, uri.path.toString, attached.attachmentType, combinedSource(bytes, tailSource))
+        i2 <- bytesOrSource match {
+          case Left(_)  => Future.successful(i1)
+          case Right(s) => attach(i1, uri.path.toString, attached.attachmentType, s)
         }
       } yield (i2, attached)
     }
   }
 
-  private def attachToExternalStore[A <: DocumentAbstraction](
-    doc: A,
-    update: (A, Attached) => A,
-    contentType: ContentType,
-    docStream: Source[ByteString, _],
-    oldAttachment: Option[Attached])(implicit transid: TransactionId) = {
-    val as = attachmentStore.get
-    val asJson = doc.toDocumentRecord
-    val id = asJson.fields("_id").convertTo[String].trim
-
-    for {
-      (bytes, tailSource) <- inlineAndTail(docStream)
-      uri <- Future.successful(uriOf(bytes, UUID().asString))
-      attached <- {
-        // Upload if cannot be inlined
-        if (isInlined(uri)) {
-          val a = Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
-          Future.successful(a)
-        } else {
-          as.attach(DocId(id), uri.path.toString, contentType, combinedSource(bytes, tailSource))
-            .map { r =>
-              Attached(uri.toString, contentType, Some(r.length), Some(r.digest))
-            }
-        }
-      }
-      i1 <- put(update(doc, attached))
-
-      //Remove old attachment if it was part of attachmentStore
-      _ <- oldAttachment
-        .map { old =>
-          val oldUri = Uri(old.attachmentName)
-          if (oldUri.scheme == as.scheme) {
-            as.deleteAttachment(DocId(id), oldUri.path.toString)
-          } else {
-            Future.successful(true)
-          }
-        }
-        .getOrElse(Future.successful(true))
-    } yield (i1, attached)
-  }
-
   private def attach(doc: DocInfo, name: String, contentType: ContentType, docStream: Source[ByteString, _])(
     implicit transid: TransactionId): Future[DocInfo] = {
 
@@ -492,7 +449,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     val name = attached.attachmentName
     val attachmentUri = Uri(name)
     attachmentUri.scheme match {
-      case AttachmentInliner.MemScheme =>
+      case AttachmentSupport.MemScheme =>
         memorySource(attachmentUri).runWith(sink)
       case s if s == couchScheme || attachmentUri.isRelative =>
         //relative case is for compatibility with earlier naming approach where attachment name would be like 'jarfile'
diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
index 140b854810..f01064c181 100644
--- a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
@@ -94,7 +94,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     extends ArtifactStore[DocumentAbstraction]
     with DefaultJsonProtocol
     with DocumentProvider
-    with AttachmentInliner {
+    with AttachmentSupport[DocumentAbstraction] {
 
   override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
 
@@ -288,38 +288,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     contentType: ContentType,
     docStream: Source[ByteString, _],
     oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
-
-    val asJson = d.toDocumentRecord
-    val id = asJson.fields(_id).convertTo[String].trim
-    //Inlined attachment with Memory storage is not required. However to validate the constructs
-    //inlined support is implemented
-    for {
-      (bytes, tailSource) <- inlineAndTail(docStream)
-      uri <- Future.successful(uriOf(bytes, UUID().asString))
-      attached <- {
-        if (isInlined(uri)) {
-          val a = Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
-          Future.successful(a)
-        } else {
-          attachmentStore
-            .attach(DocId(id), uri.path.toString, contentType, combinedSource(bytes, tailSource))
-            .map { r =>
-              Attached(uri.toString, contentType, Some(r.length), Some(r.digest))
-            }
-        }
-      }
-      i1 <- put(update(d, attached))
-      _ <- oldAttachment
-        .map { old =>
-          val oldUri = Uri(old.attachmentName)
-          if (oldUri.scheme == attachmentStore.scheme) {
-            attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString)
-          } else {
-            Future.successful(true)
-          }
-        }
-        .getOrElse(Future.successful(true))
-    } yield (i1, attached)
+    attachToExternalStore(d, update, contentType, docStream, oldAttachment, attachmentStore)
   }
 
   override def shutdown(): Unit = {
diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
index face871081..cb80c24cef 100644
--- a/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
@@ -62,7 +62,8 @@ class MemoryAttachmentStore(dbName: String)(implicit system: ActorSystem,
     contentType: ContentType,
     docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
     require(name != null, "name undefined")
-    val start = transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document '$docId'")
+    val start =
+      transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document 'id: $docId'")
 
     val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
 
@@ -78,7 +79,8 @@ class MemoryAttachmentStore(dbName: String)(implicit system: ActorSystem,
     reportFailure(
       g,
       start,
-      failure => s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
+      failure =>
+        s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
   }
 
   /**
@@ -88,7 +90,10 @@ class MemoryAttachmentStore(dbName: String)(implicit system: ActorSystem,
     implicit transid: TransactionId): Future[T] = {
 
     val start =
-      transid.started(this, DATABASE_ATT_GET, s"[ATT_GET] '$dbName' finding attachment '$name' of document '$docId'")
+      transid.started(
+        this,
+        DATABASE_ATT_GET,
+        s"[ATT_GET] '$dbName' finding attachment '$name' of document 'id: $docId'")
 
     val f = attachments.get(attachmentKey(docId, name)) match {
       case Some(Attachment(bytes)) =>
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentSupportTests.scala
similarity index 68%
rename from tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
rename to tests/src/test/scala/whisk/core/database/test/AttachmentSupportTests.scala
index 783d57e348..e93475d19f 100644
--- a/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentSupportTests.scala
@@ -20,40 +20,38 @@ package whisk.core.database.test
 import akka.http.scaladsl.model.Uri
 import akka.stream.scaladsl.Source
 import akka.stream.{ActorMaterializer, Materializer}
-import akka.util.{ByteStringBuilder, CompactByteString}
+import akka.util.CompactByteString
 import common.WskActorSystem
 import org.junit.runner.RunWith
-import whisk.core.entity.size._
-import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
-import whisk.core.database.{AttachmentInliner, InliningConfig}
+import org.scalatest.{FlatSpec, Matchers}
+import whisk.common.TransactionId
+import whisk.core.database.{AttachmentSupport, InliningConfig}
+import whisk.core.entity.WhiskEntity
+import whisk.core.entity.size._
 
 @RunWith(classOf[JUnitRunner])
-class AttachmentInlinerTests extends FlatSpec with Matchers with ScalaFutures with WskActorSystem {
+class AttachmentSupportTests extends FlatSpec with Matchers with ScalaFutures with WskActorSystem {
 
   behavior of "Attachment inlining"
 
   implicit val materializer: Materializer = ActorMaterializer()
 
   it should "not inline if maxInlineSize set to zero" in {
-    val inliner = new TestInliner(InliningConfig(maxInlineSize = 0.KB, chunkSize = 8.KB))
+    val inliner = new AttachmentSupportTestMock(InliningConfig(maxInlineSize = 0.KB))
     val bs = CompactByteString("hello world")
 
-    val (head, tail) = inliner.inlineAndTail(Source.single(bs)).futureValue
-    val uri = inliner.uriOf(head, "foo")
+    val bytesOrSource = inliner.inlineOrAttach(Source.single(bs)).futureValue
+    val uri = inliner.uriOf(bytesOrSource, "foo")
 
     uri shouldBe Uri("test:foo")
-
-    val bsResult = toByteString(inliner.combinedSource(head, tail)).futureValue
-    bsResult shouldBe bs
   }
 
-  private def toByteString(docStream: Source[Traversable[Byte], _]) =
-    docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact)
-
-  class TestInliner(val inliningConfig: InliningConfig) extends AttachmentInliner {
+  class AttachmentSupportTestMock(val inliningConfig: InliningConfig) extends AttachmentSupport[WhiskEntity] {
     override protected[core] implicit val materializer: Materializer = ActorMaterializer()
     override protected def attachmentScheme: String = "test"
+    override protected def executionContext = actorSystem.dispatcher
+    override protected[database] def put(d: WhiskEntity)(implicit transid: TransactionId) = ???
   }
 }
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 47c884a3a1..78910fcdef 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -296,7 +296,7 @@ trait DbUtils extends Assertions {
    */
   def inlinedAttachmentSize(db: ArtifactStore[_]): Int = {
     db match {
-      case inliner: AttachmentInliner =>
+      case inliner: AttachmentSupport[_] =>
         inliner.maxInlineSize.toBytes.toInt - 1
       case _ =>
         throw new IllegalStateException(s"ArtifactStore does not support attachment inlining $db")
@@ -308,10 +308,8 @@ trait DbUtils extends Assertions {
    */
   def nonInlinedAttachmentSize(db: ArtifactStore[_]): Int = {
     db match {
-      case inliner: AttachmentInliner =>
-        val inlineSize = inliner.maxInlineSize.toBytes.toInt
-        val chunkSize = inliner.chunkSize.toBytes.toInt
-        Math.max(inlineSize, chunkSize) * 2
+      case inliner: AttachmentSupport[_] =>
+        inliner.maxInlineSize.toBytes.toInt * 2
       case _ =>
         42
     }
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
index a32eb282dc..82b6b80d33 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -25,7 +25,8 @@ import akka.stream.IOResult
 import akka.stream.scaladsl.{Sink, StreamConverters}
 import akka.util.{ByteString, ByteStringBuilder}
 import whisk.common.TransactionId
-import whisk.core.database.{AttachmentInliner, CacheChangeNotification, NoDocumentException}
+import whisk.core.entity.size._
+import whisk.core.database.{AttachmentSupport, CacheChangeNotification, NoDocumentException}
 import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
 import whisk.core.entity.test.ExecHelpers
 import whisk.core.entity.{CodeExec, DocInfo, EntityName, ExecManifest, WhiskAction}
@@ -112,14 +113,14 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
     getAttachmentBytes(i2, attached(action2)).futureValue.result() shouldBe decode(code1)
   }
 
-  it should "put and read same attachment" in {
+  it should "put and read 5 MB attachment" in {
     implicit val tid: TransactionId = transid()
-    val size = nonInlinedAttachmentSize(entityStore)
+    val size = Math.max(nonInlinedAttachmentSize(entityStore), 5.MB.toBytes.toInt)
     val base64 = encodedRandomBytes(size)
 
     val exec = javaDefault(base64, Some("hello"))
     val javaAction =
-      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+      WhiskAction(namespace, EntityName("attachment_large"), exec)
 
     val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
     val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
@@ -161,7 +162,7 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
     val a = attached(action2)
 
     val attachmentUri = Uri(a.attachmentName)
-    attachmentUri.scheme shouldBe AttachmentInliner.MemScheme
+    attachmentUri.scheme shouldBe AttachmentSupport.MemScheme
     a.length shouldBe Some(attachmentSize)
     a.digest should not be empty
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services