Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/05/22 08:30:45 UTC

[incubator-openwhisk] branch master updated: Update to Alpakka S3 Connector v1.0 release (#4479)

This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 73732b4  Update to Alpakka S3 Connector v1.0 release (#4479)
73732b4 is described below

commit 73732b4ace1fcce277a7a66d2e00ff7c6dddcca7
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Wed May 22 14:00:32 2019 +0530

    Update to Alpakka S3 Connector v1.0 release (#4479)
    
    Updates to Alpakka S3 Connector v1.0.1 release.
    
    This commit also includes some fixes related to #4484, which was causing compatibility issues with existing setups: their indexes still use `Hash` indexing, and that caused a failure while constructing the `IndexingPolicy` when the collection was read. Support for the `Hash` index kind is therefore added back for reading existing policies, but it is no longer used when creating new indexes.
---
 common/scala/build.gradle                          |  2 +-
 common/scala/src/main/resources/s3-reference.conf  |  2 +-
 .../core/database/cosmosdb/IndexingPolicy.scala    |  3 +
 .../core/database/s3/S3AttachmentStore.scala       | 91 +++++++++++-----------
 .../database/cosmosdb/CosmosDBSupportTests.scala   |  9 +--
 5 files changed, 54 insertions(+), 53 deletions(-)
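
For orientation before the diff: Alpakka 0.19 exposed S3 operations as methods on an `S3Client` instance built from `S3Settings` (the `new S3Client(S3Settings(alpakkaConfigKey))` calls removed below), while Alpakka 1.0 exposes them as static sources and sinks on the `S3` object, with settings attached to each stream through `S3Attributes`. A minimal sketch of the 1.0 shape, not taken from this commit; the bucket, key, and `alpakka.s3` config path are illustrative:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.alpakka.s3.{S3Attributes, S3Settings}
    import akka.stream.alpakka.s3.scaladsl.S3
    import akka.stream.scaladsl.Sink

    object AlpakkaV1Sketch extends App {
      implicit val system: ActorSystem = ActorSystem("sketch")
      implicit val materializer: ActorMaterializer = ActorMaterializer()

      // Settings are built from a Config subtree and travel with each stream as attributes.
      val settings = S3Settings(system.settings.config.getConfig("alpakka.s3"))

      // Operations are static on the S3 object; attributes select the settings per stream.
      S3.deleteObject("some-bucket", "some/key") // illustrative bucket and key
        .withAttributes(S3Attributes.settings(settings))
        .runWith(Sink.head)
    }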

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 64649f2..2f457fc 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -85,7 +85,7 @@ dependencies {
     compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
     compile ('com.microsoft.azure:azure-cosmosdb:2.4.2')
 
-    compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:0.19') {
+    compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:1.0.1') {
         exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
         exclude group: 'com.fasterxml.jackson.core'
         exclude group: 'com.fasterxml.jackson.dataformat'
diff --git a/common/scala/src/main/resources/s3-reference.conf b/common/scala/src/main/resources/s3-reference.conf
index 664bdcf..39b09c5 100644
--- a/common/scala/src/main/resources/s3-reference.conf
+++ b/common/scala/src/main/resources/s3-reference.conf
@@ -26,7 +26,7 @@ whisk {
         # timeout = 10 min
     # }
 
-    # See https://developer.lightbend.com/docs/alpakka/current/s3.html#usage
+    # See https://github.com/akka/alpakka/blob/v1.0.0/s3/src/main/resources/reference.conf
     alpakka {
       # whether the buffer request chunks (up to 5MB each) to "memory" or "disk"
       buffer = "memory"
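
The `alpakka` block above is the subtree the attachment store now hands to Alpakka's `S3Settings` (see the `s3Settings` helper added to S3AttachmentStore.scala below). A minimal loading sketch, assuming the block resolves under the `whisk.s3.alpakka` path that `alpakkaConfigKey` points at:

    import akka.stream.alpakka.s3.S3Settings
    import com.typesafe.config.{Config, ConfigFactory}

    object S3SettingsSketch {
      // Resolve reference.conf defaults (this file included) plus application.conf overrides,
      // then hand the alpakka subtree to Alpakka 1.0's S3Settings.
      val config: Config = ConfigFactory.load()
      val settings: S3Settings = S3Settings(config.getConfig("whisk.s3.alpakka")) // path assumed from alpakkaConfigKey
    }
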
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
index 8d35f71..82c25b5 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/IndexingPolicy.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
 
 import com.microsoft.azure.cosmosdb.{
   DataType,
+  HashIndex,
   IndexKind,
   RangeIndex,
   ExcludedPath => JExcludedPath,
@@ -99,6 +100,7 @@ object ExcludedPath {
 
 case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
   def asJava(): JIndex = kind match {
+    case IndexKind.Hash  => JIndex.Hash(dataType, precision)
     case IndexKind.Range => JIndex.Range(dataType, precision)
     case _               => throw new RuntimeException(s"Unsupported kind $kind")
   }
@@ -106,6 +108,7 @@ case class Index(kind: IndexKind, dataType: DataType, precision: Int) {
 
 object Index {
   def apply(index: JIndex): Index = index match {
+    case i: HashIndex  => Index(i.getKind, i.getDataType, i.getPrecision)
     case i: RangeIndex => Index(i.getKind, i.getDataType, i.getPrecision)
     case _             => throw new RuntimeException(s"Unsupported kind $index")
   }
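
The net effect of the two `Hash` additions above, as a minimal sketch (values are illustrative): a `Hash` index read back from an existing collection's policy now converts cleanly, while indexes created by the code remain `Range`:

    import com.microsoft.azure.cosmosdb.{DataType, IndexKind, Index => JIndex}
    import org.apache.openwhisk.core.database.cosmosdb.Index

    object IndexCompatSketch {
      // Reading: a Hash index coming back from an existing collection's stored policy
      // now maps into the Index case class instead of hitting the "Unsupported kind" error.
      val legacy: Index = Index(JIndex.Hash(DataType.String, -1))

      // Writing: index definitions created by the code are still Range only.
      val fresh: JIndex = Index(IndexKind.Range, DataType.String, -1).asJava()
    }
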
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
index 8122274..c5977c8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
@@ -17,19 +17,17 @@
 
 package org.apache.openwhisk.core.database.s3
 
-import akka.NotUsed
 import akka.actor.ActorSystem
 import akka.event.Logging
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.model.headers.CacheDirectives._
 import akka.http.scaladsl.model.headers._
-import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, ResponseEntity, Uri}
+import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, Uri}
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
-import akka.stream.alpakka.s3.acl.CannedAcl
-import akka.stream.alpakka.s3.impl.S3Headers
-import akka.stream.alpakka.s3.scaladsl.S3Client
-import akka.stream.alpakka.s3.{S3Exception, S3Settings}
+import akka.stream.alpakka.s3.headers.CannedAcl
+import akka.stream.alpakka.s3.scaladsl.S3
+import akka.stream.alpakka.s3.{S3Attributes, S3Exception, S3Headers, S3Settings}
 import akka.stream.scaladsl.{Sink, Source}
 import akka.util.ByteString
 import com.typesafe.config.Config
@@ -64,35 +62,38 @@ object S3AttachmentStoreProvider extends AttachmentStoreProvider {
   override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
                                                               logging: Logging,
                                                               materializer: ActorMaterializer): AttachmentStore = {
-    val client = new S3Client(S3Settings(alpakkaConfigKey))
     val config = loadConfigOrThrow[S3Config](ConfigKeys.s3)
-    new S3AttachmentStore(client, config.bucket, config.prefixFor[D], config.signer)
+    new S3AttachmentStore(s3Settings(actorSystem.settings.config), config.bucket, config.prefixFor[D], config.signer)
   }
 
   def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
                                                                    logging: Logging,
                                                                    materializer: ActorMaterializer): AttachmentStore = {
-    val client = new S3Client(S3Settings(config, alpakkaConfigKey))
     val s3config = loadConfigOrThrow[S3Config](config, ConfigKeys.s3)
-    new S3AttachmentStore(client, s3config.bucket, s3config.prefixFor[D], s3config.signer)
+    new S3AttachmentStore(s3Settings(config), s3config.bucket, s3config.prefixFor[D], s3config.signer)
   }
 
+  private def s3Settings(config: Config) = S3Settings(config.getConfig(alpakkaConfigKey))
+
 }
 
 trait UrlSigner {
   def getSignedURL(s3ObjectKey: String): Uri
 }
 
-class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSigner: Option[UrlSigner])(
+class S3AttachmentStore(s3Settings: S3Settings, bucket: String, prefix: String, urlSigner: Option[UrlSigner])(
   implicit system: ActorSystem,
   logging: Logging,
   materializer: ActorMaterializer)
     extends AttachmentStore {
-  private val commonS3Headers = S3Headers(
-    Seq(
-      CannedAcl.Private.header, //All objects are private
-      `Cache-Control`(`max-age`(365.days.toSeconds))) //As objects are immutable cache them for long time
-  )
+
+  private val s3attributes = S3Attributes.settings(s3Settings)
+  private val commonS3Headers = {
+    val cache = `Cache-Control`(`max-age`(365.days.toSeconds))
+    S3Headers()
+      .withCannedAcl(CannedAcl.Private) //All contents are private
+      .withCustomHeaders(Map(cache.name -> cache.value)) //As objects are immutable cache them for long time
+  }
   override val scheme = "s3"
 
   override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
@@ -112,8 +113,9 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
     //and thus use 1 remote call instead of 3
     val f = docStream
       .runWith(
-        combinedSink(client
-          .multipartUploadWithHeaders(bucket, objectKey(docId, name), contentType, s3Headers = Some(commonS3Headers))))
+        combinedSink(
+          S3.multipartUploadWithHeaders(bucket, objectKey(docId, name), contentType, s3Headers = commonS3Headers)
+            .withAttributes(s3attributes)))
       .map(r => AttachResult(r.digest, r.length))
 
     f.foreach(_ =>
@@ -136,7 +138,10 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
         s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
     val source = getAttachmentSource(objectKey(docId, name))
 
-    val f = source.runWith(sink)
+    val f = source.flatMap {
+      case Some(x) => x.withAttributes(s3attributes).runWith(sink)
+      case None    => Future.failed(NoDocumentException("Not found on 'readAttachment'."))
+    }
 
     val g = f.transform(
       { s =>
@@ -144,14 +149,14 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
           .finished(this, start, s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
         s
       }, {
-        case s: Throwable if isMissingKeyException(s) =>
+        case e: NoDocumentException =>
           transid
             .finished(
               this,
               start,
               s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.",
               logLevel = Logging.ErrorLevel)
-          NoDocumentException("Not found on 'readAttachment'.")
+          e
         case e => e
       })
 
@@ -162,29 +167,31 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
         s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
   }
 
-  private def getAttachmentSource(objectKey: String): Source[ByteString, NotUsed] = urlSigner match {
+  private def getAttachmentSource(objectKey: String): Future[Option[Source[ByteString, Any]]] = urlSigner match {
     case Some(signer) => getUrlContent(signer.getSignedURL(objectKey))
-    case None         => client.download(bucket, objectKey)._1
-  }
 
-  private def getUrlContent(uri: Uri): Source[ByteString, NotUsed] = {
-    val future = Http().singleRequest(HttpRequest(uri = uri))
-    Source
-      .fromFuture(future.flatMap(entityForSuccess))
-      .map(_.dataBytes)
-      .flatMapConcat(identity)
+    // When reading from S3 we get an optional source of ByteString plus metadata if the object exists.
+    // In that case drop the metadata and keep only the data source.
+    case None =>
+      S3.download(bucket, objectKey)
+        .withAttributes(s3attributes)
+        .runWith(Sink.head)
+        .map(x => x.map(_._1))
   }
 
-  private def entityForSuccess(resp: HttpResponse): Future[ResponseEntity] =
-    resp match {
+  private def getUrlContent(uri: Uri): Future[Option[Source[ByteString, Any]]] = {
+    val future = Http().singleRequest(HttpRequest(uri = uri))
+    future.flatMap {
       case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
-        Future.successful(entity)
+        Future.successful(Some(entity.dataBytes))
       case HttpResponse(_, _, entity, _) =>
         Unmarshal(entity).to[String].map { err =>
           //With CloudFront also the error message confirms to same S3 exception format
-          throw new S3Exception(err)
+          val exp = new S3Exception(err)
+          if (isMissingKeyException(exp)) None else throw exp
         }
     }
+  }
 
   override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
     val start =
@@ -193,15 +200,9 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
         DATABASE_ATTS_DELETE,
         s"[ATT_DELETE] deleting attachments of document 'id: $docId' with prefix ${objectKeyPrefix(docId)}")
 
-    //S3 provides API to delete multiple objects in single call however alpakka client
-    //currently does not support that and also in current usage 1 docs has at most 1 attachment
-    //so current approach would also involve 2 remote calls
-    val f = client
-      .listBucket(bucket, Some(objectKeyPrefix(docId)))
-      .mapAsync(1) { bc =>
-        logging.info(this, s"[ATT_DELETE] deleting attachment '${bc.key}' of document 'id: $docId'")
-        client.deleteObject(bc.bucketName, bc.key)
-      }
+    val f = S3
+      .deleteObjectsByPrefix(bucket, Some(objectKeyPrefix(docId)))
+      .withAttributes(s3attributes)
       .runWith(Sink.seq)
       .map(_ => true)
 
@@ -219,8 +220,10 @@ class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, urlSig
     val start =
       transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")
 
-    val f = client
+    val f = S3
       .deleteObject(bucket, objectKey(docId, name))
+      .withAttributes(s3attributes)
+      .runWith(Sink.head)
       .map(_ => true)
 
     f.foreach(_ =>
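
For readers tracing the readAttachment change above: in Alpakka 1.0 `S3.download` materializes an `Option` that is `None` when the key does not exist, otherwise a pair of the data source and the object metadata. A minimal standalone sketch of that pattern, not taken from the store itself; the bucket, key, and `alpakka.s3` config path are illustrative:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.alpakka.s3.{S3Attributes, S3Settings}
    import akka.stream.alpakka.s3.scaladsl.S3
    import akka.stream.scaladsl.Sink
    import akka.util.ByteString
    import scala.concurrent.Future

    object DownloadSketch {
      implicit val system: ActorSystem = ActorSystem("sketch")
      implicit val materializer: ActorMaterializer = ActorMaterializer()
      import system.dispatcher

      private val attrs = S3Attributes.settings(S3Settings(system.settings.config.getConfig("alpakka.s3")))

      // Read one object into memory, failing the future when the key is absent.
      def read(bucket: String, key: String): Future[ByteString] =
        S3.download(bucket, key)
          .withAttributes(attrs)
          .runWith(Sink.head) // Future[Option[(Source[ByteString, _], ObjectMetadata)]]
          .flatMap {
            case Some((data, _)) => data.runFold(ByteString.empty)(_ ++ _)
            case None            => Future.failed(new NoSuchElementException(s"$bucket/$key not found"))
          }
    }
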
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
index 478463f..2359e3d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
@@ -18,7 +18,7 @@
 package org.apache.openwhisk.core.database.cosmosdb
 
 import akka.stream.ActorMaterializer
-import com.microsoft.azure.cosmosdb.IndexKind.Hash
+import com.microsoft.azure.cosmosdb.IndexKind.Range
 import com.microsoft.azure.cosmosdb.DataType.String
 import com.microsoft.azure.cosmosdb.DocumentCollection
 import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
@@ -60,11 +60,6 @@ class CosmosDBSupportTests
     val (_, coll) = new CosmosTest(config, client, newMapper(indexedPaths1)).initialize()
     coll.getDefaultTimeToLive shouldBe -1
     indexedPaths(coll) should contain theSameElementsAs indexedPaths1
-
-    //Test if index definition is updated in code it gets updated in db also
-    val indexedPaths2 = Set("/foo/?", "/bar2/?")
-    val (_, coll2) = new CosmosTest(config, client, newMapper(indexedPaths2)).initialize()
-    indexedPaths(coll2) should contain theSameElementsAs indexedPaths2
   }
 
   it should "set ttl" in {
@@ -121,7 +116,7 @@ class CosmosDBSupportTests
     coll.getIndexingPolicy.getIncludedPaths.asScala.map(_.getPath).toList
 
   protected def newTestIndexingPolicy(paths: Set[String]): IndexingPolicy =
-    IndexingPolicy(includedPaths = paths.map(p => IncludedPath(p, Index(Hash, String, -1))))
+    IndexingPolicy(includedPaths = paths.map(p => IncludedPath(p, Index(Range, String, -1))))
 
   private class CosmosTest(override val config: CosmosDBConfig,
                            override val client: AsyncDocumentClient,