You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/08/10 16:22:15 UTC

[openwhisk] branch master updated: AttachmentStore implementation based on Azure Blob (#4716)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 35f1a3e  AttachmentStore implementation based on Azure Blob (#4716)
35f1a3e is described below

commit 35f1a3ec2154a1f0ed5dfaca480a14cc9a11c853
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Mon Aug 10 21:51:34 2020 +0530

    AttachmentStore implementation based on Azure Blob (#4716)
    
    * Initial implementation for the Azure Blob based AttachmentStore
    
    * add retry options for azure blob sdk to avoid sporadic long running requests
    removed redundant blob list code
    
    * add additional config for retry options
    
    Co-authored-by: Andy Steed <an...@adobe.com>
    Co-authored-by: Tyson Norris <tn...@adobe.com>
---
 common/scala/build.gradle                          |   4 +
 common/scala/src/main/resources/application.conf   |  34 +++
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   2 +
 .../database/azblob/AzureBlobAttachmentStore.scala | 276 +++++++++++++++++++++
 .../openwhisk/core/database/azblob/AzureBlob.scala |  60 +++++
 .../AzureBlobAttachmentStoreBehaviorBase.scala     |  55 ++++
 .../azblob/AzureBlobAttachmentStoreITTests.scala   |  29 +++
 .../core/database/azblob/AzureBlobConfigTest.scala |  40 +++
 .../database/test/AttachmentStoreBehaviors.scala   |   2 +
 9 files changed, 502 insertions(+)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 4c434cc..82f2a8c 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -94,6 +94,10 @@ dependencies {
         exclude group: 'com.fasterxml.jackson.dataformat'
     }
     compile "com.amazonaws:aws-java-sdk-cloudfront:1.11.517"
+
+    compile ("com.azure:azure-storage-blob:12.6.0") {
+        exclude group: "com.azure", module: "azure-core-test"
+    }
 }
 
 configurations {
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index a794ee2..a856305 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -335,6 +335,40 @@ whisk {
     #   }
     # }
 
+    azure-blob {
+        # Config property when using AzureBlobAttachmentStore
+        # whisk {
+        #  spi {
+        #    AttachmentStoreProvider = org.apache.openwhisk.core.database.azblob.AzureBlobAttachmentStoreProvider
+        #  }
+        #}
+
+        # Blob container endpoint like https://foostore.blob.core.windows.net/test-ow-travis
+        # It is of format https://<account-name>.blob.core.windows.net/<container-name>
+        # endpoint =
+
+        # Storage account name
+        # account-name =
+
+        # Container name within storage account used to store the blobs
+        # container-name =
+
+        # Shared key credentials
+        # https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/storage/azure-storage-blob#shared-key-credential
+        # account-key
+
+        # Folder path within the container (optional)
+        # prefix
+
+        retry-config {
+            retry-policy-type = FIXED
+            max-tries = 3
+            try-timeout = 5 seconds
+            retry-delay = 10 milliseconds
+            #secondary-host = ""
+        }
+    }
+
     # transaction ID related configuration
     transactions {
         header = "X-Request-ID"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 6a54f63..0dd1c16 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -272,4 +272,6 @@ object ConfigKeys {
   val apacheClientConfig = "whisk.apache-client"
 
   val parameterStorage = "whisk.parameter-storage"
+
+  val azBlob = "whisk.azure-blob"
 }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
new file mode 100644
index 0000000..a74eed7
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.azblob
+
+import akka.actor.ActorSystem
+import akka.event.Logging
+import akka.event.Logging.InfoLevel
+import akka.http.scaladsl.model.ContentType
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.{ByteString, ByteStringBuilder}
+import com.azure.storage.blob.{BlobContainerAsyncClient, BlobContainerClientBuilder}
+import com.azure.storage.common.StorageSharedKeyCredential
+import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType}
+import com.typesafe.config.Config
+import org.apache.openwhisk.common.LoggingMarkers.{
+  DATABASE_ATTS_DELETE,
+  DATABASE_ATT_DELETE,
+  DATABASE_ATT_GET,
+  DATABASE_ATT_SAVE
+}
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.database.StoreUtils.{combinedSink, reportFailure}
+import org.apache.openwhisk.core.database.{
+  AttachResult,
+  AttachmentStore,
+  AttachmentStoreProvider,
+  DocumentSerializer,
+  NoDocumentException
+}
+import org.apache.openwhisk.core.entity.DocId
+import pureconfig._
+import pureconfig.generic.auto._
+import reactor.core.publisher.Flux
+
+import scala.compat.java8.FutureConverters._
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.ClassTag
+import scala.util.Success
+
+case class AzBlobConfig(endpoint: String,
+                        accountKey: String,
+                        containerName: String,
+                        accountName: String,
+                        connectionString: Option[String],
+                        prefix: Option[String],
+                        retryConfig: AzBlobRetryConfig) {
+  def prefixFor[D](implicit tag: ClassTag[D]): String = {
+    val className = tag.runtimeClass.getSimpleName.toLowerCase
+    prefix.map(p => s"$p/$className").getOrElse(className)
+  }
+}
+case class AzBlobRetryConfig(retryPolicyType: RetryPolicyType,
+                             maxTries: Int,
+                             tryTimeout: FiniteDuration,
+                             retryDelay: FiniteDuration,
+                             secondaryHost: Option[String])
+object AzureBlobAttachmentStoreProvider extends AttachmentStoreProvider {
+  override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                              logging: Logging,
+                                                              materializer: ActorMaterializer): AttachmentStore = {
+    makeStore[D](actorSystem.settings.config)
+  }
+
+  def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
+                                                                   logging: Logging,
+                                                                   materializer: ActorMaterializer): AttachmentStore = {
+    val azConfig = loadConfigOrThrow[AzBlobConfig](config, ConfigKeys.azBlob)
+    new AzureBlobAttachmentStore(createClient(azConfig), azConfig.prefixFor[D])
+  }
+
+  def createClient(config: AzBlobConfig): BlobContainerAsyncClient = {
+    val builder = new BlobContainerClientBuilder()
+
+    //If connection string is specified then it would have all needed info
+    //Mostly used for testing using Azurite
+    config.connectionString match {
+      case Some(s) => builder.connectionString(s)
+      case _ =>
+        builder
+          .endpoint(config.endpoint)
+          .credential(new StorageSharedKeyCredential(config.accountName, config.accountKey))
+    }
+
+    builder
+      .containerName(config.containerName)
+      .retryOptions(new RequestRetryOptions(
+        config.retryConfig.retryPolicyType,
+        config.retryConfig.maxTries,
+        config.retryConfig.tryTimeout.toSeconds.toInt,
+        config.retryConfig.retryDelay.toMillis,
+        config.retryConfig.retryDelay.toMillis,
+        config.retryConfig.secondaryHost.orNull))
+      .buildAsyncClient()
+  }
+}
+
+class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String)(implicit system: ActorSystem,
+                                                                                 logging: Logging,
+                                                                                 materializer: ActorMaterializer)
+    extends AttachmentStore {
+  override protected[core] def scheme: String = "az"
+
+  override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
+
+  override protected[core] def attach(
+    docId: DocId,
+    name: String,
+    contentType: ContentType,
+    docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
+    require(name != null, "name undefined")
+    val start =
+      transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document 'id: $docId'")
+    val blobClient = getBlobClient(docId, name)
+
+    //TODO Use BlobAsyncClient#upload(Flux<ByteBuffer>, com.azure.storage.blob.models.ParallelTransferOptions, boolean)
+    val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
+
+    val f = docStream.runWith(combinedSink(uploadSink))
+    val g = f.flatMap { r =>
+      val buff = r.uploadResult.result().compact
+      val uf = blobClient.upload(Flux.fromArray(Array(buff.asByteBuffer)), buff.size).toFuture.toScala
+      uf.map(_ => AttachResult(r.digest, r.length))
+    }
+
+    g.foreach(_ =>
+      transid
+        .finished(this, start, s"[ATT_PUT] '$prefix' completed uploading attachment '$name' of document 'id: $docId'"))
+
+    reportFailure(
+      g,
+      start,
+      failure => s"[ATT_PUT] '$prefix' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+    require(name != null, "name undefined")
+    val start =
+      transid.started(
+        this,
+        DATABASE_ATT_GET,
+        s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
+    val blobClient = getBlobClient(docId, name)
+    val f = blobClient.exists().toFuture.toScala.flatMap { exists =>
+      if (exists) {
+        val bbFlux = blobClient.download()
+        val rf = Source.fromPublisher(bbFlux).map(ByteString(_)).runWith(sink)
+        rf.andThen {
+          case Success(_) =>
+            transid
+              .finished(
+                this,
+                start,
+                s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
+        }
+      } else {
+        transid
+          .finished(
+            this,
+            start,
+            s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.",
+            logLevel = Logging.ErrorLevel)
+        Future.failed(NoDocumentException("Not found on 'readAttachment'."))
+      }
+    }
+
+    reportFailure(
+      f,
+      start,
+      failure =>
+        s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
+    val start =
+      transid.started(
+        this,
+        DATABASE_ATTS_DELETE,
+        s"[ATTS_DELETE] deleting attachments of document 'id: $docId' with prefix ${objectKeyPrefix(docId)}")
+
+    var count = 0
+    val f = Source
+      .fromPublisher(client.listBlobsByHierarchy(objectKeyPrefix(docId)))
+      .mapAsync(1) { b =>
+        count += 1
+        val startDelete =
+          transid.started(
+            this,
+            DATABASE_ATT_DELETE,
+            s"[ATT_DELETE] deleting attachment '${b.getName}' of document 'id: $docId'")
+        client
+          .getBlobAsyncClient(b.getName)
+          .delete()
+          .toFuture
+          .toScala
+          .map(
+            _ =>
+              transid.finished(
+                this,
+                startDelete,
+                s"[ATT_DELETE] completed: deleting attachment '${b.getName}' of document 'id: $docId'"))
+          .recover {
+            case t =>
+              transid.failed(
+                this,
+                startDelete,
+                s"[ATT_DELETE] failed: deleting attachment '${b.getName}' of document 'id: $docId' error: $t")
+          }
+
+      }
+      .recover {
+        case t =>
+          logging.error(this, s"[ATT_DELETE] :error in delete ${t}")
+          throw t
+      }
+      .runWith(Sink.seq)
+      .map(_ => true)
+
+    f.foreach(
+      _ =>
+        transid.finished(
+          this,
+          start,
+          s"[ATTS_DELETE] completed: deleting ${count} attachments of document 'id: $docId'",
+          InfoLevel))
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[ATTS_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def deleteAttachment(docId: DocId, name: String)(
+    implicit transid: TransactionId): Future[Boolean] = {
+    val start =
+      transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")
+
+    val f = getBlobClient(docId, name).delete().toFuture.toScala.map(_ => true)
+
+    f.foreach(_ =>
+      transid.finished(this, start, s"[ATT_DELETE] completed: deleting attachment '$name' of document 'id: $docId'"))
+
+    reportFailure(
+      f,
+      start,
+      failure => s"[ATT_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
+  }
+
+  override def shutdown(): Unit = {}
+
+  private def objectKey(id: DocId, name: String): String = s"$prefix/${id.id}/$name"
+
+  private def objectKeyPrefix(id: DocId): String =
+    s"$prefix/${id.id}/" //must end with a slash so that ".../<package>/<action>other" does not match for "<package>/<action>"
+
+  private def getBlobClient(docId: DocId, name: String) =
+    client.getBlobAsyncClient(objectKey(docId, name)).getBlockBlobAsyncClient
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
new file mode 100644
index 0000000..87018b8
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.azblob
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import com.typesafe.config.ConfigFactory
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.database.{AttachmentStore, DocumentSerializer}
+import org.scalatest.FlatSpec
+
+import scala.reflect.ClassTag
+
+trait AzureBlob extends FlatSpec {
+  def makeAzureStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                          logging: Logging,
+                                                          materializer: ActorMaterializer): AttachmentStore = {
+    val config = ConfigFactory.parseString(s"""
+        |whisk {
+        |  azure-blob {
+        |    endpoint = "$endpoint"
+        |    account-name = "$accountName"
+        |    container-name = "$containerName"
+        |    account-key = "$accountKey"
+        |    prefix = $prefix
+        |  }
+        |}""".stripMargin).withFallback(ConfigFactory.load()).resolve()
+    AzureBlobAttachmentStoreProvider.makeStore[D](config)
+  }
+
+  override protected def withFixture(test: NoArgTest) = {
+    assume(
+      accountKey != null,
+      "'AZ_ACCOUNT_KEY' env not configured. Configure following " +
+        "env variables for test to run. 'AZ_ENDPOINT', 'AZ_ACCOUNT_NAME', 'AZ_CONTAINER_NAME'")
+    super.withFixture(test)
+  }
+
+  val endpoint = System.getenv("AZ_ENDPOINT")
+  val accountName = System.getenv("AZ_ACCOUNT_NAME")
+  val containerName = sys.env.getOrElse("AZ_CONTAINER_NAME", "test-ow-travis")
+  val accountKey = System.getenv("AZ_ACCOUNT_KEY")
+
+  def prefix: String
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreBehaviorBase.scala
new file mode 100644
index 0000000..825a445
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreBehaviorBase.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.azblob
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.database.{AttachmentStore, DocumentSerializer}
+import org.apache.openwhisk.core.database.memory.{MemoryArtifactStoreBehaviorBase, MemoryArtifactStoreProvider}
+import org.apache.openwhisk.core.database.test.AttachmentStoreBehaviors
+import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors
+import org.apache.openwhisk.core.entity.WhiskEntity
+import org.scalatest.FlatSpec
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+trait AzureBlobAttachmentStoreBehaviorBase
+    extends FlatSpec
+    with MemoryArtifactStoreBehaviorBase
+    with ArtifactStoreAttachmentBehaviors
+    with AttachmentStoreBehaviors {
+  override lazy val store = makeAzureStore[WhiskEntity]
+
+  override implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  override val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}"
+
+  override protected def beforeAll(): Unit = {
+    MemoryArtifactStoreProvider.purgeAll()
+    super.beforeAll()
+  }
+
+  override def getAttachmentStore[D <: DocumentSerializer: ClassTag](): AttachmentStore =
+    makeAzureStore[D]()
+
+  def makeAzureStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                          logging: Logging,
+                                                          materializer: ActorMaterializer): AttachmentStore
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreITTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreITTests.scala
new file mode 100644
index 0000000..2986eea
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStoreITTests.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.azblob
+
+import org.apache.openwhisk.core.entity.WhiskEntity
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class AzureBlobAttachmentStoreITTests extends AzureBlobAttachmentStoreBehaviorBase with AzureBlob {
+  override lazy val store = makeAzureStore[WhiskEntity]
+
+  override def storeType: String = "Azure"
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobConfigTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobConfigTest.scala
new file mode 100644
index 0000000..ed7ae8c
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlobConfigTest.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.azblob
+
+import com.azure.storage.common.policy.RetryPolicyType
+import org.apache.openwhisk.core.ConfigKeys
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import pureconfig._
+import pureconfig.generic.auto._
+
+@RunWith(classOf[JUnitRunner])
+class AzureBlobConfigTest extends FlatSpec with Matchers {
+
+  behavior of "AzureBlobConfig"
+  it should "use valid defaults for retry option" in {
+    val config: AzBlobRetryConfig = loadConfigOrThrow[AzBlobRetryConfig](ConfigKeys.azBlob + ".retry-config")
+    //make sure retry type is set to FIXED
+    config.retryPolicyType shouldBe RetryPolicyType.FIXED
+    //make sure optional secondaryHost is not set
+    config.secondaryHost shouldBe None
+
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
index 46f08ba..d560928 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
@@ -127,6 +127,8 @@ trait AttachmentStoreBehaviors
     //Make sure doc2 attachments are left untouched
     if (!lazyDeletes) attachmentBytes(docId2, "c21").futureValue.result() shouldBe ByteString(b1)
     if (!lazyDeletes) attachmentBytes(docId2, "c22").futureValue.result() shouldBe ByteString(b2)
+
+    attachmentsToDelete += docId2.asString
   }
 
   it should "throw NoDocumentException on reading non existing attachment" in {