You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/08/19 16:45:49 UTC

[openwhisk] branch master updated: Support for azure cdn with AzureBlobAttachmentStore (#4942)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 17a7c32  Support for azure cdn with AzureBlobAttachmentStore (#4942)
17a7c32 is described below

commit 17a7c3258d6648c18336af5cc52770475aa7d58a
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Wed Aug 19 09:42:01 2020 -0700

    Support for azure cdn with AzureBlobAttachmentStore (#4942)
---
 common/scala/src/main/resources/application.conf   |   3 +
 .../database/azblob/AzureBlobAttachmentStore.scala | 128 +++++++++++++++------
 .../openwhisk/core/database/azblob/AzureBlob.scala |   3 +
 .../azblob/AzureBlobAttachmentStoreCDNTests.scala  |  47 ++++++++
 .../database/test/AttachmentStoreBehaviors.scala   |   2 +
 5 files changed, 145 insertions(+), 38 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index a856305..aa9fa08 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -367,6 +367,9 @@ whisk {
             retry-delay = 10 milliseconds
             #secondary-host = ""
         }
+        #azure-cdn-config {
+        #    domain-name = "<your azure cdn domain>"
+        #}
     }
 
     # transaction ID related configuration
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
index a74eed7..3642429 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
@@ -17,14 +17,20 @@
 
 package org.apache.openwhisk.core.database.azblob
 
+import java.time.OffsetDateTime
+
 import akka.actor.ActorSystem
 import akka.event.Logging
 import akka.event.Logging.InfoLevel
-import akka.http.scaladsl.model.ContentType
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.StatusCodes.NotFound
+import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, Uri}
+import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{Sink, Source}
 import akka.util.{ByteString, ByteStringBuilder}
-import com.azure.storage.blob.{BlobContainerAsyncClient, BlobContainerClientBuilder}
+import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
+import com.azure.storage.blob.{BlobContainerAsyncClient, BlobContainerClientBuilder, BlobUrlParts}
 import com.azure.storage.common.StorageSharedKeyCredential
 import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType}
 import com.typesafe.config.Config
@@ -37,13 +43,7 @@ import org.apache.openwhisk.common.LoggingMarkers.{
 import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.database.StoreUtils.{combinedSink, reportFailure}
-import org.apache.openwhisk.core.database.{
-  AttachResult,
-  AttachmentStore,
-  AttachmentStoreProvider,
-  DocumentSerializer,
-  NoDocumentException
-}
+import org.apache.openwhisk.core.database._
 import org.apache.openwhisk.core.entity.DocId
 import pureconfig._
 import pureconfig.generic.auto._
@@ -53,15 +53,16 @@ import scala.compat.java8.FutureConverters._
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ExecutionContext, Future}
 import scala.reflect.ClassTag
-import scala.util.Success
 
+case class AzureCDNConfig(domainName: String)
 case class AzBlobConfig(endpoint: String,
                         accountKey: String,
                         containerName: String,
                         accountName: String,
                         connectionString: Option[String],
                         prefix: Option[String],
-                        retryConfig: AzBlobRetryConfig) {
+                        retryConfig: AzBlobRetryConfig,
+                        azureCdnConfig: Option[AzureCDNConfig] = None) {
   def prefixFor[D](implicit tag: ClassTag[D]): String = {
     val className = tag.runtimeClass.getSimpleName.toLowerCase
     prefix.map(p => s"$p/$className").getOrElse(className)
@@ -83,7 +84,7 @@ object AzureBlobAttachmentStoreProvider extends AttachmentStoreProvider {
                                                                    logging: Logging,
                                                                    materializer: ActorMaterializer): AttachmentStore = {
     val azConfig = loadConfigOrThrow[AzBlobConfig](config, ConfigKeys.azBlob)
-    new AzureBlobAttachmentStore(createClient(azConfig), azConfig.prefixFor[D])
+    new AzureBlobAttachmentStore(createClient(azConfig), azConfig.prefixFor[D], azConfig)
   }
 
   def createClient(config: AzBlobConfig): BlobContainerAsyncClient = {
@@ -112,9 +113,10 @@ object AzureBlobAttachmentStoreProvider extends AttachmentStoreProvider {
   }
 }
 
-class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String)(implicit system: ActorSystem,
-                                                                                 logging: Logging,
-                                                                                 materializer: ActorMaterializer)
+class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String, config: AzBlobConfig)(
+  implicit system: ActorSystem,
+  logging: Logging,
+  materializer: ActorMaterializer)
     extends AttachmentStore {
   override protected[core] def scheme: String = "az"
 
@@ -158,32 +160,32 @@ class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String)
         this,
         DATABASE_ATT_GET,
         s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
