You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/07/27 18:24:03 UTC
[incubator-openwhisk] branch master updated: S3AttachmentStore
(#3779)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 4fea23b S3AttachmentStore (#3779)
4fea23b is described below
commit 4fea23bc06831d66a54f61a8353926cfebd3cb3c
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Fri Jul 27 23:53:59 2018 +0530
S3AttachmentStore (#3779)
This PR introduces an S3AttachmentStore, an AttachmentStore implementation for storing attachments in S3 API compatible object stores.
---
common/scala/build.gradle | 7 +
common/scala/src/main/resources/reference.conf | 2 +
common/scala/src/main/resources/s3-reference.conf | 78 +++++++++
.../src/main/scala/whisk/core/WhiskConfig.scala | 2 +
.../whisk/core/database/AttachmentSupport.scala | 5 +-
.../whisk/core/database/CouchDbRestStore.scala | 4 +-
.../whisk/core/database/s3/S3AttachmentStore.scala | 192 +++++++++++++++++++++
tests/build.gradle | 2 +
.../scala/actionContainers/ActionContainer.scala | 2 +-
...scala => MemoryArtifactStoreBehaviorBase.scala} | 35 ++--
.../database/memory/MemoryArtifactStoreTests.scala | 30 +---
.../database/s3/S3AttachmentStoreAwsTests.scala | 29 ++++
.../s3/S3AttachmentStoreBehaviorBase.scala | 50 ++++++
.../database/s3/S3AttachmentStoreMinioTests.scala | 31 ++++
.../test/scala/whisk/core/database/s3/S3Aws.scala | 73 ++++++++
.../scala/whisk/core/database/s3/S3Minio.scala | 111 ++++++++++++
16 files changed, 606 insertions(+), 47 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 8cb91bf..5edea10 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -76,6 +76,13 @@ dependencies {
compile 'io.reactivex:rxscala_2.11:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile 'com.microsoft.azure:azure-cosmosdb:2.0.0'
+
+ compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.11:0.19') {
+ exclude group: 'commons-logging'
+ exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
+ exclude group: 'com.fasterxml.jackson.core'
+ exclude group: 'com.fasterxml.jackson.dataformat'
+ }
scoverage gradle.scoverage.deps
}
diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 4c958df..b671f02 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -1,6 +1,8 @@
# Licensed to the Apache Software Foundation (ASF) under one or more contributor
# license agreements; and to You under the Apache License, Version 2.0.
+include "s3-reference.conf"
+
whisk.spi {
ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
ActivationStoreProvider = whisk.core.database.ArtifactActivationStoreProvider
diff --git a/common/scala/src/main/resources/s3-reference.conf b/common/scala/src/main/resources/s3-reference.conf
new file mode 100644
index 0000000..92bc23e
--- /dev/null
+++ b/common/scala/src/main/resources/s3-reference.conf
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+
+whisk {
+ s3 {
+ # See https://developer.lightbend.com/docs/alpakka/current/s3.html#usage
+ alpakka {
+ # whether to buffer request chunks (up to 5MB each) in "memory" or on "disk"
+ buffer = "memory"
+
+ # location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path.
+ disk-buffer-path = ""
+
+ proxy {
+ # hostname of the proxy. If undefined ("") proxy is not enabled.
+ host = ""
+ port = 8000
+
+ # if "secure" is set to "true" then HTTPS will be used for all requests to S3, otherwise HTTP will be used
+ secure = true
+ }
+
+ # default values for AWS configuration. If credentials and/or region are not specified when creating S3Client,
+ # these values will be used.
+ aws {
+ # If this section is absent, the fallback behavior is to use the
+ # com.amazonaws.auth.DefaultAWSCredentialsProviderChain instance to resolve credentials
+ credentials {
+ # supported providers:
+ # anon - anonymous requests ("no auth")
+ # static - static credentials,
+ # required params:
+ # access-key-id
+ # secret-access-key
+ # optional:
+ # token
+ # default: as described in com.amazonaws.auth.DefaultAWSCredentialsProviderChain docs,
+ # attempts to get the credentials from either:
+ # - environment variables
+ # - system properties
+ # - credentials file
+ # - EC2 credentials service
+ # - IAM / metadata
+ provider = default
+ }
+
+ # If this section is absent, the fallback behavior is to use the
+ # com.amazonaws.regions.AwsRegionProvider.DefaultAwsRegionProviderChain instance to resolve region
+ region {
+ # supported providers:
+ # static - static region,
+ # required params:
+ # default-region
+ # default: as described in com.amazonaws.regions.AwsRegionProvider.DefaultAwsRegionProviderChain docs,
+ # attempts to get the region from either:
+ # - environment variables
+ # - system properties
+ # - profile file
+ # - EC2 metadata
+ provider = default
+ }
+ }
+
+ # Enable path style access to s3, i.e. "https://s3-eu-west-1.amazonaws.com/my.bucket/myobject"
+ # Default is virtual-hosted style.
+ # When using virtual hosted–style buckets with SSL, the S3 wild card certificate only matches buckets that do not contain periods.
+ # Buckets containing periods will lead to certificate errors. In those cases it's useful to enable path-style access.
+ path-style-access = true
+
+ # Custom endpoint url, used for alternate s3 implementations
+ # endpoint-url = null
+
+ # Which version of the list bucket api to use. Set to 1 to use the old style version 1 API.
+ # By default the newer version 2 api is used.
+ list-bucket-api-version = 2
+ }
+ }
+}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7396c02..4646dc2 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -243,4 +243,6 @@ object ConfigKeys {
val containerProxy = "whisk.container-proxy"
val containerProxyTimeouts = s"$containerProxy.timeouts"
+ val s3 = "whisk.s3"
+
}
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
index 72f7754..600e8b6 100644
--- a/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
@@ -102,10 +102,13 @@ trait AttachmentSupport[DocumentAbstraction <: DocumentSerializer] extends Defau
protected[database] def uriOf(bytesOrSource: Either[ByteString, Source[ByteString, _]], path: => String): Uri = {
bytesOrSource match {
case Left(bytes) => Uri.from(scheme = MemScheme, path = encode(bytes))
- case Right(_) => Uri.from(scheme = attachmentScheme, path = path)
+ case Right(_) => uriFrom(scheme = attachmentScheme, path = path)
}
}
+ //Not using Uri.from due to https://github.com/akka/akka-http/issues/2080
+ protected[database] def uriFrom(scheme: String, path: String): Uri = Uri(s"$scheme:$path")
+
/**
* Constructs a source from inlined attachment contents
*/
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index aa35358..8518a95 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -375,7 +375,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
if (maxInlineSize.toBytes == 0) {
- val uri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
+ val uri = uriFrom(scheme = attachmentScheme, path = UUID().asString)
for {
attached <- Future.successful(Attached(uri.toString, contentType))
i1 <- put(update(doc, attached))
@@ -550,7 +550,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
*/
private def getAttachmentName(name: String): String = {
Try(java.util.UUID.fromString(name))
- .map(_ => Uri.from(scheme = attachmentScheme, path = name).toString)
+ .map(_ => uriFrom(scheme = attachmentScheme, path = name).toString)
.getOrElse(name)
}
diff --git a/common/scala/src/main/scala/whisk/core/database/s3/S3AttachmentStore.scala b/common/scala/src/main/scala/whisk/core/database/s3/S3AttachmentStore.scala
new file mode 100644
index 0000000..fee6c5f
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/s3/S3AttachmentStore.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.ContentType
+import akka.stream.ActorMaterializer
+import akka.stream.alpakka.s3.scaladsl.S3Client
+import akka.stream.alpakka.s3.{S3Exception, S3Settings}
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+import com.typesafe.config.Config
+import pureconfig.loadConfigOrThrow
+import whisk.common.LoggingMarkers.{DATABASE_ATTS_DELETE, DATABASE_ATT_DELETE, DATABASE_ATT_GET, DATABASE_ATT_SAVE}
+import whisk.common.{Logging, TransactionId}
+import whisk.core.ConfigKeys
+import whisk.core.database.StoreUtils._
+import whisk.core.database._
+import whisk.core.entity.DocId
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.ClassTag
+
+object S3AttachmentStoreProvider extends AttachmentStoreProvider {
+ val alpakkaConfigKey = s"${ConfigKeys.s3}.alpakka"
+ case class S3Config(bucket: String) {
+ def prefixFor[D](implicit tag: ClassTag[D]): String = {
+ tag.runtimeClass.getSimpleName.toLowerCase
+ }
+ }
+
+ override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): AttachmentStore = {
+ val client = new S3Client(S3Settings(alpakkaConfigKey))
+ val config = loadConfigOrThrow[S3Config](ConfigKeys.s3)
+ new S3AttachmentStore(client, config.bucket, config.prefixFor[D])
+ }
+
+ def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): AttachmentStore = {
+ val client = new S3Client(S3Settings(config, alpakkaConfigKey))
+ val s3config = loadConfigOrThrow[S3Config](config, ConfigKeys.s3)
+ new S3AttachmentStore(client, s3config.bucket, s3config.prefixFor[D])
+ }
+
+}
+class S3AttachmentStore(client: S3Client, bucket: String, prefix: String)(implicit system: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer)
+ extends AttachmentStore {
+ override val scheme = "s3"
+
+ override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
+
+ override protected[core] def attach(
+ docId: DocId,
+ name: String,
+ contentType: ContentType,
+ docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
+ require(name != null, "name undefined")
+ val start =
+ transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document 'id: $docId'")
+
+ //A possible optimization for small attachments < 5MB can be to use putObject instead of multipartUpload
+ //and thus use 1 remote call instead of 3
+ val f = docStream
+ .runWith(combinedSink(client.multipartUpload(bucket, objectKey(docId, name), contentType)))
+ .map(r => AttachResult(r.digest, r.length))
+
+ f.onSuccess({
+ case _ =>
+ transid
+ .finished(this, start, s"[ATT_PUT] '$prefix' completed uploading attachment '$name' of document 'id: $docId'")
+ })
+
+ reportFailure(
+ f,
+ start,
+ failure => s"[ATT_PUT] '$prefix' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
+ }
+
+ override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
+ implicit transid: TransactionId): Future[T] = {
+ require(name != null, "name undefined")
+ val start =
+ transid.started(
+ this,
+ DATABASE_ATT_GET,
+ s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
+ val (source, _) = client.download(bucket, objectKey(docId, name))
+
+ val f = source.runWith(sink)
+
+ val g = f.transform(
+ { s =>
+ transid
+ .finished(this, start, s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
+ s
+ }, {
+ case s: Throwable if isMissingKeyException(s) =>
+ transid
+ .finished(
+ this,
+ start,
+ s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.")
+ NoDocumentException("Not found on 'readAttachment'.")
+ case e => e
+ })
+
+ reportFailure(
+ g,
+ start,
+ failure =>
+ s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
+ }
+
+ /**
+ * Deletes every attachment stored for the given document by listing the keys under the
+ * document's key prefix and deleting them one at a time. The future yields true once all
+ * deletes have completed; failures are routed through reportFailure.
+ */
+ override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
+ val start =
+ transid.started(this, DATABASE_ATTS_DELETE, s"[ATTS_DELETE] deleting attachments of document 'id: $docId'")
+
+ //S3 provides an API to delete multiple objects in a single call, however the alpakka client
+ //currently does not support it. In current usage a document has at most 1 attachment,
+ //so listing and then deleting amounts to 2 remote calls.
+ val f = client
+ .listBucket(bucket, Some(objectKeyPrefix(docId)))
+ .mapAsync(1)(bc => client.deleteObject(bc.bucketName, bc.key))
+ .runWith(Sink.seq)
+ .map(_ => true)
+
+ f.onSuccess {
+ case _ =>
+ transid.finished(this, start, s"[ATTS_DELETE] completed: deleting attachments of document 'id: $docId'")
+ }
+
+ reportFailure(
+ f,
+ start,
+ failure => s"[ATTS_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
+ }
+
+ override protected[core] def deleteAttachment(docId: DocId, name: String)(
+ implicit transid: TransactionId): Future[Boolean] = {
+ val start =
+ transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")
+
+ val f = client
+ .deleteObject(bucket, objectKey(docId, name))
+ .map(_ => true)
+
+ f.onSuccess {
+ case _ =>
+ transid.finished(this, start, s"[ATT_DELETE] completed: deleting attachment '$name' of document 'id: $docId'")
+ }
+
+ reportFailure(
+ f,
+ start,
+ failure => s"[ATT_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
+ }
+
+ override def shutdown(): Unit = {}
+
+ private def objectKey(id: DocId, name: String): String = s"$prefix/${id.id}/$name"
+
+ private def objectKeyPrefix(id: DocId): String = s"$prefix/${id.id}/" //ends with '/' so id 'a' never matches 'ab'
+
+ /**
+ * Checks whether the failure, or any exception in its cause chain, is an S3 "NoSuchKey" error.
+ */
+ private def isMissingKeyException(e: Throwable): Boolean = e match {
+ case null => false //reached the end of the cause chain without a match
+ case s3e: S3Exception if s3e.code == "NoSuchKey" => true
+ case other => isMissingKeyException(other.getCause) //S3Exception may be wrapped, so keep unwrapping
+ }
+}
diff --git a/tests/build.gradle b/tests/build.gradle
index b90c99d..05561e7 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -148,6 +148,8 @@ dependencies {
compile 'io.opentracing:opentracing-mock:0.31.0'
compile "org.apache.curator:curator-test:${gradle.curator.version}"
+ compile "com.amazonaws:aws-java-sdk-s3:1.11.295"
+
compile project(':common:scala')
compile project(':core:controller')
compile project(':core:invoker')
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 69f45ec..617c216 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -91,7 +91,7 @@ object ActionContainer {
}.get // This fails if the docker binary couldn't be located.
}
- private lazy val dockerCmd: String = {
+ lazy val dockerCmd: String = {
/*
* The docker host is set to a provided property 'docker.host' if it's
* available; otherwise we check with WhiskProperties to see whether we are
diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala
similarity index 58%
copy from tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
copy to tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala
index c47daaa..72b2f8e 100644
--- a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreBehaviorBase.scala
@@ -17,26 +17,30 @@
package whisk.core.database.memory
-import org.junit.runner.RunWith
import org.scalatest.FlatSpec
-import org.scalatest.junit.JUnitRunner
-import whisk.core.database.ArtifactStore
-import whisk.core.database.test.behavior.ArtifactStoreBehavior
-import whisk.core.entity._
+import whisk.core.database.{ArtifactStore, AttachmentStore, DocumentSerializer}
+import whisk.core.database.test.behavior.ArtifactStoreBehaviorBase
+import whisk.core.entity.{
+ DocumentReader,
+ WhiskActivation,
+ WhiskAuth,
+ WhiskDocumentReader,
+ WhiskEntity,
+ WhiskEntityJsonFormat
+}
-import scala.reflect.classTag
+import scala.reflect.{classTag, ClassTag}
-@RunWith(classOf[JUnitRunner])
-class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
+trait MemoryArtifactStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase {
override def storeType = "Memory"
- override val authStore = {
+ override lazy val authStore = {
implicit val docReader: DocumentReader = WhiskDocumentReader
- MemoryArtifactStoreProvider.makeStore[WhiskAuth]()
+ MemoryArtifactStoreProvider.makeArtifactStore[WhiskAuth](getAttachmentStore[WhiskAuth]())
}
- override val entityStore =
- MemoryArtifactStoreProvider.makeStore[WhiskEntity]()(
+ override lazy val entityStore =
+ MemoryArtifactStoreProvider.makeArtifactStore[WhiskEntity](getAttachmentStore[WhiskEntity]())(
classTag[WhiskEntity],
WhiskEntityJsonFormat,
WhiskDocumentReader,
@@ -44,11 +48,14 @@ class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
logging,
materializer)
- override val activationStore = {
+ override lazy val activationStore = {
implicit val docReader: DocumentReader = WhiskDocumentReader
- MemoryArtifactStoreProvider.makeStore[WhiskActivation]()
+ MemoryArtifactStoreProvider.makeArtifactStore[WhiskActivation](getAttachmentStore[WhiskActivation]())
}
override protected def getAttachmentStore(store: ArtifactStore[_]) =
Some(store.asInstanceOf[MemoryArtifactStore[_]].attachmentStore)
+
+ protected def getAttachmentStore[D <: DocumentSerializer: ClassTag](): AttachmentStore =
+ MemoryAttachmentStoreProvider.makeStore()
}
diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
index c47daaa..e9860a0 100644
--- a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
@@ -20,35 +20,7 @@ package whisk.core.database.memory
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
-import whisk.core.database.ArtifactStore
import whisk.core.database.test.behavior.ArtifactStoreBehavior
-import whisk.core.entity._
-
-import scala.reflect.classTag
@RunWith(classOf[JUnitRunner])
-class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
- override def storeType = "Memory"
-
- override val authStore = {
- implicit val docReader: DocumentReader = WhiskDocumentReader
- MemoryArtifactStoreProvider.makeStore[WhiskAuth]()
- }
-
- override val entityStore =
- MemoryArtifactStoreProvider.makeStore[WhiskEntity]()(
- classTag[WhiskEntity],
- WhiskEntityJsonFormat,
- WhiskDocumentReader,
- actorSystem,
- logging,
- materializer)
-
- override val activationStore = {
- implicit val docReader: DocumentReader = WhiskDocumentReader
- MemoryArtifactStoreProvider.makeStore[WhiskActivation]()
- }
-
- override protected def getAttachmentStore(store: ArtifactStore[_]) =
- Some(store.asInstanceOf[MemoryArtifactStore[_]].attachmentStore)
-}
+class MemoryArtifactStoreTests extends FlatSpec with MemoryArtifactStoreBehaviorBase with ArtifactStoreBehavior
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreAwsTests.scala b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreAwsTests.scala
new file mode 100644
index 0000000..2e99f58
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreAwsTests.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import whisk.core.entity.WhiskEntity
+
+@RunWith(classOf[JUnitRunner])
+class S3AttachmentStoreAwsTests extends S3AttachmentStoreBehaviorBase with S3Aws {
+ override lazy val store = makeS3Store[WhiskEntity]
+
+ override def storeType: String = "S3"
+}
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala
new file mode 100644
index 0000000..48def39
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreBehaviorBase.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import org.scalatest.FlatSpec
+import whisk.common.Logging
+import whisk.core.database.{AttachmentStore, DocumentSerializer}
+import whisk.core.database.memory.MemoryArtifactStoreBehaviorBase
+import whisk.core.database.test.AttachmentStoreBehaviors
+import whisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors
+import whisk.core.entity.WhiskEntity
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+trait S3AttachmentStoreBehaviorBase
+ extends FlatSpec
+ with MemoryArtifactStoreBehaviorBase
+ with ArtifactStoreAttachmentBehaviors
+ with AttachmentStoreBehaviors {
+ override lazy val store = makeS3Store[WhiskEntity]
+
+ override implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+ override val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}"
+
+ override def getAttachmentStore[D <: DocumentSerializer: ClassTag](): AttachmentStore =
+ makeS3Store[D]()
+
+ def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): AttachmentStore
+}
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreMinioTests.scala b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreMinioTests.scala
new file mode 100644
index 0000000..07cf4b4
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3AttachmentStoreMinioTests.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import whisk.core.entity.WhiskEntity
+
+@RunWith(classOf[JUnitRunner])
+class S3AttachmentStoreMinioTests extends S3AttachmentStoreBehaviorBase with S3Minio {
+ override lazy val store = makeS3Store[WhiskEntity]
+
+ override def storeType: String = "S3Minio"
+
+ override def garbageCollectAttachments: Boolean = false
+}
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3Aws.scala b/tests/src/test/scala/whisk/core/database/s3/S3Aws.scala
new file mode 100644
index 0000000..59c1cbe
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3Aws.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import com.typesafe.config.ConfigFactory
+import org.scalatest.FlatSpec
+import whisk.common.Logging
+import whisk.core.database.{AttachmentStore, DocumentSerializer}
+
+import scala.reflect.ClassTag
+
+trait S3Aws extends FlatSpec {
+ def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): AttachmentStore = {
+ val config = ConfigFactory.parseString(s"""
+ |whisk {
+ | s3 {
+ | alpakka {
+ | aws {
+ | credentials {
+ | provider = static
+ | access-key-id = "$accessKeyId"
+ | secret-access-key = "$secretAccessKey"
+ | }
+ | region {
+ | provider = static
+ | default-region = "$region"
+ | }
+ | }
+ | }
+ | bucket = "$bucket"
+ | }
+ |}
+ """.stripMargin).withFallback(ConfigFactory.load())
+ S3AttachmentStoreProvider.makeStore[D](config)
+ }
+
+ override protected def withFixture(test: NoArgTest) = {
+ assume(
+ secretAccessKey != null,
+ s"'AWS_SECRET_ACCESS_KEY' env not configured. Configure following " +
+ s"env variables for test to run. 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_REGION'")
+
+ require(accessKeyId != null, "'AWS_ACCESS_KEY_ID' env variable not set")
+ require(region != null, "'AWS_REGION' env variable not set")
+
+ super.withFixture(test)
+ }
+
+ val bucket = "test-ow-travis"
+
+ val accessKeyId = System.getenv("AWS_ACCESS_KEY_ID")
+ val secretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
+ val region = System.getenv("AWS_REGION")
+}
diff --git a/tests/src/test/scala/whisk/core/database/s3/S3Minio.scala b/tests/src/test/scala/whisk/core/database/s3/S3Minio.scala
new file mode 100644
index 0000000..0c59e68
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/s3/S3Minio.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.s3
+
+import java.net.ServerSocket
+
+import actionContainers.ActionContainer
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
+import com.typesafe.config.ConfigFactory
+import common.{SimpleExec, StreamLogging}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+import whisk.common.{Logging, TransactionId}
+import whisk.core.database.{AttachmentStore, DocumentSerializer}
+
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+trait S3Minio extends FlatSpec with BeforeAndAfterAll with StreamLogging {
+ def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): AttachmentStore = {
+ val config = ConfigFactory.parseString(s"""
+ |whisk {
+ | s3 {
+ | alpakka {
+ | aws {
+ | credentials {
+ | provider = static
+ | access-key-id = "$accessKey"
+ | secret-access-key = "$secretAccessKey"
+ | }
+ | region {
+ | provider = static
+ | default-region = us-west-2
+ | }
+ | }
+ | endpoint-url = "http://localhost:$port"
+ | }
+ | bucket = "$bucket"
+ | }
+ |}
+ """.stripMargin).withFallback(ConfigFactory.load())
+ S3AttachmentStoreProvider.makeStore[D](config)
+ }
+
+ private val accessKey = "TESTKEY"
+ private val secretAccessKey = "TESTSECRET"
+ private val port = freePort()
+ private val bucket = "test-ow-travis"
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ dockerExec(
+ s"run -d -e MINIO_ACCESS_KEY=$accessKey -e MINIO_SECRET_KEY=$secretAccessKey -p $port:9000 minio/minio server /data")
+ println(s"Started minio on $port")
+ createTestBucket()
+ }
+
+ /**
+ * Stops the containers reported by `docker ps` for the minio/minio image once the suite is done.
+ */
+ override def afterAll(): Unit = {
+ super.afterAll()
+ val containerIds = dockerExec("ps -q --filter ancestor=minio/minio")
+ //"".split("\n") yields a single empty string, so drop blanks to avoid running `docker stop` without an id
+ containerIds.split("\n").map(_.trim).filter(_.nonEmpty).foreach(id => dockerExec(s"stop $id"))
+ println(s"Stopped minio container")
+ }
+
+ def createTestBucket(): Unit = {
+ val endpoint = new EndpointConfiguration(s"http://localhost:$port", "us-west-2")
+ val client = AmazonS3ClientBuilder.standard
+ .withPathStyleAccessEnabled(true)
+ .withEndpointConfiguration(endpoint)
+ .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretAccessKey)))
+ .build
+
+ whisk.utils.retry(client.createBucket(bucket), 6, Some(1.minute))
+ println(s"Created bucket $bucket")
+ }
+
+ private def dockerExec(cmd: String): String = {
+ implicit val tid: TransactionId = TransactionId.testing
+ val command = s"${ActionContainer.dockerCmd} $cmd"
+ val cmdSeq = command.split(" ").map(_.trim).filter(_.nonEmpty)
+ val (out, err, code) = SimpleExec.syncRunCmd(cmdSeq)
+ assert(code == 0, s"Error occurred for command '$command'. Exit code: $code, Error: $err")
+ out
+ }
+
+ private def freePort(): Int = {
+ val socket = new ServerSocket(0)
+ try socket.getLocalPort
+ finally if (socket != null) socket.close()
+ }
+}