You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/07/27 18:24:03 UTC

[incubator-openwhisk] branch master updated: S3AttachmentStore (#3779)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fea23b  S3AttachmentStore (#3779)
4fea23b is described below

commit 4fea23bc06831d66a54f61a8353926cfebd3cb3c
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Fri Jul 27 23:53:59 2018 +0530

    S3AttachmentStore (#3779)
    
    This PR introduces a S3AttachmentStore which is an AttachmentStore implementation for storing attachments in S3 API compatible object storages.
---
 common/scala/build.gradle                          |   7 +
 common/scala/src/main/resources/reference.conf     |   2 +
 common/scala/src/main/resources/s3-reference.conf  |  78 +++++++++
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   2 +
 .../whisk/core/database/AttachmentSupport.scala    |   5 +-
 .../whisk/core/database/CouchDbRestStore.scala     |   4 +-
 .../whisk/core/database/s3/S3AttachmentStore.scala | 192 +++++++++++++++++++++
 tests/build.gradle                                 |   2 +
 .../scala/actionContainers/ActionContainer.scala   |   2 +-
 ...scala => MemoryArtifactStoreBehaviorBase.scala} |  35 ++--
 .../database/memory/MemoryArtifactStoreTests.scala |  30 +---
 .../database/s3/S3AttachmentStoreAwsTests.scala    |  29 ++++
 .../s3/S3AttachmentStoreBehaviorBase.scala         |  50 ++++++
 .../database/s3/S3AttachmentStoreMinioTests.scala  |  31 ++++
 .../test/scala/whisk/core/database/s3/S3Aws.scala  |  73 ++++++++
 .../scala/whisk/core/database/s3/S3Minio.scala     | 111 ++++++++++++
 16 files changed, 606 insertions(+), 47 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 8cb91bf..5edea10 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -76,6 +76,13 @@ dependencies {
     compile 'io.reactivex:rxscala_2.11:0.26.5'
     compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
     compile 'com.microsoft.azure:azure-cosmosdb:2.0.0'
+
+    compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.11:0.19') {
+        exclude group: 'commons-logging'
+        exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
+        exclude group: 'com.fasterxml.jackson.core'
+        exclude group: 'com.fasterxml.jackson.dataformat'
+    }
     scoverage gradle.scoverage.deps
 }
 
diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 4c958df..b671f02 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -1,6 +1,8 @@
 # Licensed to the Apache Software Foundation (ASF) under one or more contributor
 # license agreements; and to You under the Apache License, Version 2.0.
 
