Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/05/22 08:30:45 UTC
[incubator-openwhisk] branch master updated: Update to Alpakka S3 Connector v1.0 release (#4479)
This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 73732b4 Update to Alpakka S3 Connector v1.0 release (#4479)
73732b4 is described below
commit 73732b4ace1fcce277a7a66d2e00ff7c6dddcca7
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Wed May 22 14:00:32 2019 +0530
Update to Alpakka S3 Connector v1.0 release (#4479)
Updates to the Alpakka S3 Connector v1.0.1 release.
This commit also includes some fixes related to #4484, which was causing compatibility issues with existing setups: their indexes still used `Hash` indexing, which led to a failure while creating the `IndexingPolicy` upon collection read. Support for the `Hash` index is therefore added back, but it is no longer used when creating new indexes.
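For reference, a minimal sketch of the API shape this upgrade moves to, based on the diff below. The bucket, object key, and the `whisk.s3.alpakka` config path are illustrative assumptions, not taken from this commit.

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.alpakka.s3.scaladsl.S3
    import akka.stream.alpakka.s3.{S3Attributes, S3Settings}
    import akka.stream.scaladsl.{Sink, Source}
    import akka.util.ByteString
    import scala.concurrent.Future

    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Alpakka 0.19 (removed): settings lived on a client instance, e.g.
    //   val client = new S3Client(S3Settings(configKey)); client.download(bucket, key)._1
    // Alpakka 1.0 (used in this commit): operations are plain graph stages and the
    // settings are attached per stream via attributes.
    val settings = S3Settings(system.settings.config.getConfig("whisk.s3.alpakka"))

    val data: Future[Option[Source[ByteString, Any]]] =
      S3.download("example-bucket", "example/key")
        .withAttributes(S3Attributes.settings(settings))
        .runWith(Sink.head)
        .map(_.map(_._1)) // keep the payload source, drop the ObjectMetadata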
---
common/scala/build.gradle | 2 +-
common/scala/src/main/resources/s3-reference.conf | 2 +-
.../core/database/cosmosdb/IndexingPolicy.scala | 3 +
.../core/database/s3/S3AttachmentStore.scala | 91 +++++++++++-----------
.../database/cosmosdb/CosmosDBSupportTests.scala | 9 +--
5 files changed, 54 insertions(+), 53 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 64649f2..2f457fc 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -85,7 +85,7 @@ dependencies {
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile ('com.microsoft.azure:azure-cosmosdb:2.4.2')
- compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:0.19') {
+ compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:1.0.1') {
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.dataformat'
diff --git a/common/scala/src/main/resources/s3-reference.conf b/common/scala/src/main/resources/s3-reference.conf
index 664bdcf..39b09c5 100644
--- a/common/scala/src/main/resources/s3-reference.conf
+++ b/common/scala/src/main/resources/s3-reference.conf
@@ -26,7 +26,7 @@ whisk {
# timeout = 10 min
# }
- # See https://developer.lightbend.com/docs/alpakka/current/s3.html#usage
+ # See https://github.com/akka/alpakka/blob/v1.0.0/s3/src/main/resources/reference.conf
alpakka {
# whether to buffer request chunks (up to 5MB each) in "memory" or on "disk"
buffer = "memory"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
index 8d35f71..82c25b5 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
import com.microsoft.azure.cosmosdb.{
DataType,
+ HashIndex,
IndexKind,
RangeIndex,
ExcludedPath => JExcludedPath,
@@ -99,6 +100,7 @@ object ExcludedPath {
case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
def asJava(): JIndex = kind match {
+ case IndexKind.Hash => JIndex.Hash(dataType, precision)
case IndexKind.Range => JIndex.Range(dataType, precision)
case _ => throw new RuntimeException(s"Unsupported kind $kind")
}
@@ -106,6 +108,7 @@ case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
object Index {
def apply(index: JIndex): Index = index match {
+ case i: HashIndex => Index(i.getKind, i.getDataType, i.getPrecision)
case i: RangeIndex => Index(i.getKind, i.getDataType, i.getPrecision)
case _ => throw new RuntimeException(s"Unsupported kind $index")
}
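The net effect of the IndexingPolicy change above, shown as a minimal sketch (only the conversion helpers are reproduced; the rest of the file is unchanged): policies read back from existing collections that still carry `Hash` indexes now convert cleanly, while new indexes continue to be created as `Range`.

    import com.microsoft.azure.cosmosdb.{DataType, HashIndex, IndexKind, RangeIndex, Index => JIndex}

    case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
      // Hash is accepted again so that an IndexingPolicy read from an existing
      // collection (created before the Range-only change) converts without error.
      def asJava(): JIndex = kind match {
        case IndexKind.Hash  => JIndex.Hash(dataType, precision)
        case IndexKind.Range => JIndex.Range(dataType, precision)
        case _               => throw new RuntimeException(s"Unsupported kind $kind")
      }
    }

    object Index {
      def apply(index: JIndex): Index = index match {
        case i: HashIndex  => Index(i.getKind, i.getDataType, i.getPrecision)
        case i: RangeIndex => Index(i.getKind, i.getDataType, i.getPrecision)
        case _             => throw new RuntimeException(s"Unsupported kind $index")
      }
    }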
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
index 8122274..c5977c8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
@@ -17,19 +17,17 @@
package org.apache.openwhisk.core.database.s3
-import akka.NotUsed
import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.CacheDirectives._
import akka.http.scaladsl.model.headers._
-import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, ResponseEntity, Uri}
+import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
-import akka.stream.alpakka.s3.acl.CannedAcl
-import akka.stream.alpakka.s3.impl.S3Headers
-import akka.stream.alpakka.s3.scaladsl.S3Client
-import akka.stream.alpakka.s3.{S3Exception, S3Settings}
+import akka.stream.alpakka.s3.headers.CannedAcl
+import akka.stream.alpakka.s3.scaladsl.S3
+import akka.stream.alpakka.s3.{S3Attributes, S3Exception, S3Headers, S3Settings}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.typesafe.config.Config
@@ -64,35 +62,38 @@ object S3AttachmentStoreProvider extends AttachmentStoreProvider {
override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
- val client = new S3Client(S3Settings(alpakkaConfigKey))
val config = loadConfigOrThrow[S3Config](ConfigKeys.s3)
- new S3AttachmentStore(client, config.bucket, config.prefixFor[D], config.signer)
+ new S3AttachmentStore(s3Settings(actorSystem.settings.config), config.bucket, config.prefixFor[D], config.signer)
}
def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
- val client = new S3Client(S3Settings(config, alpakkaConfigKey))
val s3config = loadConfigOrThrow[S3Config](config, ConfigKeys.s3)
- new S3AttachmentStore(client, s3config.bucket, s3config.prefixFor[D], s3config.signer)
+ new S3AttachmentStore(s3Settings(config), s3config.bucket, s3config.prefixFor[D], s3config.signer)
}
+ private def s3Settings(config: Config) = S3Settings(config.getConfig(alpakkaConfigKey))
+
}
trait UrlSigner {
def getSignedURL(s3ObjectKey: String): Uri
}
-class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSigner: Option[UrlSigner])(
+class S3AttachmentStore(s3Settings: S3Settings, bucket: String, prefix: String, urlSigner: Option[UrlSigner])(
implicit system: ActorSystem,
logging: Logging,
materializer: ActorMaterializer)
extends AttachmentStore {
- private val commonS3Headers = S3Headers(
- Seq(
- CannedAcl.Private.header, //All objects are private
- `Cache-Control`(`max-age`(365.days.toSeconds))) //As objects are immutable cache them for long time
- )
+
+ private val s3attributes = S3Attributes.settings(s3Settings)
+ private val commonS3Headers = {
+ val cache = `Cache-Control`(`max-age`(365.days.toSeconds))
+ S3Headers()
+ .withCannedAcl(CannedAcl.Private) //All contents are private
+ .withCustomHeaders(Map(cache.name -> cache.value)) //As objects are immutable, cache them for a long time
+ }
override val scheme = "s3"
override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
@@ -112,8 +113,9 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
//and thus use 1 remote call instead of 3
val f = docStream
.runWith(
- combinedSink(client
- .multipartUploadWithHeaders(bucket, objectKey(docId, name), contentType, s3Headers = Some(commonS3Headers))))
+ combinedSink(
+ S3.multipartUploadWithHeaders(bucket, objectKey(docId, name), contentType, s3Headers = commonS3Headers)
+ .withAttributes(s3attributes)))
.map(r => AttachResult(r.digest, r.length))
f.foreach(_ =>
@@ -136,7 +138,10 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
val source = getAttachmentSource(objectKey(docId, name))
- val f = source.runWith(sink)
+ val f = source.flatMap {
+ case Some(x) => x.withAttributes(s3attributes).runWith(sink)
+ case None => Future.failed(NoDocumentException("Not found on 'readAttachment'."))
+ }
val g = f.transform(
{ s =>
@@ -144,14 +149,14 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
.finished(this, start, s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
s
}, {
- case s: Throwable if isMissingKeyException(s) =>
+ case e: NoDocumentException =>
transid
.finished(
this,
start,
s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.",
logLevel = Logging.ErrorLevel)
- NoDocumentException("Not found on 'readAttachment'.")
+ e
case e => e
})
@@ -162,29 +167,31 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
}
- private def getAttachmentSource(objectKey: String): Source[ByteString, NotUsed] = urlSigner match {
+ private def getAttachmentSource(objectKey: String): Future[Option[Source[ByteString, Any]]] = urlSigner match {
case Some(signer) => getUrlContent(signer.getSignedURL(objectKey))
- case None => client.download(bucket, objectKey)._1
- }
- private def getUrlContent(uri: Uri): Source[ByteString, NotUsed] = {
- val future = Http().singleRequest(HttpRequest(uri = uri))
- Source
- .fromFuture(future.flatMap(entityForSuccess))
- .map(_.dataBytes)
- .flatMapConcat(identity)
+ // When reading from S3 we get an optional source of ByteString and metadata if the object exists
+ // In that case, drop the metadata
+ case None =>
+ S3.download(bucket, objectKey)
+ .withAttributes(s3attributes)
+ .runWith(Sink.head)
+ .map(x => x.map(_._1))
}
- private def entityForSuccess(resp: HttpResponse): Future[ResponseEntity] =
- resp match {
+ private def getUrlContent(uri: Uri): Future[Option[Source[ByteString, Any]]] = {
+ val future = Http().singleRequest(HttpRequest(uri = uri))
+ future.flatMap {
case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
- Future.successful(entity)
+ Future.successful(Some(entity.dataBytes))
case HttpResponse(_, _, entity, _) =>
Unmarshal(entity).to[String].map { err =>
//With CloudFront the error message also conforms to the same S3 exception format
- throw new S3Exception(err)
+ val exp = new S3Exception(err)
+ if (isMissingKeyException(exp)) None else throw exp
}
}
+ }
override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
val start =
@@ -193,15 +200,9 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
DATABASE_ATTS_DELETE,
s"[ATT_DELETE] deleting attachments of document 'id: $docId' with prefix ${objectKeyPrefix(docId)}")
- //S3 provides API to delete multiple objects in single call however alpakka client
- //currently does not support that and also in current usage 1 docs has at most 1 attachment
- //so current approach would also involve 2 remote calls
- val f = client
- .listBucket(bucket, Some(objectKeyPrefix(docId)))
- .mapAsync(1) { bc =>
- logging.info(this, s"[ATT_DELETE] deleting attachment '${bc.key}' of document 'id: $docId'")
- client.deleteObject(bc.bucketName, bc.key)
- }
+ val f = S3
+ .deleteObjectsByPrefix(bucket, Some(objectKeyPrefix(docId)))
+ .withAttributes(s3attributes)
.runWith(Sink.seq)
.map(_ => true)
@@ -219,8 +220,10 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
val start =
transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")
- val f = client
+ val f = S3
.deleteObject(bucket, objectKey(docId, name))
+ .withAttributes(s3attributes)
+ .runWith(Sink.head)
.map(_ => true)
f.foreach(_ =>
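Reusing the imports, implicits, and `settings` from the sketch after the commit message, the prefix-based delete introduced above looks roughly like this; the bucket and prefix are placeholders.

    // Alpakka 1.0 provides deleteObjectsByPrefix, so listing objects and deleting
    // them one by one is no longer necessary; a single stream removes every
    // attachment stored under a document's key prefix.
    val deleted: Future[Boolean] =
      S3.deleteObjectsByPrefix("example-bucket", Some("example/prefix"))
        .withAttributes(S3Attributes.settings(settings))
        .runWith(Sink.seq)
        .map(_ => true)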
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
index 478463f..2359e3d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
@@ -18,7 +18,7 @@
package org.apache.openwhisk.core.database.cosmosdb
import akka.stream.ActorMaterializer
-import com.microsoft.azure.cosmosdb.IndexKind.Hash
+import com.microsoft.azure.cosmosdb.IndexKind.Range
import com.microsoft.azure.cosmosdb.DataType.String
import com.microsoft.azure.cosmosdb.DocumentCollection
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
@@ -60,11 +60,6 @@ class CosmosDBSupportTests
val (_, coll) = new CosmosTest(config, client, newMapper(indexedPaths1)).initialize()
coll.getDefaultTimeToLive shouldBe -1
indexedPaths(coll) should contain theSameElementsAs indexedPaths1
-
- //Test if index definition is updated in code it gets updated in db also
- val indexedPaths2 = Set("/foo/?", "/bar2/?")
- val (_, coll2) = new CosmosTest(config, client, newMapper(indexedPaths2)).initialize()
- indexedPaths(coll2) should contain theSameElementsAs indexedPaths2
}
it should "set ttl" in {
@@ -121,7 +116,7 @@ class CosmosDBSupportTests
coll.getIndexingPolicy.getIncludedPaths.asScala.map(_.getPath).toList
protected def newTestIndexingPolicy(paths: Set[String]): IndexingPolicy =
- IndexingPolicy(includedPaths = paths.map(p => IncludedPath(p, Index(Hash, String, -1))))
+ IndexingPolicy(includedPaths = paths.map(p => IncludedPath(p, Index(Range, String, -1))))
private class CosmosTest(override val config: CosmosDBConfig,
override val client: AsyncDocumentClient,