You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/08/19 16:45:49 UTC
[openwhisk] branch master updated: Support for azure cdn with
AzureBlobAttachmentStore (#4942)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 17a7c32 Support for azure cdn with AzureBlobAttachmentStore (#4942)
17a7c32 is described below
commit 17a7c3258d6648c18336af5cc52770475aa7d58a
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Wed Aug 19 09:42:01 2020 -0700
Support for azure cdn with AzureBlobAttachmentStore (#4942)
---
common/scala/src/main/resources/application.conf | 3 +
.../database/azblob/AzureBlobAttachmentStore.scala | 128 +++++++++++++++------
.../openwhisk/core/database/azblob/AzureBlob.scala | 3 +
.../azblob/AzureBlobAttachmentStoreCDNTests.scala | 47 ++++++++
.../database/test/AttachmentStoreBehaviors.scala | 2 +
5 files changed, 145 insertions(+), 38 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index a856305..aa9fa08 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -367,6 +367,9 @@ whisk {
retry-delay = 10 milliseconds
#secondary-host = ""
}
+ #azure-cdn-config {
+ # domain-name = "<your azure cdn domain>"
+ #}
}
# transaction ID related configuration
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
index a74eed7..3642429 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
@@ -17,14 +17,20 @@
package org.apache.openwhisk.core.database.azblob
+import java.time.OffsetDateTime
+
import akka.actor.ActorSystem
import akka.event.Logging
import akka.event.Logging.InfoLevel
-import akka.http.scaladsl.model.ContentType
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.StatusCodes.NotFound
+import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, Uri}
+import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.{ByteString, ByteStringBuilder}
-import com.azure.storage.blob.{BlobContainerAsyncClient, BlobContainerClientBuilder}
+import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
+import com.azure.storage.blob.{BlobContainerAsyncClient, BlobContainerClientBuilder, BlobUrlParts}
import com.azure.storage.common.StorageSharedKeyCredential
import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType}
import com.typesafe.config.Config
@@ -37,13 +43,7 @@ import org.apache.openwhisk.common.LoggingMarkers.{
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.database.StoreUtils.{combinedSink, reportFailure}
-import org.apache.openwhisk.core.database.{
- AttachResult,
- AttachmentStore,
- AttachmentStoreProvider,
- DocumentSerializer,
- NoDocumentException
-}
+import org.apache.openwhisk.core.database._
import org.apache.openwhisk.core.entity.DocId
import pureconfig._
import pureconfig.generic.auto._
@@ -53,15 +53,16 @@ import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag
-import scala.util.Success
+case class AzureCDNConfig(domainName: String)
case class AzBlobConfig(endpoint: String,
accountKey: String,
containerName: String,
accountName: String,
connectionString: Option[String],
prefix: Option[String],
- retryConfig: AzBlobRetryConfig) {
+ retryConfig: AzBlobRetryConfig,
+ azureCdnConfig: Option[AzureCDNConfig] = None) {
def prefixFor[D](implicit tag: ClassTag[D]): String = {
val className = tag.runtimeClass.getSimpleName.toLowerCase
prefix.map(p => s"$p/$className").getOrElse(className)
@@ -83,7 +84,7 @@ object AzureBlobAttachmentStoreProvider extends AttachmentStoreProvider {
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
val azConfig = loadConfigOrThrow[AzBlobConfig](config, ConfigKeys.azBlob)
- new AzureBlobAttachmentStore(createClient(azConfig), azConfig.prefixFor[D])
+ new AzureBlobAttachmentStore(createClient(azConfig), azConfig.prefixFor[D], azConfig)
}
def createClient(config: AzBlobConfig): BlobContainerAsyncClient = {
@@ -112,9 +113,10 @@ object AzureBlobAttachmentStoreProvider extends AttachmentStoreProvider {
}
}
-class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String)(implicit system: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer)
+class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String, config: AzBlobConfig)(
+ implicit system: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer)
extends AttachmentStore {
override protected[core] def scheme: String = "az"
@@ -158,32 +160,32 @@ class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String)
this,
DATABASE_ATT_GET,
s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
- val blobClient = getBlobClient(docId, name)
- val f = blobClient.exists().toFuture.toScala.flatMap { exists =>
- if (exists) {
- val bbFlux = blobClient.download()
- val rf = Source.fromPublisher(bbFlux).map(ByteString(_)).runWith(sink)
- rf.andThen {
- case Success(_) =>
- transid
- .finished(
- this,
- start,
- s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
- }
- } else {
- transid
- .finished(
- this,
- start,
- s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.",
- logLevel = Logging.ErrorLevel)
- Future.failed(NoDocumentException("Not found on 'readAttachment'."))
- }
+ val source = getAttachmentSource(objectKey(docId, name), config)
+
+ val f = source.flatMap {
+ case Some(x) => x.runWith(sink)
+ case None => Future.failed(NoDocumentException("Not found on 'readAttachment'."))
}
+ val g = f.transform(
+ { s =>
+ transid
+ .finished(this, start, s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
+ s
+ }, {
+ case e: NoDocumentException =>
+ transid
+ .finished(
+ this,
+ start,
+ s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.",
+ logLevel = Logging.ErrorLevel)
+ e
+ case e => e
+ })
+
reportFailure(
- f,
+ g,
start,
failure =>
s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
@@ -273,4 +275,54 @@ class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String)
private def getBlobClient(docId: DocId, name: String) =
client.getBlobAsyncClient(objectKey(docId, name)).getBlockBlobAsyncClient
+
+ private def getAttachmentSource(objectKey: String, config: AzBlobConfig)(
+ implicit tid: TransactionId): Future[Option[Source[ByteString, Any]]] = {
+ val blobClient = client.getBlobAsyncClient(objectKey).getBlockBlobAsyncClient
+
+ config.azureCdnConfig match {
+ case Some(cdnConfig) =>
+ //setup sas token
+ def expiryTime = OffsetDateTime.now().plusDays(1)
+ def permissions =
+ new BlobContainerSasPermission()
+ .setReadPermission(true)
+ val sigValues = new BlobServiceSasSignatureValues(expiryTime, permissions)
+ val sas = blobClient.generateSas(sigValues)
+ //parse the url, and reset the host
+ val parts = BlobUrlParts.parse(blobClient.getBlobUrl)
+ val url = parts.setHost(cdnConfig.domainName)
+ logging.info(
+ this,
+ s"[ATT_GET] '$prefix' downloading attachment from azure cdn '$objectKey' with url (sas params not displayed) ${url}")
+ //append the sas params to the url before downloading
+ val cdnUrlWithSas = s"${url.toUrl.toString}?$sas"
+ getUrlContent(cdnUrlWithSas)
+ case None =>
+ blobClient.exists().toFuture.toScala.map { exists =>
+ if (exists) {
+ val bbFlux = blobClient.download()
+ Some(Source.fromPublisher(bbFlux).map(ByteString.fromByteBuffer))
+ } else {
+ throw NoDocumentException("Not found on 'readAttachment'.")
+ }
+ }
+ }
+ }
+ private def getUrlContent(uri: Uri): Future[Option[Source[ByteString, Any]]] = {
+ val future = Http().singleRequest(HttpRequest(uri = uri))
+ future.flatMap {
+ case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
+ Future.successful(Some(entity.dataBytes))
+ case HttpResponse(status, _, entity, _) =>
+ if (status == NotFound) {
+ entity.discardBytes()
+ throw NoDocumentException("Not found on 'readAttachment'.")
+ } else {
+ Unmarshal(entity).to[String].map { err =>
+ throw new Exception(s"failed to download ${uri} status was ${status} response was ${err}")
+ }
+ }
+ }
+ }
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
index 87018b8..0dadcb7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
@@ -27,6 +27,8 @@ import org.scalatest.FlatSpec
import scala.reflect.ClassTag
trait AzureBlob extends FlatSpec {
+ def azureCdnConfig: String = ""
+
def makeAzureStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
@@ -38,6 +40,7 @@ trait AzureBlob extends FlatSpec {
| container-name = "$containerName"
| account-key = "$accountKey"
| prefix = $prefix
+ | $azureCdnConfig
| }
|}""".stripMargin).withFallback(ConfigFactory.load()).resolve()
AzureBlobAttachmentStoreProvider.makeStore[D](config)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreCDNTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreCDNTests.scala
new file mode 100644
index 0000000..c3664ca
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreCDNTests.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.azblob
+
+import org.apache.openwhisk.core.entity.WhiskEntity
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class AzureBlobAttachmentStoreCDNTests extends AzureBlobAttachmentStoreBehaviorBase with AzureBlob {
+ override lazy val store = makeAzureStore[WhiskEntity]
+
+ override def storeType: String = "AzureBlob_AzureCDN"
+ override def azureCdnConfig: String =
+ """
+ |azure-cdn-config {
+ | domain-name = ${AZ_CDN_DOMAIN}
+ |}
+ """.stripMargin
+
+ override protected def withFixture(test: NoArgTest) = {
+ assume(
+ System.getenv("AZ_CDN_DOMAIN") != null,
+ "Configure following env variables for test " +
+ "to run 'AZ_CDN_DOMAIN' ")
+ super.withFixture(test)
+ }
+
+ //With AzureCDN deletes are not immediate and instead the objects may live in CDN cache untill TTL
+ override protected val lazyDeletes = true
+
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
index d560928..e63e99c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
@@ -104,6 +104,7 @@ trait AttachmentStoreBehaviors
r21.length shouldBe 1000
r22.length shouldBe 2000
+ println(s"creating doc ${docId}")
attachmentBytes(docId, "c1").futureValue.result() shouldBe ByteString(b1)
attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)
@@ -114,6 +115,7 @@ trait AttachmentStoreBehaviors
store.deleteAttachment(docId, "c1").futureValue shouldBe true
//Non deleted attachments related to same docId must still be accessible
+ println(s"read missing doc ${docId}")
if (!lazyDeletes) attachmentBytes(docId, "c1").failed.futureValue shouldBe a[NoDocumentException]
attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)