+include "s3-reference.conf"
+
 whisk.spi {
   ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
   ActivationStoreProvider = whisk.core.database.ArtifactActivationStoreProvider
diff --git a/common/scala/src/main/resources/s3-reference.conf b/common/scala/src/main/resources/s3-reference.conf
new file mode 100644
index 0000000..92bc23e
--- /dev/null
+++ b/common/scala/src/main/resources/s3-reference.conf
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+
+whisk {
+  s3 {
+    # See https://developer.lightbend.com/docs/alpakka/current/s3.html#usage
+    alpakka {
+      # whether to buffer request chunks (up to 5MB each) in "memory" or on "disk"
+      buffer = "memory"
+
+      # location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path.
+      disk-buffer-path = ""
+
+      proxy {
+        # hostname of the proxy. If undefined ("") proxy is not enabled.
+        host = ""
+        port = 8000
+
+        # if "secure" is set to "true" then HTTPS will be used for all requests to S3, otherwise HTTP will be used
+        secure = true
+      }
+
+      # default values for AWS configuration. If credentials and/or region are not specified when creating S3Client,
+      # these values will be used.
+      aws {
+        # If this section is absent, the fallback behavior is to use the
+        # com.amazonaws.auth.DefaultAWSCredentialsProviderChain instance to resolve credentials
+        credentials {
+          # supported providers:
+          # anon - anonymous requests ("no auth")
+          # static - static credentials,
+          #   required params:
+          #     access-key-id
+          #     secret-access-key
+          #   optional:
+          #     token
+          # default: as described in com.amazonaws.auth.DefaultAWSCredentialsProviderChain docs,
+          # attempts to get the credentials from either:
+          #   - environment variables
+          #   - system properties
+          #   - credentials file
+          #   - EC2 credentials service
+          #   - IAM / metadata
+          provider = default
+        }
+
+        # If this section is absent, the fallback behavior is to use the
+        # com.amazonaws.regions.AwsRegionProvider.DefaultAwsRegionProviderChain instance to resolve region
+        region {
+          # supported providers:
+          # static - static region,
+          #   required params:
+          #     default-region
+          # default: as described in com.amazonaws.regions.AwsRegionProvider.DefaultAwsRegionProviderChain docs,
+          # attempts to get the region from either:
+          #   - environment variables
+          #   - system properties
+          #   - profile file
+          #   - EC2 metadata
+          provider = default
+        }
+      }
+
+      # Enable path style access to s3, i.e. "https://s3-eu-west-1.amazonaws.com/my.bucket/myobject"
+      # Default is virtual-hosted style.
+      # When using virtual hosted–style buckets with SSL, the S3 wild card certificate only matches buckets that do not contain periods.
+      # Buckets containing periods will lead to certificate errors. In those cases it's useful to enable path-style access.
+      path-style-access = true
+
+      # Custom endpoint url, used for alternate s3 implementations
+      # endpoint-url = null
+
+      # Which version of the list bucket api to use. Set to 1 to use the old style version 1 API.
+      # By default the newer version 2 api is used.
+      list-bucket-api-version = 2
+    }
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7396c02..4646dc2 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -243,4 +243,6 @@ object ConfigKeys {
   val containerProxy = "whisk.container-proxy"
   val containerProxyTimeouts = s"$containerProxy.timeouts"
 
+  val s3 = "whisk.s3"
+
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
index 72f7754..600e8b6 100644
--- a/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
@@ -102,10 +102,13 @@ trait AttachmentSupport[DocumentAbstraction <: DocumentSerializer] extends Defau
   protected[database] def uriOf(bytesOrSource: Either[ByteString, Source[ByteString, _]], path: => String): Uri = {
     bytesOrSource match {
       case Left(bytes) => Uri.from(scheme = MemScheme, path = encode(bytes))
-      case Right(_)    => Uri.from(scheme = attachmentScheme, path = path)
+      case Right(_)    => uriFrom(scheme = attachmentScheme, path = path)
     }
   }
 
+  //Not using Uri.from due to https://github.com/akka/akka-http/issues/2080
+  protected[database] def uriFrom(scheme: String, path: String): Uri = Uri(s"$scheme:$path")
+
   /**
    * Constructs a source from inlined attachment contents
    */
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index aa35358..8518a95 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -375,7 +375,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
 
     if (maxInlineSize.toBytes == 0) {
-      val uri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
+      val uri = uriFrom(scheme = attachmentScheme, path = UUID().asString)
       for {
         attached <- Future.successful(Attached(uri.toString, contentType))
         i1 <- put(update(doc, attached))
@@ -550,7 +550,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
    */
   private def getAttachmentName(name: String): String = {
     Try(java.util.UUID.fromString(name))
-      .map(_ => Uri.from(scheme = attachmentScheme, path = name).toString)
+      .map(_ => uriFrom(scheme = attachmentScheme, path = name).toString)
       .getOrElse(name)
   }
 
diff --git a/common/scala/src/main/scala/whisk/core/database/s3/S3AttachmentStore.scala b/common/scala/src/main/scala/whisk/core/database/s3/S3AttachmentStore.scala
new file mode 100644
index 0000000..fee6c5f
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/s3/S3AttachmentStore.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.ContentType
+import akka.stream.ActorMaterializer
+import akka.stream.alpakka.s3.scaladsl.S3Client
+import akka.stream.alpakka.s3.{S3Exception, S3Settings}
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+import com.typesafe.config.Config
+import pureconfig.loadConfigOrThrow
+import whisk.common.LoggingMarkers.{DATABASE_ATTS_DELETE, DATABASE_ATT_DELETE, DATABASE_ATT_GET, DATABASE_ATT_SAVE}
+import whisk.common.{Logging, TransactionId}
+import whisk.core.ConfigKeys
+import whisk.core.database.StoreUtils._
+import whisk.core.database._
+import whisk.core.entity.DocId
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.ClassTag
+
+object S3AttachmentStoreProvider extends AttachmentStoreProvider {
+  val alpakkaConfigKey = s"${ConfigKeys.s3}.alpakka"
+  case class S3Config(bucket: String) {
+    def prefixFor[D](implicit tag: ClassTag[D]): String = {
+      tag.runtimeClass.getSimpleName.toLowerCase
+    }
+  }
+
+  override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                              logging: Logging,
+                                                              materializer: ActorMaterializer): AttachmentStore = {
+    val client = new S3Client(S3Settings(alpakkaConfigKey))
+    val config = loadConfigOrThrow[S3Config](ConfigKeys.s3)
+    new S3AttachmentStore(client, config.bucket, config.prefixFor[D])
+  }
+
+  def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
+                                                                   logging: Logging,
+                                                                   materializer: ActorMaterializer): AttachmentStore = {
+    val client = new S3Client(S3Settings(config, alpakkaConfigKey))
+    val s3config = loadConfigOrThrow[S3Config](config, ConfigKeys.s3)
+    new S3AttachmentStore(client, s3config.bucket, s3config.prefixFor[D])
+  }
+
+}
+class S3AttachmentStore(client: S3Client, bucket: String, prefix: String)(implicit system: ActorSystem,
+                                                                          logging: Logging,
+                                                                          materializer: ActorMaterializer)
+    extends AttachmentStore {
+  override val scheme = "s3"
+
+  override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
+
+  override protected[core] def attach(
+    docId: DocId,
+    name: String,
+    contentType: ContentType,
+    docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
+    require(name != null, "name undefined")
+    val start =
+      transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document 'id: $docId'")
+
+    //A possible optimization for small attachments < 5MB can be to use putObject instead of multipartUpload
+    //and thus use 1 remote call instead of 3
+    val f = docStream
+      .runWith(combinedSink(client.multipartUpload(bucket, objectKey(docId, name), contentType)))
+      .map(r => AttachResult(r.digest, r.length))
+
+    f.onSuccess({
+      case _ =>
+        transid
+          .finished(this, start, s"[ATT_PUT] '$prefix' completed uploading attachment '$name' of document 'id: $docId'")
+    })
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[ATT_PUT] '$prefix' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+    require(name != null, "name undefined")
+    val start =
+      transid.started(
+        this,
+        DATABASE_ATT_GET,
+        s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
+    val (source, _) = client.download(bucket, objectKey(docId, name))
+
+    val f = source.runWith(sink)
+
+    val g = f.transform(
+      { s =>
+        transid
+          .finished(this, start, s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
+        s
+      }, {
+        case s: Throwable if isMissingKeyException(s) =>
+          transid
+            .finished(
+              this,
+              start,
+              s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.")
+          NoDocumentException("Not found on 'readAttachment'.")
+        case e => e
+      })
+
+    reportFailure(
+      g,
+      start,
+      failure =>
+        s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
+    val start =
+      transid.started(this, DATABASE_ATTS_DELETE, s"[ATTS_DELETE] deleting attachments of document 'id: $docId'")
+
+    //Lists every object under this document's key prefix and deletes them one by one: S3 has a
+    //multi-object delete API but the alpakka client does not support it yet. As 1 doc currently has
+    //at most 1 attachment, this costs 2 remote calls either way
+    val f = client
+      .listBucket(bucket, Some(objectKeyPrefix(docId)))
+      .mapAsync(1)(bc => client.deleteObject(bc.bucketName, bc.key))
+      .runWith(Sink.seq)
+      .map(_ => true)
+
+    f.onSuccess {
+      case _ =>
+        transid.finished(this, start, s"[ATTS_DELETE] completed: deleting attachments of document 'id: $docId'")
+    }
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[ATTS_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def deleteAttachment(docId: DocId, name: String)(
+    implicit transid: TransactionId): Future[Boolean] = {
+    val start =
+      transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")
+
+    val f = client
+      .deleteObject(bucket, objectKey(docId, name))
+      .map(_ => true)
+
+    f.onSuccess {
+      case _ =>
+        transid.finished(this, start, s"[ATT_DELETE] completed: deleting attachment '$name' of document 'id: $docId'")
+    }
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[ATT_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
+  }
+
+  override def shutdown(): Unit = {}
+
+  private def objectKey(id: DocId, name: String): String = s"$prefix/${id.id}/$name"
+
+  private def objectKeyPrefix(id: DocId): String = s"$prefix/${id.id}/" //ends with '/' so id 'a/b' never matches 'a/bc'
+
+  private def isMissingKeyException(e: Throwable): Boolean = {
+    //In some case S3Exception is a sub cause. So need to recurse
+    e match {
+      case s: S3Exception if s.code == "NoSuchKey"             => true
+      case t if t != null && isMissingKeyException(t.getCause) => true
+      case _                                                   => false
+    }
+  }
+}
diff --git a/tests/build.gradle b/tests/build.gradle
index b90c99d..05561e7 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -148,6 +148,8 @@ dependencies {
     compile 'io.opentracing:opentracing-mock:0.31.0'
     compile "org.apache.curator:curator-test:${gradle.curator.version}"
 
+    compile "com.amazonaws:aws-java-sdk-s3:1.11.295"
+
     compile project(':common:scala')
     compile project(':core:controller')
     compile project(':core:invoker')
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 69f45ec..617c216 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -91,7 +91,7 @@ object ActionContainer {
     }.get // This fails if the docker binary couldn't be located.
   }
 
-  private lazy val dockerCmd: String = {
+  lazy val dockerCmd: String = {
     /*
      * The docker host is set to a provided property 'docker.host' if it's
      * available; otherwise we check with WhiskProperties to see whether we are
diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala
similarity index 58%
copy from tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
copy to tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala
index c47daaa..72b2f8e 100644
--- a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala
@@ -17,26 +17,30 @@
 
 package whisk.core.database.memory
 
-import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
-import org.scalatest.junit.JUnitRunner
-import whisk.core.database.ArtifactStore
-import whisk.core.database.test.behavior.ArtifactStoreBehavior
-import whisk.core.entity._
+import whisk.core.database.{ArtifactStore, AttachmentStore, DocumentSerializer}
+import whisk.core.database.test.behavior.ArtifactStoreBehaviorBase
+import whisk.core.entity.{
+  DocumentReader,
+  WhiskActivation,
+  WhiskAuth,
+  WhiskDocumentReader,
+  WhiskEntity,
+  WhiskEntityJsonFormat
+}
 
-import scala.reflect.classTag
+import scala.reflect.{classTag, ClassTag}
 
-@RunWith(classOf[JUnitRunner])
-class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
+trait MemoryArtifactStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase {
   override def storeType = "Memory"
 
-  override val authStore = {
+  override lazy val authStore = {
     implicit val docReader: DocumentReader = WhiskDocumentReader
-    MemoryArtifactStoreProvider.makeStore[WhiskAuth]()
+    MemoryArtifactStoreProvider.makeArtifactStore[WhiskAuth](getAttachmentStore[WhiskAuth]())
   }
 
-  override val entityStore =
-    MemoryArtifactStoreProvider.makeStore[WhiskEntity]()(
+  override lazy val entityStore =
+    MemoryArtifactStoreProvider.makeArtifactStore[WhiskEntity](getAttachmentStore[WhiskEntity]())(
       classTag[WhiskEntity],
       WhiskEntityJsonFormat,
       WhiskDocumentReader,
@@ -44,11 +48,14 @@ class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
       logging,
       materializer)
 
-  override val activationStore = {
+  override lazy val activationStore = {
     implicit val docReader: DocumentReader = WhiskDocumentReader
-    MemoryArtifactStoreProvider.makeStore[WhiskActivation]()
+    MemoryArtifactStoreProvider.makeArtifactStore[WhiskActivation](getAttachmentStore[WhiskActivation]())
   }
 
   override protected def getAttachmentStore(store: ArtifactStore[_]) =
     Some(store.asInstanceOf[MemoryArtifactStore[_]].attachmentStore)
+
+  protected def getAttachmentStore[D <: DocumentSerializer: ClassTag](): AttachmentStore =
+    MemoryAttachmentStoreProvider.makeStore()
 }
diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
index c47daaa..e9860a0 100644
--- a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
@@ -20,35 +20,7 @@ package whisk.core.database.memory
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
-import whisk.core.database.ArtifactStore
 import whisk.core.database.test.behavior.ArtifactStoreBehavior
-import whisk.core.entity._
-
-import scala.reflect.classTag
 
 @RunWith(classOf[JUnitRunner])
-class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
-  override def storeType = "Memory"
-
-  override val authStore = {
-    implicit val docReader: DocumentReader = WhiskDocumentReader
-    MemoryArtifactStoreProvider.makeStore[WhiskAuth]()
-  }
-
-  override val entityStore =
-    MemoryArtifactStoreProvider.makeStore[WhiskEntity]()(
-      classTag[WhiskEntity],
-      WhiskEntityJsonFormat,
-      WhiskDocumentReader,
-      actorSystem,
-      logging,
-      materializer)
-
-  override val activationStore = {
-    implicit val docReader: DocumentReader = WhiskDocumentReader
-    MemoryArtifactStoreProvider.makeStore[WhiskActivation]()
-  }
-
-  override protected def getAttachmentStore(store: ArtifactStore[_]) =
-    Some(store.asInstanceOf[MemoryArtifactStore[_]].attachmentStore)
-}
+class MemoryArtifactStoreTests extends FlatSpec with MemoryArtifactStoreBehaviorBase with ArtifactStoreBehavior
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreAwsTests.scala b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreAwsTests.scala
new file mode 100644
index 0000000..2e99f58
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreAwsTests.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import whisk.core.entity.WhiskEntity
+
+@RunWith(classOf[JUnitRunner])
+class S3AttachmentStoreAwsTests extends S3AttachmentStoreBehaviorBase with S3Aws {
+  override lazy val store = makeS3Store[WhiskEntity]
+
+  override def storeType: String = "S3"
+}
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala
new file mode 100644
index 0000000..48def39
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import org.scalatest.FlatSpec
+import whisk.common.Logging
+import whisk.core.database.{AttachmentStore, DocumentSerializer}
+import whisk.core.database.memory.MemoryArtifactStoreBehaviorBase
+import whisk.core.database.test.AttachmentStoreBehaviors
+import whisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors
+import whisk.core.entity.WhiskEntity
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+trait S3AttachmentStoreBehaviorBase
+    extends FlatSpec
+    with MemoryArtifactStoreBehaviorBase
+    with ArtifactStoreAttachmentBehaviors
+    with AttachmentStoreBehaviors {
+  override lazy val store = makeS3Store[WhiskEntity]
+
+  override implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  override val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}"
+
+  override def getAttachmentStore[D <: DocumentSerializer: ClassTag](): AttachmentStore =
+    makeS3Store[D]()
+
+  def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                       logging: Logging,
+                                                       materializer: ActorMaterializer): AttachmentStore
+}
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreMinioTests.scala b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreMinioTests.scala
new file mode 100644
index 0000000..07cf4b4
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreMinioTests.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import whisk.core.entity.WhiskEntity
+
+@RunWith(classOf[JUnitRunner])
+class S3AttachmentStoreMinioTests extends S3AttachmentStoreBehaviorBase with S3Minio {
+  override lazy val store = makeS3Store[WhiskEntity]
+
+  override def storeType: String = "S3Minio"
+
+  override def garbageCollectAttachments: Boolean = false
+}
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3Aws.scala b/tests/src/test/scala/whisk/core/database/s3/S3Aws.scala
new file mode 100644
index 0000000..59c1cbe
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3Aws.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import com.typesafe.config.ConfigFactory
+import org.scalatest.FlatSpec
+import whisk.common.Logging
+import whisk.core.database.{AttachmentStore, DocumentSerializer}
+
+import scala.reflect.ClassTag
+
+trait S3Aws extends FlatSpec {
+  def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                       logging: Logging,
+                                                       materializer: ActorMaterializer): AttachmentStore = {
+    val config = ConfigFactory.parseString(s"""
+       |whisk {
+       |   s3 {
+       |      alpakka {
+       |         aws {
+       |           credentials {
+       |             provider = static
+       |             access-key-id = "$accessKeyId"
+       |             secret-access-key = "$secretAccessKey"
+       |           }
+       |           region {
+       |             provider = static
+       |             default-region = "$region"
+       |           }
+       |         }
+       |      }
+       |      bucket = "$bucket"
+       |    }
+       |}
+      """.stripMargin).withFallback(ConfigFactory.load())
+    S3AttachmentStoreProvider.makeStore[D](config)
+  }
+
+  override protected def withFixture(test: NoArgTest) = {
+    assume(
+      secretAccessKey != null,
+      s"'AWS_SECRET_ACCESS_KEY' env not configured. Configure following " +
+        s"env variables for test to run. 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_REGION'")
+
+    require(accessKeyId != null, "'AWS_ACCESS_KEY_ID' env variable not set")
+    require(region != null, "'AWS_REGION' env variable not set")
+
+    super.withFixture(test)
+  }
+
+  val bucket = "test-ow-travis"
+
+  val accessKeyId = System.getenv("AWS_ACCESS_KEY_ID")
+  val secretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
+  val region = System.getenv("AWS_REGION")
+}
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3Minio.scala b/tests/src/test/scala/whisk/core/database/s3/S3Minio.scala
new file mode 100644
index 0000000..0c59e68
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3Minio.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import java.net.ServerSocket
+
+import actionContainers.ActionContainer
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
+import com.typesafe.config.ConfigFactory
+import common.{SimpleExec, StreamLogging}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+import whisk.common.{Logging, TransactionId}
+import whisk.core.database.{AttachmentStore, DocumentSerializer}
+
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+/**
+ * Test mixin which backs the S3 attachment store with a Minio server running in a local Docker container.
+ *
+ * `beforeAll` starts the container, publishing Minio's port 9000 on a free local port, and creates the
+ * test bucket. `afterAll` stops the container which was started by this suite (and only that one).
+ */
+trait S3Minio extends FlatSpec with BeforeAndAfterAll with StreamLogging {
+
+  /**
+   * Creates an attachment store which talks to the local Minio endpoint with the test credentials.
+   *
+   * @tparam D document type passed on to `S3AttachmentStoreProvider.makeStore`
+   * @return the store built from the S3 overrides below layered over `ConfigFactory.load()`
+   */
+  def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                       logging: Logging,
+                                                       materializer: ActorMaterializer): AttachmentStore = {
+    val config = ConfigFactory.parseString(s"""
+      |whisk {
+      |     s3 {
+      |      alpakka {
+      |         aws {
+      |           credentials {
+      |             provider = static
+      |             access-key-id = "$accessKey"
+      |             secret-access-key = "$secretAccessKey"
+      |           }
+      |           region {
+      |             provider = static
+      |             default-region = us-west-2
+      |           }
+      |         }
+      |         endpoint-url = "http://localhost:$port"
+      |      }
+      |      bucket = "$bucket"
+      |     }
+      |}
+      """.stripMargin).withFallback(ConfigFactory.load())
+    S3AttachmentStoreProvider.makeStore[D](config)
+  }
+
+  private val accessKey = "TESTKEY"
+  private val secretAccessKey = "TESTSECRET"
+  private val port = freePort()
+  private val bucket = "test-ow-travis"
+
+  // Id of the container started in beforeAll. It is recorded so that afterAll stops exactly that
+  // container instead of every container created from the minio/minio image on this host.
+  private var minioContainerId: Option[String] = None
+
+  /** Starts the Minio container, remembers its id and creates the test bucket. */
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    val runOutput = dockerExec(
+      s"run -d -e MINIO_ACCESS_KEY=$accessKey -e MINIO_SECRET_KEY=$secretAccessKey -p $port:9000 minio/minio server /data")
+    // `docker run -d` prints the id of the new container; take the last non empty line of the output
+    minioContainerId = runOutput.split("\n").map(_.trim).filter(_.nonEmpty).lastOption
+    println(s"Started minio on $port")
+    createTestBucket()
+  }
+
+  /**
+   * Stops the Minio container started by `beforeAll`, if any. The stop happens in a `finally` block
+   * so the container is not left running when `super.afterAll()` throws.
+   */
+  override def afterAll(): Unit = {
+    try super.afterAll()
+    finally {
+      minioContainerId.foreach { id =>
+        dockerExec(s"stop $id")
+        println(s"Stopped minio container")
+      }
+      minioContainerId = None
+    }
+  }
+
+  /**
+   * Creates the test bucket through the AWS SDK client pointed at the local endpoint.
+   * The call goes through `whisk.utils.retry`, presumably because the server may not accept
+   * requests immediately after the container has been started.
+   */
+  def createTestBucket(): Unit = {
+    val endpoint = new EndpointConfiguration(s"http://localhost:$port", "us-west-2")
+    val client = AmazonS3ClientBuilder.standard
+      .withPathStyleAccessEnabled(true)
+      .withEndpointConfiguration(endpoint)
+      .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretAccessKey)))
+      .build
+
+    whisk.utils.retry(client.createBucket(bucket), 6, Some(1.minute))
+    println(s"Created bucket $bucket")
+  }
+
+  /**
+   * Runs the docker command line with the given arguments and asserts a zero exit code.
+   *
+   * @param cmd docker arguments; they are split on spaces, so a single argument must not contain one
+   * @return the first element of the tuple returned by `SimpleExec.syncRunCmd` (the command output)
+   */
+  private def dockerExec(cmd: String): String = {
+    implicit val tid: TransactionId = TransactionId.testing
+    val command = s"${ActionContainer.dockerCmd} $cmd"
+    val cmdSeq = command.split(" ").map(_.trim).filter(_.nonEmpty)
+    val (out, err, code) = SimpleExec.syncRunCmd(cmdSeq)
+    assert(code == 0, s"Error occurred for command '$command'. Exit code: $code, Error: $err")
+    out
+  }
+
+  /**
+   * Finds a currently unused local port by binding a server socket to port 0 and releasing it again.
+   * The port is free when this method returns but it is not reserved for the caller.
+   */
+  private def freePort(): Int = {
+    val socket = new ServerSocket(0)
+    try socket.getLocalPort
+    finally socket.close()
+  }
+}