-    val blobClient = getBlobClient(docId, name)
-    val f = blobClient.exists().toFuture.toScala.flatMap { exists =>
-      if (exists) {
-        val bbFlux = blobClient.download()
-        val rf = Source.fromPublisher(bbFlux).map(ByteString(_)).runWith(sink)
-        rf.andThen {
-          case Success(_) =>
-            transid
-              .finished(
-                this,
-                start,
-                s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
-        }
-      } else {
-        transid
-          .finished(
-            this,
-            start,
-            s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.",
-            logLevel = Logging.ErrorLevel)
-        Future.failed(NoDocumentException("Not found on 'readAttachment'."))
-      }
+    val source = getAttachmentSource(objectKey(docId, name), config)
+
+    val f = source.flatMap {
+      case Some(x) => x.runWith(sink)
+      case None    => Future.failed(NoDocumentException("Not found on 'readAttachment'."))
     }
 
+    val g = f.transform(
+      { s =>
+        transid
+          .finished(this, start, s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
+        s
+      }, {
+        case e: NoDocumentException =>
+          transid
+            .finished(
+              this,
+              start,
+              s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.",
+              logLevel = Logging.ErrorLevel)
+          e
+        case e => e
+      })
+
     reportFailure(
-      f,
+      g,
       start,
       failure =>
         s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
@@ -273,4 +275,54 @@ class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String)
 
   private def getBlobClient(docId: DocId, name: String) =
     client.getBlobAsyncClient(objectKey(docId, name)).getBlockBlobAsyncClient
+
+  private def getAttachmentSource(objectKey: String, config: AzBlobConfig)(
+    implicit tid: TransactionId): Future[Option[Source[ByteString, Any]]] = {
+    val blobClient = client.getBlobAsyncClient(objectKey).getBlockBlobAsyncClient
+
+    config.azureCdnConfig match {
+      case Some(cdnConfig) =>
+        //set up the SAS token
+        def expiryTime = OffsetDateTime.now().plusDays(1)
+        def permissions =
+          new BlobContainerSasPermission()
+            .setReadPermission(true)
+        val sigValues = new BlobServiceSasSignatureValues(expiryTime, permissions)
+        val sas = blobClient.generateSas(sigValues)
+        //parse the url, and reset the host
+        val parts = BlobUrlParts.parse(blobClient.getBlobUrl)
+        val url = parts.setHost(cdnConfig.domainName)
+        logging.info(
+          this,
+          s"[ATT_GET] '$prefix' downloading attachment from azure cdn '$objectKey' with url (sas params not displayed) ${url}")
+        //append the sas params to the url before downloading
+        val cdnUrlWithSas = s"${url.toUrl.toString}?$sas"
+        getUrlContent(cdnUrlWithSas)
+      case None =>
+        blobClient.exists().toFuture.toScala.map { exists =>
+          if (exists) {
+            val bbFlux = blobClient.download()
+            Some(Source.fromPublisher(bbFlux).map(ByteString.fromByteBuffer))
+          } else {
+            throw NoDocumentException("Not found on 'readAttachment'.")
+          }
+        }
+    }
+  }
+  private def getUrlContent(uri: Uri): Future[Option[Source[ByteString, Any]]] = {
+    val future = Http().singleRequest(HttpRequest(uri = uri))
+    future.flatMap {
+      case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
+        Future.successful(Some(entity.dataBytes))
+      case HttpResponse(status, _, entity, _) =>
+        if (status == NotFound) {
+          entity.discardBytes()
+          throw NoDocumentException("Not found on 'readAttachment'.")
+        } else {
+          Unmarshal(entity).to[String].map { err =>
+            throw new Exception(s"failed to download ${uri} status was ${status} response was ${err}")
+          }
+        }
+    }
+  }
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
index 87018b8..0dadcb7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
@@ -27,6 +27,8 @@ import org.scalatest.FlatSpec
 import scala.reflect.ClassTag
 
 trait AzureBlob extends FlatSpec {
+  def azureCdnConfig: String = ""
+
   def makeAzureStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
                                                           logging: Logging,
                                                           materializer: ActorMaterializer): AttachmentStore = {
@@ -38,6 +40,7 @@ trait AzureBlob extends FlatSpec {
         |    container-name = "$containerName"
         |    account-key = "$accountKey"
         |    prefix = $prefix
+        |    $azureCdnConfig
         |  }
         |}""".stripMargin).withFallback(ConfigFactory.load()).resolve()
     AzureBlobAttachmentStoreProvider.makeStore[D](config)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreCDNTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreCDNTests.scala
new file mode 100644
index 0000000..c3664ca
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreCDNTests.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.azblob
+
+import org.apache.openwhisk.core.entity.WhiskEntity
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class AzureBlobAttachmentStoreCDNTests extends AzureBlobAttachmentStoreBehaviorBase with AzureBlob {
+  override lazy val store = makeAzureStore[WhiskEntity]
+
+  override def storeType: String = "AzureBlob_AzureCDN"
+  override def azureCdnConfig: String =
+    """
+      |azure-cdn-config {
+      |  domain-name = ${AZ_CDN_DOMAIN}
+      |}
+    """.stripMargin
+
+  override protected def withFixture(test: NoArgTest) = {
+    assume(
+      System.getenv("AZ_CDN_DOMAIN") != null,
+      "Configure following env variables for test " +
+        "to run 'AZ_CDN_DOMAIN' ")
+    super.withFixture(test)
+  }
+
+  //With AzureCDN, deletes are not immediate; the objects may instead live in the CDN cache until their TTL expires
+  override protected val lazyDeletes = true
+
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
index d560928..e63e99c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
@@ -104,6 +104,7 @@ trait AttachmentStoreBehaviors
     r21.length shouldBe 1000
     r22.length shouldBe 2000
 
+    println(s"creating doc ${docId}")
     attachmentBytes(docId, "c1").futureValue.result() shouldBe ByteString(b1)
     attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
     attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)
@@ -114,6 +115,7 @@ trait AttachmentStoreBehaviors
     store.deleteAttachment(docId, "c1").futureValue shouldBe true
 
     //Non deleted attachments related to same docId must still be accessible
+    println(s"read missing doc ${docId}")
     if (!lazyDeletes) attachmentBytes(docId, "c1").failed.futureValue shouldBe a[NoDocumentException]
     attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
     attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)