You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/09/26 16:37:10 UTC

[nifi] branch master updated: NIFI-6550: Create controller service for Azure Storage Credentials

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new a4c3ca5  NIFI-6550: Create controller service for Azure Storage Credentials
a4c3ca5 is described below

commit a4c3ca50dd819e7cffb563c46675bc7ab9d6cd5c
Author: Peter Turcsanyi <tu...@cloudera.com>
AuthorDate: Fri Sep 13 22:10:09 2019 +0200

    NIFI-6550: Create controller service for Azure Storage Credentials
    
    Added AzureStorageCredentialsService controller service interface
    with 2 implementations:
     - AzureStorageCredentialsControllerService: holds the credential properties
       (account name, account key, SAS token)
     - AzureStorageCredentialsControllerServiceLookup: can be used to dynamically
       look up another AzureStorageCredentialsService (similar to
       DBCPConnectionPoolLookup)
    
    The controller service can be used by the Azure Storage processors:
    {List|Fetch|Put|Delete}AzureBlobStorage, {Get|Put}AzureQueueStorage
    
    NIFI-6550: Review changes.
    
    NIFI-6550: Review changes #2.
    
    This closes #3742.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 nifi-assembly/pom.xml                              |   6 +
 .../java/org/apache/nifi/util/NoOpProcessor.java   |  29 ++
 .../nifi-azure-bundle/nifi-azure-nar/pom.xml       |   2 +-
 .../nifi-azure-processors/pom.xml                  |  18 +-
 .../azure/AbstractAzureBlobProcessor.java          |   3 +-
 .../azure/eventhub/ConsumeAzureEventHub.java       |   5 +-
 .../azure/storage/ListAzureBlobStorage.java        |   4 +-
 .../storage/queue/AbstractAzureQueueStorage.java   | 190 ++++++------
 .../azure/storage/queue/GetAzureQueueStorage.java  |   4 +-
 .../azure/storage/queue/PutAzureQueueStorage.java  | 318 ++++++++++-----------
 .../azure/storage/utils/AzureStorageUtils.java     | 127 ++++----
 .../AzureStorageCredentialsControllerService.java  | 103 +++++++
 ...eStorageCredentialsControllerServiceLookup.java | 141 +++++++++
 .../org.apache.nifi.controller.ControllerService   |  16 ++
 .../azure/AbstractAzureBlobStorageIT.java          |  53 ----
 .../azure/storage/AbstractAzureBlobStorageIT.java  |  58 ++++
 .../azure/storage/AbstractAzureStorageIT.java      | 101 +++++++
 .../processors/azure/storage/AzureTestUtil.java    | 105 -------
 .../azure/storage/ITDeleteAzureBlobStorage.java    | 127 ++++----
 .../azure/storage/ITFetchAzureBlobStorage.java     |  81 +++---
 .../azure/storage/ITListAzureBlobStorage.java      |  71 +++--
 .../azure/storage/ITPutAzureBlobStorage.java       |  72 +++++
 .../azure/storage/ITPutAzureStorageBlob.java       |  63 ----
 .../storage/queue/AbstractAzureQueueStorageIT.java |  61 ++++
 .../storage/queue/GetAzureQueueStorageIT.java      | 138 ++++-----
 .../storage/queue/PutAzureQueueStorageIT.java      | 217 +++++++-------
 ...reStorageUtilsGetStorageCredentialsDetails.java | 164 +++++++++++
 ...reStorageUtilsValidateCredentialProperties.java | 159 +++++++++++
 ...stAzureStorageCredentialsControllerService.java |  93 ++++++
 ...eStorageCredentialsControllerServiceLookup.java | 139 +++++++++
 .../src/test/resources/hello.txt                   |   1 -
 .../pom.xml                                        |  22 +-
 .../pom.xml                                        |  35 ++-
 .../storage/AzureStorageCredentialsDetails.java    |  39 +++
 .../storage/AzureStorageCredentialsService.java    |  35 +++
 nifi-nar-bundles/nifi-azure-bundle/pom.xml         |  21 ++
 36 files changed, 1899 insertions(+), 922 deletions(-)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index f2c5021..8ce38b5 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -429,6 +429,12 @@ language governing permissions and limitations under the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-azure-services-api-nar</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-scripting-nar</artifactId>
             <version>1.10.0-SNAPSHOT</version>
             <type>nar</type>
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/NoOpProcessor.java b/nifi-mock/src/main/java/org/apache/nifi/util/NoOpProcessor.java
new file mode 100644
index 0000000..eff3bff
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/NoOpProcessor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class NoOpProcessor extends AbstractProcessor {
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
index 09863c5..b61388a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -38,7 +38,7 @@
         
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <artifactId>nifi-azure-services-api-nar</artifactId>
             <version>1.10.0-SNAPSHOT</version>
             <type>nar</type>
         </dependency>  
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index f1f7d03..6df6b5b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -45,6 +45,11 @@
             <artifactId>nifi-proxy-configuration-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-azure-services-api</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-eventhubs</artifactId>
             <version>${azure-eventhubs.version}</version>
@@ -57,13 +62,12 @@
         <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-storage</artifactId>
-            <version>5.2.0</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-core</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <!-- overriding jackson-core in azure-storage -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
index 2156b56..afee09f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -56,9 +56,10 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
     private static final List<PropertyDescriptor> PROPERTIES = Collections
             .unmodifiableList(Arrays.asList(
                     AzureStorageUtils.CONTAINER,
-                    AzureStorageUtils.PROP_SAS_TOKEN,
+                    AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
                     AzureStorageUtils.ACCOUNT_NAME,
                     AzureStorageUtils.ACCOUNT_KEY,
+                    AzureStorageUtils.PROP_SAS_TOKEN,
                     BLOB,
                     AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index e9bafb0..203dd5b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -48,7 +48,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
@@ -91,6 +90,8 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
 })
 public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
 
+    private static final String FORMAT_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+
     static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
             .name("event-hub-namespace")
             .displayName("Event Hub Namespace")
@@ -626,7 +627,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
                 .evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
         options.setReceiveTimeOut(Duration.ofMillis(receiveTimeoutMillis));
 
-        final String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, storageAccountName, storageAccountKey);
+        final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey);
 
         final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder(namespaceName, eventHubName, sasName, sasKey);
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index bf82029..0d950a6 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -96,9 +96,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
     private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
             LISTING_STRATEGY,
             AzureStorageUtils.CONTAINER,
-            AzureStorageUtils.PROP_SAS_TOKEN,
+            AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
             AzureStorageUtils.ACCOUNT_NAME,
             AzureStorageUtils.ACCOUNT_KEY,
+            AzureStorageUtils.PROP_SAS_TOKEN,
             PROP_PREFIX,
             AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
             ListedEntityTracker.TRACKING_STATE_CACHE,
@@ -113,6 +114,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
 
     @Override
     protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
+        results.addAll(AzureStorageUtils.validateCredentialProperties(validationContext));
         AzureStorageUtils.validateProxySpec(validationContext, results);
     }
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
index caab936..3266b56 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
@@ -1,110 +1,80 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage.queue;
-
-import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageCredentials;
-import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
-import com.microsoft.azure.storage.queue.CloudQueueClient;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
-
-    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
-            .name("storage-queue-name")
-            .displayName("Queue Name")
-            .description("Name of the Azure Storage Queue")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All successfully processed FlowFiles are routed to this relationship")
-            .build();
-
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Unsuccessful operations will be transferred to the failure relationship.")
-            .build();
-
-    private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
-    private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
-
-    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) {
-        final String storageAccountName;
-        final String storageAccountKey;
-        final String sasToken;
-        final String connectionString;
-
-        if (flowFile == null) {
-            storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
-            storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
-            sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
-        } else {
-            storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
-            sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
-        }
-
-        CloudQueueClient cloudQueueClient;
-        try {
-            if (StringUtils.isNoneBlank(sasToken)) {
-                connectionString = String.format(FORMAT_QUEUE_BASE_URI, storageAccountName);
-                StorageCredentials storageCredentials = new StorageCredentialsSharedAccessSignature(sasToken);
-                cloudQueueClient = new CloudQueueClient(new URI(connectionString), storageCredentials);
-            } else {
-                connectionString = String.format(FORMAT_QUEUE_CONNECTION_STRING, storageAccountName, storageAccountKey);
-                CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
-                cloudQueueClient = storageAccount.createCloudQueueClient();
-            }
-        } catch (IllegalArgumentException | URISyntaxException e) {
-            getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
-            throw new IllegalArgumentException(e);
-        } catch (InvalidKeyException e) {
-            getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
-            throw new IllegalArgumentException(e);
-        }
-        return cloudQueueClient;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
+
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+            .name("storage-queue-name")
+            .displayName("Queue Name")
+            .description("Name of the Azure Storage Queue")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All successfully processed FlowFiles are routed to this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Unsuccessful operations will be transferred to the failure relationship.")
+            .build();
+
+    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) throws URISyntaxException {
+        final AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(context, flowFile);
+        final CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(storageCredentialsDetails.getStorageCredentials(), true, null, storageCredentialsDetails.getStorageAccountName());
+        final CloudQueueClient cloudQueueClient = cloudStorageAccount.createCloudQueueClient();
+
+        return cloudQueueClient;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        return AzureStorageUtils.validateCredentialProperties(validationContext);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
index c3a2877..88a27f2 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
@@ -94,8 +94,8 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
             .build();
 
     private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
-            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
-            BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
+            AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN,
+            QUEUE, AUTO_DELETE, BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
index 4172c89..480c41d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
@@ -1,159 +1,159 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage.queue;
-
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.queue.CloudQueue;
-import com.microsoft.azure.storage.queue.CloudQueueClient;
-import com.microsoft.azure.storage.queue.CloudQueueMessage;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import java.io.ByteArrayOutputStream;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-@SeeAlso({GetAzureQueueStorage.class})
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
-@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.")
-public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
-
-    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
-            .name("time-to-live")
-            .displayName("TTL")
-            .description("Maximum time to allow the message to be in the queue. If left empty, the default value of 7 days will be used.")
-            .required(false)
-            .defaultValue("7 days")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor VISIBILITY_DELAY = new PropertyDescriptor.Builder()
-            .name("visibility-delay")
-            .displayName("Visibility Delay")
-            .description("The length of time during which the message will be invisible, starting when it is added to the queue. " +
-                         "This value must be greater than or equal to 0 and less than the TTL value.")
-            .required(false)
-            .defaultValue("0 secs")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-
-    private static final List<PropertyDescriptor> properties =  Collections.unmodifiableList(Arrays.asList(
-            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, TTL,
-            QUEUE, VISIBILITY_DELAY, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final long startNanos = System.nanoTime();
-
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        session.exportTo(flowFile, baos);
-        final String flowFileContent = baos.toString();
-
-        CloudQueueMessage message = new CloudQueueMessage(flowFileContent);
-        CloudQueueClient cloudQueueClient;
-        CloudQueue cloudQueue;
-
-        final int ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
-        final int delay = context.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
-        final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions(flowFile).getValue().toLowerCase();
-
-        try {
-            cloudQueueClient = createCloudQueueClient(context, flowFile);
-            cloudQueue = cloudQueueClient.getQueueReference(queue);
-
-            final OperationContext operationContext = new OperationContext();
-            AzureStorageUtils.setProxy(operationContext, context);
-
-            cloudQueue.addMessage(message, ttl, delay, null, operationContext);
-        } catch (URISyntaxException | StorageException e) {
-            getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e});
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
-
-        session.transfer(flowFile, REL_SUCCESS);
-        final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        session.getProvenanceReporter().send(flowFile, cloudQueue.getUri().toString(), transmissionMillis);
-    }
-
-    @Override
-    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
-
-        final boolean ttlSet = validationContext.getProperty(TTL).isSet();
-        final boolean delaySet = validationContext.getProperty(VISIBILITY_DELAY).isSet();
-
-        final int ttl = validationContext.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
-
-        if (ttlSet) {
-            final int SEVEN_DAYS_TIMEPERIOD_IN_SECS = 604800; // i.e. 7 * 24 * 60 * 60
-
-            if (ttl > SEVEN_DAYS_TIMEPERIOD_IN_SECS) {
-                problems.add(new ValidationResult.Builder()
-                                                 .subject(TTL.getDisplayName())
-                                                 .valid(false)
-                                                 .explanation(TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days")
-                                                 .build());
-            }
-        }
-
-        if (delaySet) {
-            int delay = validationContext.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
-
-            if (delay > ttl || delay < 0) {
-                problems.add(new ValidationResult.Builder()
-                                                 .subject(VISIBILITY_DELAY.getDisplayName())
-                                                 .valid(false)
-                                                 .explanation(VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than " + TTL.getDisplayName())
-                                                 .build());
-            }
-        }
-
-        AzureStorageUtils.validateProxySpec(validationContext, problems);
-
-        return problems;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({GetAzureQueueStorage.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
+@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.")
+public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
+    // Optional message lifetime; customValidate() rejects values above 7 days.
+    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("time-to-live")
+            .displayName("TTL")
+            .description("Maximum time to allow the message to be in the queue. If left empty, the default value of 7 days will be used.")
+            .required(false)
+            .defaultValue("7 days")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+    // Optional initial invisibility period; customValidate() requires 0 <= delay < TTL.
+    public static final PropertyDescriptor VISIBILITY_DELAY = new PropertyDescriptor.Builder()
+            .name("visibility-delay")
+            .displayName("Visibility Delay")
+            .description("The length of time during which the message will be invisible, starting when it is added to the queue. " +
+                         "This value must be greater than or equal to 0 and less than the TTL value.")
+            .required(false)
+            .defaultValue("0 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+    // Supported properties; the credentials service is listed ahead of the individual account name/key/SAS token properties.
+    private static final List<PropertyDescriptor> properties =  Collections.unmodifiableList(Arrays.asList(
+            AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN,
+            TTL, QUEUE, VISIBILITY_DELAY, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        // Sends one FlowFile's content as a queue message: REL_SUCCESS plus a provenance SEND event when it
+        // works; a URISyntaxException or StorageException penalizes the FlowFile and routes it to REL_FAILURE.
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        session.exportTo(flowFile, baos);
+        // Decode as UTF-8 explicitly; baos.toString() would use the JVM default charset and vary by platform.
+        final String flowFileContent = new String(baos.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
+        final CloudQueueMessage message = new CloudQueueMessage(flowFileContent);
+
+        final int ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final int delay = context.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
+        // Locale.ROOT keeps lower-casing locale-independent (no dotless 'i' under a Turkish default locale).
+        final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions(flowFile).getValue().toLowerCase(java.util.Locale.ROOT);
+
+        final CloudQueue cloudQueue;
+        try {
+            final CloudQueueClient cloudQueueClient = createCloudQueueClient(context, flowFile);
+            cloudQueue = cloudQueueClient.getQueueReference(queue);
+            final OperationContext operationContext = new OperationContext();
+            AzureStorageUtils.setProxy(operationContext, context);
+            cloudQueue.addMessage(message, ttl, delay, null, operationContext);
+        } catch (URISyntaxException | StorageException e) {
+            getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        session.getProvenanceReporter().send(flowFile, cloudQueue.getUri().toString(), transmissionMillis);
+    }
+
+    @Override
+    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        // Adds the TTL / Visibility Delay cross-checks and the proxy check to the inherited validation results.
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean ttlSet = validationContext.getProperty(TTL).isSet();
+        final boolean delaySet = validationContext.getProperty(VISIBILITY_DELAY).isSet();
+
+        final int ttl = validationContext.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
+
+        if (ttlSet) {
+            final int SEVEN_DAYS_TIMEPERIOD_IN_SECS = 604800; // i.e. 7 * 24 * 60 * 60
+            if (ttl > SEVEN_DAYS_TIMEPERIOD_IN_SECS) {
+                problems.add(new ValidationResult.Builder()
+                                                 .subject(TTL.getDisplayName())
+                                                 .valid(false)
+                                                 .explanation(TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days")
+                                                 .build());
+            }
+        }
+
+        if (delaySet) {
+            int delay = validationContext.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
+            // The delay must be strictly below the TTL, as the property description and the message below state.
+            if (delay >= ttl || delay < 0) {
+                problems.add(new ValidationResult.Builder()
+                                                 .subject(VISIBILITY_DELAY.getDisplayName())
+                                                 .valid(false)
+                                                 .explanation(VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than " + TTL.getDisplayName())
+                                                 .build());
+            }
+        }
+
+        AzureStorageUtils.validateProxySpec(validationContext, problems);
+
+        return problems;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index 2821258..0e1dfd2 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -19,12 +19,14 @@ package org.apache.nifi.processors.azure.storage.utils;
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
 import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -32,13 +34,15 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
 
-import java.net.URI;
 import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 public final class AzureStorageUtils {
     public static final String BLOCK = "Block";
@@ -60,17 +64,24 @@ public final class AzureStorageUtils {
             .sensitive(true)
             .build();
 
+    public static final String ACCOUNT_NAME_BASE_DESCRIPTION =
+            "The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.";
+
     public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
             .name("storage-account-name")
             .displayName("Storage Account Name")
-            .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
-                    "attribute. While it does provide for a more flexible flow by allowing the account name to " +
-                    "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
-                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
-                    "In addition, the provenance repositories may be put on encrypted disk partitions.")
+            .description(ACCOUNT_NAME_BASE_DESCRIPTION +
+                    " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+                    "the preferred way is to configure them through a controller service specified in the Storage Credentials property. " +
+                    "The controller service can provide a common/shared configuration for multiple/all Azure processors. Furthermore, the credentials " +
+                    "can also be looked up dynamically with the 'Lookup' version of the service.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .required(true)
+            .required(false)
             .sensitive(true)
             .build();
 
@@ -98,9 +109,16 @@ public final class AzureStorageUtils {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    // use HTTPS by default as per MSFT recommendation
-    public static final String FORMAT_BLOB_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
-    public static final String FORMAT_BASE_URI = "https://%s.blob.core.windows.net";
+    public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+            .name("storage-credentials-service")
+            .displayName("Storage Credentials")
+            .description("The Controller Service used to obtain Azure Storage Credentials. Instead of the processor level properties, " +
+                    "the credentials can be configured here through a common/shared controller service, which is the preferred way. " +
+                    "The 'Lookup' version of the service can also be used to select the credentials dynamically at runtime " +
+                    "based on a FlowFile attribute (if the processor has FlowFile input).")
+            .identifiesControllerService(AzureStorageCredentialsService.class)
+            .required(false)
+            .build();
 
     private AzureStorageUtils() {
         // do not instantiate
@@ -111,56 +129,65 @@ public final class AzureStorageUtils {
      * @param flowFile An incoming FlowFile can be used for NiFi Expression Language evaluation to derive
      *                 Account Name, Account Key or SAS Token. This can be null if not available.
      */
-    public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger, FlowFile flowFile) {
-        final String accountName;
-        final String accountKey;
-        final String sasToken;
-
-        if (flowFile == null) {
-            accountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
-            accountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
-            sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
+    public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
+        // Credentials come from the Storage Credentials service or, when none is set, from the component's own properties.
+        final AzureStorageCredentialsDetails details = getStorageCredentialsDetails(context, flowFile);
+        final CloudStorageAccount account = new CloudStorageAccount(
+                details.getStorageCredentials(), true, null, details.getStorageAccountName());
+        return account.createCloudBlobClient();
+    }
+
+    public static AzureStorageCredentialsDetails getStorageCredentialsDetails(PropertyContext context, FlowFile flowFile) {
+        final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
+        // The attributes (empty when there is no FlowFile) are handed to whichever credential source is used below.
+        final AzureStorageCredentialsService storageCredentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService.class);
+        // A configured credentials service wins; otherwise the credentials are built from this component's own properties.
+        if (storageCredentialsService != null) {
+            return storageCredentialsService.getStorageCredentialsDetails(attributes);
         } else {
-            accountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            accountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
-            sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            return createStorageCredentialsDetails(context, attributes);
         }
+    }
+
+    public static AzureStorageCredentialsDetails createStorageCredentialsDetails(PropertyContext context, Map<String, String> attributes) {
+        final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
+        final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
+        final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
 
-        CloudBlobClient cloudBlobClient;
-
-        try {
-            // sas token and acct name/key have different ways of creating a secure connection (e.g. new StorageCredentialsAccountAndKey didn't work)
-            if (StringUtils.isNotBlank(sasToken)) {
-                String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BASE_URI, accountName);
-                StorageCredentials creds = new StorageCredentialsSharedAccessSignature(sasToken);
-                cloudBlobClient = new CloudBlobClient(new URI(storageConnectionString), creds);
-            } else {
-                String blobConnString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, accountName, accountKey);
-                CloudStorageAccount storageAccount = CloudStorageAccount.parse(blobConnString);
-                cloudBlobClient = storageAccount.createCloudBlobClient();
-            }
-        } catch (IllegalArgumentException | URISyntaxException e) {
-            logger.error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
-            throw new IllegalArgumentException(e);
-        } catch (InvalidKeyException e) {
-            logger.error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
-            throw new IllegalArgumentException(e);
+        if (StringUtils.isBlank(accountName)) {
+            throw new IllegalArgumentException(String.format("'%s' must not be empty.", ACCOUNT_NAME.getDisplayName()));
         }
 
-        return cloudBlobClient;
+        StorageCredentials storageCredentials;
+        // Choose the credential type: a non-blank account key is used first, a SAS token only when no key is given.
+        if (StringUtils.isNotBlank(accountKey)) {
+            storageCredentials = new StorageCredentialsAccountAndKey(accountName, accountKey);
+        } else if (StringUtils.isNotBlank(sasToken)) {
+            storageCredentials = new StorageCredentialsSharedAccessSignature(sasToken);
+        } else {
+            throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.", ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+        }
+        // Pair the credentials with the account name they belong to.
+        return new AzureStorageCredentialsDetails(accountName, storageCredentials);
     }
 
     public static Collection<ValidationResult> validateCredentialProperties(ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>();
 
-        String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue();
-        String acctName = validationContext.getProperty(ACCOUNT_KEY).getValue();
-        if ((StringUtils.isBlank(sasToken) && StringUtils.isBlank(acctName))
-                || (StringUtils.isNotBlank(sasToken) && StringUtils.isNotBlank(acctName))) {
+        final String storageCredentials = validationContext.getProperty(STORAGE_CREDENTIALS_SERVICE).getValue();
+        final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
+        final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
+        final String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue();
+
+        if (!((StringUtils.isNotBlank(storageCredentials) && StringUtils.isBlank(accountName) && StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken))
+                || (StringUtils.isBlank(storageCredentials) && StringUtils.isNotBlank(accountName) && StringUtils.isNotBlank(accountKey) && StringUtils.isBlank(sasToken))
+                || (StringUtils.isBlank(storageCredentials) && StringUtils.isNotBlank(accountName) && StringUtils.isBlank(accountKey) && StringUtils.isNotBlank(sasToken)))) {
             results.add(new ValidationResult.Builder().subject("AzureStorageUtils Credentials")
-                        .valid(false)
-                        .explanation("either Azure Account Key or Shared Access Signature required, but not both")
-                        .build());
+                    .valid(false)
+                    .explanation("either " + STORAGE_CREDENTIALS_SERVICE.getDisplayName()
+                            + ", or " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName()
+                            + " or " + ACCOUNT_NAME.getDisplayName() + " with " + PROP_SAS_TOKEN.getDisplayName() + " must be specified")
+                    .build());
         }
 
         return results;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService.java
new file mode 100644
index 0000000..96d6476
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of the AzureStorageCredentialsService interface, built on AbstractControllerService.
+ *
+ * @see AzureStorageCredentialsService
+ */
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
+@CapabilityDescription("Defines credentials for Azure Storage processors. " +
+        "Uses Account Name with Account Key or Account Name with SAS Token.")
+public class AzureStorageCredentialsControllerService extends AbstractControllerService implements AzureStorageCredentialsService {
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .name(AzureStorageUtils.ACCOUNT_NAME.getName())
+            .displayName(AzureStorageUtils.ACCOUNT_NAME.getDisplayName())
+            .description(AzureStorageUtils.ACCOUNT_NAME_BASE_DESCRIPTION)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections
+            .unmodifiableList(Arrays.asList(
+                    ACCOUNT_NAME,
+                    AzureStorageUtils.ACCOUNT_KEY,
+                    AzureStorageUtils.PROP_SAS_TOKEN));
+
+    private ConfigurationContext context;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final String accountKey = validationContext.getProperty(AzureStorageUtils.ACCOUNT_KEY).getValue();
+        final String sasToken = validationContext.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).getValue();
+
+        if (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)) {
+            results.add(new ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
+                    .valid(false)
+                    .explanation("either " + AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " or " + AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName() + " is required")
+                    .build());
+        } else if (StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) {
+            results.add(new ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
+                        .valid(false)
+                        .explanation("cannot set both " + AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " and " + AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName())
+                        .build());
+        }
+
+        return results;
+    }
+
+    @OnEnabled
+    public void onEnabled(ConfigurationContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) {
+        return AzureStorageUtils.createStorageCredentialsDetails(context, attributes);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
new file mode 100644
index 0000000..4899715
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
+@CapabilityDescription("Provides an AzureStorageCredentialsService that can be used to dynamically select another AzureStorageCredentialsService. " +
+        "This service requires an attribute named 'azure.storage.credentials.name' to be passed in, and will throw an exception if the attribute is missing. " +
+        "The value of 'azure.storage.credentials.name' will be used to select the AzureStorageCredentialsService that has been registered with that name. " +
+        "This will allow multiple AzureStorageCredentialsServices to be defined and registered, and then selected dynamically at runtime by tagging flow files " +
+        "with the appropriate 'azure.storage.credentials.name' attribute.")
+@DynamicProperty(name = "The name to register AzureStorageCredentialsService", value = "The AzureStorageCredentialsService",
+        description = "If 'azure.storage.credentials.name' attribute contains the name of the dynamic property, then the AzureStorageCredentialsService (registered in the value) will be selected.",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class AzureStorageCredentialsControllerServiceLookup extends AbstractControllerService implements AzureStorageCredentialsService {
+
+    public static final String AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE = "azure.storage.credentials.name";
+
+    private volatile Map<String, AzureStorageCredentialsService> serviceMap;
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .description("The " + AzureStorageCredentialsService.class.getSimpleName() + " to return when " + AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " = '" + propertyDescriptorName + "'")
+                .identifiesControllerService(AzureStorageCredentialsService.class)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        int numDefinedServices = 0;
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                numDefinedServices++;
+            }
+
+            final String referencedId = context.getProperty(descriptor).getValue();
+            if (this.getIdentifier().equals(referencedId)) {
+                results.add(new ValidationResult.Builder()
+                        .subject(descriptor.getDisplayName())
+                        .explanation("the current service cannot be registered as an " + AzureStorageCredentialsService.class.getSimpleName() + " to lookup")
+                        .valid(false)
+                        .build());
+            }
+        }
+
+        if (numDefinedServices == 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("at least one " + AzureStorageCredentialsService.class.getSimpleName() + " must be defined via dynamic properties")
+                    .valid(false)
+                    .build());
+        }
+
+        return results;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final Map<String, AzureStorageCredentialsService> map = new HashMap<>();
+
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                final AzureStorageCredentialsService service = context.getProperty(descriptor).asControllerService(AzureStorageCredentialsService.class);
+                map.put(descriptor.getName(), service);
+            }
+        }
+
+        serviceMap = Collections.unmodifiableMap(map);
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        serviceMap = null;
+    }
+
+    @Override
+    public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) throws ProcessException {
+        final AzureStorageCredentialsService service = lookupAzureStorageCredentialsService(attributes);
+
+        return service.getStorageCredentialsDetails(attributes);
+    }
+
+    private AzureStorageCredentialsService lookupAzureStorageCredentialsService(Map<String, String> attributes) {
+        if (!attributes.containsKey(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE)) {
+            throw new ProcessException("Attributes must contain an attribute name '" + AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + "'");
+        }
+
+        final String storageCredentialService = attributes.get(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE);
+        if (StringUtils.isBlank(storageCredentialService)) {
+            throw new ProcessException(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " cannot be null or blank");
+        }
+
+        final AzureStorageCredentialsService service = serviceMap.get(storageCredentialService);
+        if (service == null) {
+            throw new ProcessException("No " + AzureStorageCredentialsService.class.getSimpleName() + " was found for " +
+                    AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " '" + storageCredentialService + "'");
+        }
+
+        return service;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..659452b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService
+org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java
deleted file mode 100644
index aebace6..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import org.apache.nifi.processors.azure.storage.AzureTestUtil;
-import org.junit.Assert;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-
-public class AbstractAzureBlobStorageIT {
-
-    protected final static String SAMPLE_FILE_NAME = "/hello.txt";
-    protected final static String SAMPLE_BLOB_NAME = "testing";
-
-    protected void uploadBlob(String containerName, String filePath) throws URISyntaxException, StorageException, InvalidKeyException, IOException {
-        CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
-        CloudBlob blob = container.getBlockBlobReference(SAMPLE_BLOB_NAME);
-        blob.uploadFromFile(filePath);
-    }
-
-    protected String getFileFromResource(String fileName) {
-        URI uri = null;
-        try {
-            uri = this.getClass().getResource(fileName).toURI();
-        } catch (URISyntaxException e) {
-            Assert.fail("Cannot proceed without File : " + fileName);
-        }
-
-        return uri.toString();
-    }
-
-}
-
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
new file mode 100644
index 0000000..4361de7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.UUID;
+
+public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT {
+
+    protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
+    protected static final String TEST_BLOB_NAME = "nifi-test-blob";
+
+    protected CloudBlobContainer container;
+
+    @Before
+    public void setUpAzureBlobStorageIT() throws Exception {
+        String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
+        CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient();
+        container = blobClient.getContainerReference(containerName);
+        container.createIfNotExists();
+
+        runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
+    }
+
+    @After
+    public void tearDownAzureBlobStorageIT() throws Exception {
+        container.deleteIfExists();
+    }
+
+    protected void uploadTestBlob() throws Exception {
+        CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
+        byte[] buf = "0123456789".getBytes();
+        InputStream in = new ByteArrayInputStream(buf);
+        blob.upload(in, 10);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java
new file mode 100644
index 0000000..f5fa2e5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Before;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.fail;
+
+public abstract class AbstractAzureStorageIT {private static final Properties CONFIG;
+
+    private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+
+    static {
+        final FileInputStream fis;
+        CONFIG = new Properties();
+        try {
+            fis = new FileInputStream(CREDENTIALS_FILE);
+            try {
+                CONFIG.load(fis);
+            } catch (IOException e) {
+                fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+            } finally {
+                FileUtils.closeQuietly(fis);
+            }
+        } catch (FileNotFoundException e) {
+            fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+        }
+    }
+
+    protected static String getAccountName() {
+        return CONFIG.getProperty("accountName");
+    }
+
+    protected static String getAccountKey() {
+        return CONFIG.getProperty("accountKey");
+    }
+
+    protected TestRunner runner;
+
+    @Before
+    public void setUpAzureStorageIT() {
+        runner = TestRunners.newTestRunner(getProcessorClass());
+
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName());
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
+    }
+
+    protected abstract Class<? extends Processor> getProcessorClass();
+
+    protected CloudStorageAccount getStorageAccount() throws Exception {
+        StorageCredentials storageCredentials = new StorageCredentialsAccountAndKey(getAccountName(), getAccountKey());
+        return new CloudStorageAccount(storageCredentials, true);
+    }
+
+    protected void configureCredentialsService() throws Exception {
+        runner.removeProperty(AzureStorageUtils.ACCOUNT_NAME);
+        runner.removeProperty(AzureStorageUtils.ACCOUNT_KEY);
+
+        AzureStorageCredentialsService credentialsService = new AzureStorageCredentialsControllerService();
+
+        runner.addControllerService("credentials-service", credentialsService);
+
+        runner.setProperty(credentialsService, AzureStorageUtils.ACCOUNT_NAME, getAccountName());
+        runner.setProperty(credentialsService, AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
+
+        runner.assertValid(credentialsService);
+
+        runner.enableControllerService(credentialsService);
+
+        runner.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, credentialsService.getIdentifier());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
deleted file mode 100644
index 6d3a692..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import static org.junit.Assert.fail;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.Iterator;
-import java.util.Properties;
-
-import com.microsoft.azure.storage.queue.CloudQueue;
-import com.microsoft.azure.storage.queue.CloudQueueClient;
-import com.microsoft.azure.storage.queue.CloudQueueMessage;
-import org.apache.nifi.util.file.FileUtils;
-
-import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-
-public class AzureTestUtil {
-
-    private static final Properties CONFIG;
-
-    private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
-    private static final String FORMAT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
-
-    public static final String TEST_BLOB_NAME = "testing";
-    public static final String TEST_STORAGE_QUEUE = "testqueue";
-    public static final String TEST_CONTAINER_NAME_PREFIX = "nifitest";
-
-    public static CloudQueue cloudQueue;
-
-    static {
-        final FileInputStream fis;
-        CONFIG = new Properties();
-        try {
-            fis = new FileInputStream(CREDENTIALS_FILE);
-            try {
-                CONFIG.load(fis);
-            } catch (IOException e) {
-                fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
-            } finally {
-                FileUtils.closeQuietly(fis);
-            }
-        } catch (FileNotFoundException e) {
-            fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
-        }
-
-    }
-
-    public static String getAccountName() {
-        return CONFIG.getProperty("accountName");
-    }
-
-    public static String getAccountKey() {
-        return CONFIG.getProperty("accountKey");
-    }
-
-    public static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException {
-        CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient();
-        return blobClient.getContainerReference(containerName);
-    }
-
-    public static CloudQueue getQueue(String queueName) throws URISyntaxException, InvalidKeyException, StorageException {
-        CloudQueueClient cloudQueueClient = getStorageAccount().createCloudQueueClient();
-        cloudQueue = cloudQueueClient.getQueueReference(queueName);
-        return cloudQueue;
-    }
-
-    private static CloudStorageAccount getStorageAccount() throws URISyntaxException, InvalidKeyException {
-        String storageConnectionString = String.format(FORMAT_CONNECTION_STRING, getAccountName(), getAccountKey());
-        return CloudStorageAccount.parse(storageConnectionString);
-    }
-
-    public static int getQueueCount() throws StorageException {
-        Iterator<CloudQueueMessage> retrievedMessages = cloudQueue.retrieveMessages(10, 1, null, null).iterator();
-        int count = 0;
-
-        while (retrievedMessages.hasNext()) {
-            retrievedMessages.next();
-            count++;
-        }
-
-        return count;
-    }
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java
index b7481d4..0143b41 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java
@@ -1,61 +1,66 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.UUID;
-
-import org.apache.nifi.processors.azure.AbstractAzureBlobStorageIT;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-
-public class ITDeleteAzureBlobStorage extends AbstractAzureBlobStorageIT{
-
-    @Test
-    public void testDeleteBlob() throws StorageException, URISyntaxException, InvalidKeyException, IOException {
-        String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
-        CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
-        container.createIfNotExists();
-
-        uploadBlob(containerName, getFileFromResource(SAMPLE_FILE_NAME));
-
-        final TestRunner runner = TestRunners.newTestRunner(DeleteAzureBlobStorage.class);
-
-        try {
-            runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-            runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-            runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
-            runner.setProperty(DeleteAzureBlobStorage.BLOB, AzureTestUtil.TEST_BLOB_NAME);
-
-            runner.enqueue(new byte[0]);
-            runner.run(1);
-
-            runner.assertAllFlowFilesTransferred(DeleteAzureBlobStorage.REL_SUCCESS);
-
-        } finally {
-            container.deleteIfExists();
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.nifi.processor.Processor;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+
+public class ITDeleteAzureBlobStorage extends AbstractAzureBlobStorageIT {
+    // Integration test for DeleteAzureBlobStorage; runner, container and TEST_BLOB_NAME are inherited, not declared here.
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return DeleteAzureBlobStorage.class;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        runner.setProperty(DeleteAzureBlobStorage.BLOB, TEST_BLOB_NAME);
+        // NOTE(review): helper is defined outside this class; presumably it uploads the blob named TEST_BLOB_NAME - confirm.
+        uploadTestBlob();
+    }
+
+    @Test
+    public void testDeleteBlob() {
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run(1);
+
+        assertResult();
+    }
+
+    @Test
+    public void testDeleteBlobUsingCredentialsService() throws Exception {
+        configureCredentialsService();
+        // Same scenario as testDeleteBlob(), run after the inherited credentials-service set-up above.
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run(1);
+
+        assertResult();
+    }
+
+    private void assertResult() {
+        runner.assertAllFlowFilesTransferred(DeleteAzureBlobStorage.REL_SUCCESS);
+        // After a successful delete, listing the container with the blob name as prefix must return nothing.
+        Iterable<ListBlobItem> blobs = container.listBlobs(TEST_BLOB_NAME);
+        assertFalse(blobs.iterator().hasNext());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
index 1810a08..873390c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
@@ -16,63 +16,54 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
 import org.junit.Test;
 
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import java.util.List;
+
+public class ITFetchAzureBlobStorage extends AbstractAzureBlobStorageIT {
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return FetchAzureBlobStorage.class;
+    }
 
-public class ITFetchAzureBlobStorage {
+    @Before
+    public void setUp() throws Exception {
+        runner.setProperty(FetchAzureBlobStorage.BLOB, TEST_BLOB_NAME);
+        // NOTE(review): helper is defined outside this class; presumably it uploads the blob named TEST_BLOB_NAME that the tests fetch - confirm.
+        uploadTestBlob();
+    }
 
     @Test
-    public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
-        String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
-        CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
-        container.createIfNotExists();
+    public void testFetchBlob() throws Exception {
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run();
 
-        CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME);
-        byte[] buf = "0123456789".getBytes();
-        InputStream in = new ByteArrayInputStream(buf);
-        blob.upload(in, 10);
+        assertResult();
+    }
 
-        final TestRunner runner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
+    @Test
+    public void testFetchBlobUsingCredentialService() throws Exception {
+        configureCredentialsService();
 
-        try {
-            runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-            runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-            runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
-            runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run();
 
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME);
-            attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME);
-            attributes.put("azure.blobtype", AzureStorageUtils.BLOCK);
-            runner.enqueue(new byte[0], attributes);
-            runner.run();
+        assertResult();
+    }
 
-            runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
-            List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
-            for (MockFlowFile flowFile : flowFilesForRelationship) {
-                flowFile.assertContentEquals("0123456789".getBytes());
-                flowFile.assertAttributeEquals("azure.length", "10");
-            }
-        } finally {
-            container.deleteIfExists();
+    private void assertResult() throws Exception {
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFilesForRelationship) {
+            flowFile.assertContentEquals("0123456789".getBytes());
+            flowFile.assertAttributeEquals("azure.length", "10");
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
index 311cf71..9806225 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
@@ -16,55 +16,52 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.UUID;
-
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
 import org.junit.Test;
 
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import java.util.concurrent.TimeUnit;
+
+public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT {
 
-public class ITListAzureBlobStorage {
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return ListAzureBlobStorage.class;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        uploadTestBlob();
+        // Sleeps for the LISTING_LAG_MILLIS value keyed by TimeUnit.SECONDS. NOTE(review): presumably so the fresh blob is old enough to be listed by a single run - confirm.
+        Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS));
+    }
 
     @Test
-    public void testListsAzureBlobStorageContent() throws InvalidKeyException, StorageException, URISyntaxException, IOException {
-        String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
-        CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
-        container.createIfNotExists();
+    public void testListBlobs() {
+        runner.assertValid();
+        runner.run(1);
 
-        CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME);
-        byte[] buf = "0123456789".getBytes();
-        InputStream in = new ByteArrayInputStream(buf);
-        blob.upload(in, 10);
+        assertResult();
+    }
 
-        final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage());
+    @Test
+    public void testListBlobsUsingCredentialService() throws Exception {
+        configureCredentialsService();
 
-        try {
-            runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-            runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-            runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
+        runner.assertValid();
+        runner.run(1);
 
-            // requires multiple runs to deal with List processor checking
-            runner.run(3);
+        assertResult();
+    }
 
-            runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
-            runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
+    private void assertResult() {
+        runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
 
-            for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
-                entry.assertAttributeEquals("azure.length", "10");
-                entry.assertAttributeEquals("mime.type", "application/octet-stream");
-            }
-        } finally {
-            container.deleteIfExists();
+        for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
+            entry.assertAttributeEquals("azure.length", "10");
+            entry.assertAttributeEquals("mime.type", "application/octet-stream");
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
new file mode 100644
index 0000000..e006c2c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
+    // Integration test for PutAzureBlobStorage; runner, container and TEST_BLOB_NAME are inherited, not declared here.
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return PutAzureBlobStorage.class;
+    }
+
+    @Before
+    public void setUp() {
+        runner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
+    }
+
+    @Test
+    public void testPutBlob() throws Exception {
+        runner.assertValid();
+        runner.enqueue("0123456789".getBytes());
+        runner.run();
+
+        assertResult();
+    }
+
+    @Test
+    public void testPutBlobUsingCredentialsService() throws Exception {
+        configureCredentialsService();
+        // Same scenario as testPutBlob(), run after the inherited credentials-service set-up above.
+        runner.assertValid();
+        runner.enqueue("0123456789".getBytes());
+        runner.run();
+
+        assertResult();
+    }
+
+    private void assertResult() throws Exception {
+        runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFilesForRelationship) {
+            flowFile.assertContentEquals("0123456789".getBytes());
+            flowFile.assertAttributeEquals("azure.length", "10");
+        }
+        // The container must now list at least one blob whose name starts with TEST_BLOB_NAME.
+        Iterable<ListBlobItem> blobs = container.listBlobs(TEST_BLOB_NAME);
+        assertTrue(blobs.iterator().hasNext());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
deleted file mode 100644
index bfa1d4a..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-
-public class ITPutAzureStorageBlob {
-
-    @Test
-    public void testPuttingBlob() throws IOException, InvalidKeyException, StorageException, URISyntaxException {
-        String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
-        CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
-        container.createIfNotExists();
-
-        final TestRunner runner = TestRunners.newTestRunner(new PutAzureBlobStorage());
-
-        try {
-            runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-            runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-            runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
-            runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
-
-            runner.enqueue("0123456789".getBytes());
-            runner.run();
-
-            runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
-            List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
-            for (MockFlowFile flowFile : flowFilesForRelationship) {
-                flowFile.assertContentEquals("0123456789".getBytes());
-                flowFile.assertAttributeEquals("azure.length", "10");
-            }
-        } finally {
-            container.deleteIfExists();
-        }
-    }
-}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorageIT.java
new file mode 100644
index 0000000..43657d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorageIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.processors.azure.storage.AbstractAzureStorageIT;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Iterator;
+import java.util.UUID;
+
+public abstract class AbstractAzureQueueStorageIT extends AbstractAzureStorageIT {
+    // Base class for queue integration tests: creates a uniquely named queue before each test and deletes it afterwards.
+    protected static final String TEST_QUEUE_NAME_PREFIX = "nifi-test-queue";
+
+    protected CloudQueue cloudQueue;
+
+    @Before
+    public void setUpAzureQueueStorageIT() throws Exception {
+        String queueName = String.format("%s-%s", TEST_QUEUE_NAME_PREFIX, UUID.randomUUID());
+        CloudQueueClient cloudQueueClient = getStorageAccount().createCloudQueueClient();
+        cloudQueue = cloudQueueClient.getQueueReference(queueName);
+        cloudQueue.createIfNotExists();
+        runner.setProperty(AbstractAzureQueueStorage.QUEUE, queueName);
+    }
+
+    @After
+    public void tearDownAzureQueueStorageIT() throws Exception {
+        if (cloudQueue != null) { // still null when set-up failed before the queue reference was assigned
+            cloudQueue.deleteIfExists();
+        }
+    }
+
+    // Counts the visible messages (at most 10); retrieveMessages(10, 1, ...) also hides each returned message for 1 second.
+    protected int getMessageCount() throws Exception {
+        Iterator<CloudQueueMessage> retrievedMessages = cloudQueue.retrieveMessages(10, 1, null, null).iterator();
+        int count = 0;
+        while (retrievedMessages.hasNext()) {
+            retrievedMessages.next();
+            count++;
+        }
+        return count;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
index 1711bbd..a446f35 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
@@ -17,133 +17,119 @@
 package org.apache.nifi.processors.azure.storage.queue;
 
 import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.queue.CloudQueue;
 import com.microsoft.azure.storage.queue.CloudQueueMessage;
-import org.apache.nifi.processors.azure.storage.AzureTestUtil;
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
 import java.util.List;
 
-public class GetAzureQueueStorageIT {
+public class GetAzureQueueStorageIT extends AbstractAzureQueueStorageIT {
 
-    private final TestRunner runner = TestRunners.newTestRunner(GetAzureQueueStorage.class);
-    private static CloudQueue cloudQueue;
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return GetAzureQueueStorage.class;
+    }
 
-    @BeforeClass
-    public static void setup() throws InvalidKeyException, StorageException, URISyntaxException {
-        cloudQueue = AzureTestUtil.getQueue(AzureTestUtil.TEST_STORAGE_QUEUE);
-        cloudQueue.createIfNotExists();
+    @Before
+    public void setUp() throws StorageException {
+        cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 1"), 604800, 0, null, null); // 604800 s (7 days) time-to-live, 0 s initial visibility delay
+        cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 2"), 604800, 0, null, null);
+        cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 3"), 604800, 0, null, null);
     }
 
     @Test
-    public void testGetWithAutoDeleteFalse() throws StorageException, InterruptedException {
-        cloudQueue.clear();
-        insertDummyMessages();
-
-        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-        runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
-        runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
-        runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
-        runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
-
+    public void testSimpleGet() throws Exception {
+        runner.assertValid();
         runner.run(1);
 
-        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
-        Assert.assertFalse(mockFlowFiles.isEmpty());
-
-        Thread.sleep(1500);
-        cloudQueue.downloadAttributes();
-        Assert.assertEquals(3, cloudQueue.getApproximateMessageCount());
+        assertResult(0);
     }
 
     @Test
-    public void testGetWithELAndAutoDeleteTrue() throws StorageException, InterruptedException {
-        cloudQueue.clear();
-        insertDummyMessages();
+    public void testSimpleGetWithCredentialsService() throws Exception {
+        configureCredentialsService();
 
+        runner.assertValid();
+        runner.run(1);
+
+        assertResult(0);
+    }
+
+    @Test
+    public void testSimpleGetWithEL() throws Exception {
         runner.setValidateExpressionUsage(true);
 
-        runner.setVariable("account.name", AzureTestUtil.getAccountName());
-        runner.setVariable("account.key", AzureTestUtil.getAccountKey());
-        runner.setVariable("queue.name", AzureTestUtil.TEST_STORAGE_QUEUE);
+        runner.setVariable("account.name", getAccountName());
+        runner.setVariable("account.key", getAccountKey());
+        runner.setVariable("queue.name", cloudQueue.getName());
 
         runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
         runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
         runner.setProperty(GetAzureQueueStorage.QUEUE, "${queue.name}");
-        runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
-        runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "true");
-        runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
 
+        runner.assertValid();
         runner.run(1);
 
-        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
-        Assert.assertFalse(mockFlowFiles.isEmpty());
+        assertResult(0);
+    }
 
-        Thread.sleep(1500);
-        cloudQueue.downloadAttributes();
-        Assert.assertEquals(0, cloudQueue.getApproximateMessageCount());
+    @Test
+    public void testGetWithAutoDeleteFalse() throws Exception {
+        runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
+
+        runner.assertValid();
+        runner.run(1);
+
+        assertResult(3);
     }
 
     @Test
-    public void testGetWithVisibilityTimeout() throws StorageException, InterruptedException {
-        cloudQueue.clear();
-        insertDummyMessages();
-
-        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-        runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
-        runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
+    public void testGetWithVisibilityTimeout() throws Exception {
         runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
         runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
 
+        runner.assertValid();
         runner.run(1);
 
-        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
-        Assert.assertFalse(mockFlowFiles.isEmpty());
-        Assert.assertEquals(0, AzureTestUtil.getQueueCount());
+        runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
+        Assert.assertEquals(0, getMessageCount());
 
         Thread.sleep(1500);
-        Assert.assertEquals(3, AzureTestUtil.getQueueCount());
+        Assert.assertEquals(3, getMessageCount());
     }
 
     @Test
-    public void testGetWithBatchSize() throws StorageException {
-        cloudQueue.clear();
-        insertDummyMessages();
-
-        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-        runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
+    public void testGetWithBatchSize() throws Exception {
         runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "2");
-        runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "true");
-        runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
 
+        runner.assertValid();
         runner.run(1);
+
         runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 2);
+        cloudQueue.downloadAttributes();
+        Assert.assertEquals(1, cloudQueue.getApproximateMessageCount());
 
         runner.run(1);
-        runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
 
+        runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
+        cloudQueue.downloadAttributes();
+        Assert.assertEquals(0, cloudQueue.getApproximateMessageCount());
     }
 
-    private static void insertDummyMessages() throws StorageException {
-        cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 1"), 604800, 0, null, null);
-        cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 2"), 604800, 0, null, null);
-        cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 3"), 604800, 0, null, null);
-    }
+    private void assertResult(int expectedMessageCountInQueue) throws Exception {
+        runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
 
-    @AfterClass
-    public static void cleanup() throws StorageException {
-        cloudQueue.deleteIfExists();
+        List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
+        int i = 1;
+        for (MockFlowFile mockFlowFile : mockFlowFiles) {
+            mockFlowFile.assertContentEquals("Dummy Message " + i++);
+        }
+
+        cloudQueue.downloadAttributes();
+        Assert.assertEquals(expectedMessageCountInQueue, cloudQueue.getApproximateMessageCount());
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
index e02f16d..7d6fe9d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
@@ -1,118 +1,99 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage.queue;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.queue.CloudQueue;
-import org.apache.nifi.processors.azure.storage.AzureTestUtil;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-
-public class PutAzureQueueStorageIT {
-
-    private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class);
-    private static CloudQueue cloudQueue;
-
-    @BeforeClass
-    public static void setup() throws InvalidKeyException, StorageException, URISyntaxException {
-        cloudQueue = AzureTestUtil.getQueue(AzureTestUtil.TEST_STORAGE_QUEUE);
-        cloudQueue.createIfNotExists();
-    }
-
-    @Test
-    public void testSimplePut() throws InvalidKeyException, StorageException, URISyntaxException {
-        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-        runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
-
-        runner.enqueue("Dummy message");
-        runner.run(1);
-
-        runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
-    }
-
-    @Test
-    public void testSimplePutWithEL() throws StorageException, URISyntaxException, InvalidKeyException {
-        runner.setValidateExpressionUsage(true);
-
-        runner.setVariable("account.name", AzureTestUtil.getAccountName());
-        runner.setVariable("account.key", AzureTestUtil.getAccountKey());
-        runner.setVariable("queue.name", AzureTestUtil.TEST_STORAGE_QUEUE);
-
-        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
-        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
-        runner.setProperty(PutAzureQueueStorage.QUEUE, "${queue.name}");
-
-        runner.enqueue("Dummy message");
-        runner.run(1);
-
-        runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
-    }
-
-    @Test
-    public void testPutWithTTL() throws StorageException, InterruptedException {
-        cloudQueue.clear();
-
-        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-        runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
-        runner.setProperty(PutAzureQueueStorage.TTL, "2 secs");
-
-        runner.enqueue("Dummy message");
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
-        Assert.assertEquals(1, AzureTestUtil.getQueueCount());
-
-        Thread.sleep(2400);
-        Assert.assertEquals(0, AzureTestUtil.getQueueCount());
-    }
-
-    @Test
-    public void testPutWithVisibilityDelay() throws StorageException, InterruptedException {
-        cloudQueue.clear();
-
-        cloudQueue.clear();
-
-        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
-        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
-        runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
-        runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "2 secs");
-
-        runner.enqueue("Dummy message");
-        runner.run(1);
-
-        runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
-        Assert.assertEquals(0, AzureTestUtil.getQueueCount());
-
-        Thread.sleep(2400);
-        Assert.assertEquals(1, AzureTestUtil.getQueueCount());
-    }
-
-    @AfterClass
-    public static void cleanup() throws StorageException {
-        cloudQueue.deleteIfExists();
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PutAzureQueueStorageIT extends AbstractAzureQueueStorageIT {
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return PutAzureQueueStorage.class;
+    }
+
+    /** Puts a message relying only on the runner configuration inherited from the base class. */
+    @Test
+    public void testSimplePut() {
+        putDummyMessageAndAssertSuccess();
+    }
+
+    /** Puts a message after configureCredentialsService() (defined outside this class) has been applied. */
+    @Test
+    public void testSimplePutWithCredentialsService() throws Exception {
+        configureCredentialsService();
+
+        putDummyMessageAndAssertSuccess();
+    }
+
+    /** Puts a message with account name, account key and queue name supplied through Expression Language. */
+    @Test
+    public void testSimplePutWithEL() {
+        runner.setValidateExpressionUsage(true);
+
+        runner.setVariable("account.name", getAccountName());
+        runner.setVariable("account.key", getAccountKey());
+        runner.setVariable("queue.name", cloudQueue.getName());
+
+        // The properties reference the variables above instead of holding the values themselves.
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
+        runner.setProperty(PutAzureQueueStorage.QUEUE, "${queue.name}");
+
+        putDummyMessageAndAssertSuccess();
+    }
+
+    /** Expects the message to be counted right after the put and to be gone once the 2 second TTL has passed. */
+    @Test
+    public void testPutWithTTL() throws Exception {
+        runner.setProperty(PutAzureQueueStorage.TTL, "2 secs");
+
+        putDummyMessageAndAssertSuccess();
+        Assert.assertEquals(1, getMessageCount());
+
+        // Wait slightly longer than the configured 2 seconds before counting again.
+        Thread.sleep(2400);
+        Assert.assertEquals(0, getMessageCount());
+    }
+
+    /** Expects the message not to be counted right after the put and to be counted once the 2 second delay has passed. */
+    @Test
+    public void testPutWithVisibilityDelay() throws Exception {
+        runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "2 secs");
+
+        putDummyMessageAndAssertSuccess();
+        Assert.assertEquals(0, getMessageCount());
+
+        // Wait slightly longer than the configured 2 seconds before counting again.
+        Thread.sleep(2400);
+        Assert.assertEquals(1, getMessageCount());
+    }
+
+    /**
+     * Shared act-and-assert step: checks that the runner is valid, enqueues one flow file with the content
+     * "Dummy message", runs the processor once and expects exactly one flow file on REL_SUCCESS.
+     */
+    private void putDummyMessageAndAssertSuccess() {
+        runner.assertValid();
+        runner.enqueue("Dummy message");
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
new file mode 100644
index 0000000..bdc360b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.core.Base64;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockProcessContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests AzureStorageUtils.getStorageCredentialsDetails() with credentials configured on the processor or on a controller service. */
+public class TestAzureStorageUtilsGetStorageCredentialsDetails {
+    private static final String CREDENTIALS_SERVICE_VALUE = "CredentialsService";
+    private static final String ACCOUNT_NAME_VALUE = "AccountName";
+    private static final String ACCOUNT_KEY_VALUE = Base64.encode("AccountKey".getBytes());
+    private static final String SAS_TOKEN_VALUE = "SasToken";
+
+    private MockProcessContext processContext;
+
+    @Before
+    public void setUp() {
+        Processor processor = new ListAzureBlobStorage();
+        processContext = new MockProcessContext(processor);
+    }
+
+    /** Account name and account key set on the processor must yield StorageCredentialsAccountAndKey credentials. */
+    @Test
+    public void testAccountNameAndAccountKeyConfiguredOnProcessor() {
+        configureProcessorProperties(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, null);
+        AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+        assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
+    }
+
+    /** Account name and SAS token set on the processor must yield StorageCredentialsSharedAccessSignature credentials. */
+    @Test
+    public void testAccountNameAndSasTokenConfiguredOnProcessor() {
+        configureProcessorProperties(ACCOUNT_NAME_VALUE, null, SAS_TOKEN_VALUE);
+        AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+        assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
+    }
+
+    /** Account name and account key held by the referenced controller service must yield StorageCredentialsAccountAndKey credentials. */
+    @Test
+    public void testAccountNameAndAccountKeyConfiguredOnControllerService() {
+        configureControllerService(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, null);
+        AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+        assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
+    }
+
+    /** Account name and SAS token held by the referenced controller service must yield StorageCredentialsSharedAccessSignature credentials. */
+    @Test
+    public void testAccountNameAndSasTokenConfiguredOnControllerService() {
+        configureControllerService(ACCOUNT_NAME_VALUE, null, SAS_TOKEN_VALUE);
+        AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+        assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAccountNameMissingConfiguredOnProcessor() {
+        configureProcessorProperties(null, ACCOUNT_KEY_VALUE, null);
+        // No account name is set on the processor, so the lookup is expected to throw.
+        AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAccountKeyAndSasTokenMissingConfiguredOnProcessor() {
+        configureProcessorProperties(ACCOUNT_NAME_VALUE, null, null);
+        // Neither account key nor SAS token is set on the processor, so the lookup is expected to throw.
+        AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAccountNameMissingConfiguredOnControllerService() {
+        configureControllerService(null, ACCOUNT_KEY_VALUE, null);
+        // The controller service has no account name, so the lookup is expected to throw.
+        AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAccountKeyAndSasTokenMissingConfiguredOnControllerService() {
+        configureControllerService(ACCOUNT_NAME_VALUE, null, null);
+        // The controller service has neither account key nor SAS token, so the lookup is expected to throw.
+        AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+    }
+
+    /** Sets each non-null argument as the matching processor property; a null argument leaves that property unset. */
+    private void configureProcessorProperties(String accountName, String accountKey, String sasToken) {
+        if (accountName != null) {
+            processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, accountName);
+        }
+        if (accountKey != null) {
+            processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, accountKey);
+        }
+        if (sasToken != null) {
+            processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);
+        }
+    }
+
+    /** Calls onEnabled() on a new credentials controller service holding the non-null arguments and references it from the processor context. */
+    private void configureControllerService(String accountName, String accountKey, String sasToken) {
+        AzureStorageCredentialsControllerService credentialsService = new AzureStorageCredentialsControllerService();
+
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        if (accountName != null) {
+            properties.put(AzureStorageUtils.ACCOUNT_NAME, accountName);
+        }
+        if (accountKey != null) {
+            properties.put(AzureStorageUtils.ACCOUNT_KEY, accountKey);
+        }
+        if (sasToken != null) {
+            properties.put(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);
+        }
+
+        MockConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+        credentialsService.onEnabled(configurationContext);
+
+        processContext.addControllerService(credentialsService, CREDENTIALS_SERVICE_VALUE);
+        processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_VALUE);
+    }
+
+    /** Asserts the account name, the credentials type and the Base64 encoded key of account-and-key credentials. */
+    private void assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails storageCredentialsDetails) {
+        assertEquals(ACCOUNT_NAME_VALUE, storageCredentialsDetails.getStorageAccountName());
+        assertTrue(storageCredentialsDetails.getStorageCredentials() instanceof StorageCredentialsAccountAndKey);
+        StorageCredentialsAccountAndKey storageCredentials = (StorageCredentialsAccountAndKey) storageCredentialsDetails.getStorageCredentials();
+        assertEquals(ACCOUNT_NAME_VALUE, storageCredentials.getAccountName());
+        assertEquals(ACCOUNT_KEY_VALUE, storageCredentials.exportBase64EncodedKey());
+    }
+
+    /** Asserts the account name, the credentials type and the token of shared access signature credentials. */
+    private void assertStorageCredentialsDetailsAccountNameAndSasToken(AzureStorageCredentialsDetails storageCredentialsDetails) {
+        assertEquals(ACCOUNT_NAME_VALUE, storageCredentialsDetails.getStorageAccountName());
+        assertTrue(storageCredentialsDetails.getStorageCredentials() instanceof StorageCredentialsSharedAccessSignature);
+        StorageCredentialsSharedAccessSignature storageCredentials = (StorageCredentialsSharedAccessSignature) storageCredentialsDetails.getStorageCredentials();
+        assertEquals(SAS_TOKEN_VALUE, storageCredentials.getToken());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java
new file mode 100644
index 0000000..95bd130
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockValidationContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests AzureStorageUtils.validateCredentialProperties() against different combinations of credential properties. */
+public class TestAzureStorageUtilsValidateCredentialProperties {
+
+    // Arbitrary placeholder values used to populate the properties.
+    private static final String CREDENTIALS_SERVICE_VALUE = "CredentialsService";
+    private static final String ACCOUNT_NAME_VALUE = "AccountName";
+    private static final String ACCOUNT_KEY_VALUE = "AccountKey";
+    private static final String SAS_TOKEN_VALUE = "SasToken";
+
+    private MockProcessContext processContext;
+    private MockValidationContext validationContext;
+
+    /** Creates a fresh process context and a validation context wrapping it before every test. */
+    @Before
+    public void setUp() {
+        final Processor listProcessor = new ListAzureBlobStorage();
+        processContext = new MockProcessContext(listProcessor);
+        validationContext = new MockValidationContext(processContext);
+    }
+
+    /** A credentials service reference on its own is accepted. */
+    @Test
+    public void testValidWithCredentialsService() {
+        configureCredentialsService();
+
+        assertValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+    }
+
+    /** Account name together with account key is accepted. */
+    @Test
+    public void testValidWithAccountNameAndAccountKey() {
+        configureAccountName();
+        configureAccountKey();
+
+        assertValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+    }
+
+    /** Account name together with SAS token is accepted. */
+    @Test
+    public void testValidWithAccountNameAndSasToken() {
+        configureAccountName();
+        configureSasToken();
+
+        assertValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+    }
+
+    /** Leaving every credential property unset is rejected. */
+    @Test
+    public void testNotValidBecauseNothingSpecified() {
+        assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+    }
+
+    /** A credentials service combined with an account name is rejected. */
+    @Test
+    public void testNotValidBecauseBothCredentialsServiceAndAccountNameSpecified() {
+        configureCredentialsService();
+        configureAccountName();
+
+        assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+    }
+
+    /** A credentials service combined with an account key is rejected. */
+    @Test
+    public void testNotValidBecauseBothCredentialsServiceAndAccountKeySpecified() {
+        configureCredentialsService();
+        configureAccountKey();
+
+        assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+    }
+
+    /** A credentials service combined with a SAS token is rejected. */
+    @Test
+    public void testNotValidBecauseBothCredentialsServiceAndSasTokenSpecified() {
+        configureCredentialsService();
+        configureSasToken();
+
+        assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+    }
+
+    /** An account name with neither account key nor SAS token is rejected. */
+    @Test
+    public void testNotValidBecauseAccountNameSpecifiedWithoutAccountKeyOrSasToken() {
+        configureAccountName();
+
+        assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+    }
+
+    /** An account name with both account key and SAS token is rejected. */
+    @Test
+    public void testNotValidBecauseAccountNameSpecifiedWithBothAccountKeyAndSasToken() {
+        configureAccountName();
+        configureAccountKey();
+        configureSasToken();
+
+        assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+    }
+
+    /** Sets the credentials service property to a placeholder identifier; this class registers no service under it. */
+    private void configureCredentialsService() {
+        processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_VALUE);
+    }
+
+    /** Sets the account name property on the process context. */
+    private void configureAccountName() {
+        processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
+    }
+
+    /** Sets the account key property on the process context. */
+    private void configureAccountKey() {
+        processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
+    }
+
+    /** Sets the SAS token property on the process context. */
+    private void configureSasToken() {
+        processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, SAS_TOKEN_VALUE);
+    }
+
+    /** Passes only when the validation produced no results at all. */
+    private void assertValid(Collection<ValidationResult> validationResults) {
+        assertTrue("There should be no validation error", validationResults.isEmpty());
+    }
+
+    /** Passes only when the validation produced at least one result. */
+    private void assertNotValid(Collection<ValidationResult> validationResults) {
+        assertFalse("There should be validation error", validationResults.isEmpty());
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService.java
new file mode 100644
index 0000000..18fbed8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Validation tests for AzureStorageCredentialsControllerService with different credential property combinations. */
+public class TestAzureStorageCredentialsControllerService {
+    private static final String ACCOUNT_NAME_VALUE = "AccountName";
+    private static final String ACCOUNT_KEY_VALUE = "AccountKey";
+    private static final String SAS_TOKEN_VALUE = "SasToken";
+
+    private TestRunner runner;
+    private AzureStorageCredentialsService credentialsService;
+
+    @Before
+    public void setUp() throws InitializationException {
+        credentialsService = new AzureStorageCredentialsControllerService();
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService("credentials-service", credentialsService);
+    }
+
+    /** Account name together with account key makes the service valid. */
+    @Test
+    public void testValidWithAccountNameAndAccountKey() {
+        configureAccountName();
+        configureAccountKey();
+        runner.assertValid(credentialsService);
+    }
+
+    /** Account name together with SAS token makes the service valid. */
+    @Test
+    public void testValidWithAccountNameAndSasToken() {
+        configureAccountName();
+        configureSasToken();
+        runner.assertValid(credentialsService);
+    }
+
+    /** Without an account name the service is expected to be invalid. */
+    @Test
+    public void testNotValidBecauseAccountNameMissing() {
+        configureAccountKey();
+        runner.assertNotValid(credentialsService);
+    }
+
+    /** With an account name but neither account key nor SAS token the service is expected to be invalid. */
+    @Test
+    public void testNotValidBecauseAccountKeyAndSasTokenMissing() {
+        configureAccountName();
+        runner.assertNotValid(credentialsService);
+    }
+
+    /** With both account key and SAS token set the service is expected to be invalid. */
+    @Test
+    public void testNotValidBecauseBothAccountKeyAndSasTokenSpecified() {
+        configureAccountName();
+        configureAccountKey();
+        configureSasToken();
+        runner.assertNotValid(credentialsService);
+    }
+
+    private void configureAccountName() {
+        runner.setProperty(credentialsService, AzureStorageCredentialsControllerService.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
+    }
+
+    private void configureAccountKey() {
+        runner.setProperty(credentialsService, AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
+    }
+
+    private void configureSasToken() {
+        runner.setProperty(credentialsService, AzureStorageUtils.PROP_SAS_TOKEN, SAS_TOKEN_VALUE);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup.java
new file mode 100644
index 0000000..72471bf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** Tests AzureStorageCredentialsControllerServiceLookup: delegation to the registered services and its validation rules. */
+public class TestAzureStorageCredentialsControllerServiceLookup {
+    private static final String SERVICE_A_ID = "service-a";
+    private static final String SERVICE_B_ID = "service-b";
+    private static final String LOOKUP_SERVICE_ID = "lookup-service";
+
+    private MockAzureStorageCredentialsService serviceA;
+    private MockAzureStorageCredentialsService serviceB;
+
+    private AzureStorageCredentialsControllerServiceLookup lookupService;
+    private TestRunner runner;
+
+    /** Registers two mock services under the lookup service's property names "a" and "b" and enables all three services. */
+    @Before
+    public void setup() throws InitializationException {
+        serviceA = new MockAzureStorageCredentialsService(new AzureStorageCredentialsDetails("Account_A", null));
+        serviceB = new MockAzureStorageCredentialsService(new AzureStorageCredentialsDetails("Account_B", null));
+        lookupService = new AzureStorageCredentialsControllerServiceLookup();
+
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService(SERVICE_A_ID, serviceA);
+        runner.addControllerService(SERVICE_B_ID, serviceB);
+
+        runner.addControllerService(LOOKUP_SERVICE_ID, lookupService);
+        runner.setProperty(lookupService, "a", SERVICE_A_ID);
+        runner.setProperty(lookupService, "b", SERVICE_B_ID);
+
+        runner.enableControllerService(serviceA);
+        runner.enableControllerService(serviceB);
+        runner.enableControllerService(lookupService);
+    }
+
+    /** The credentials name "a" resolves to the details returned by service A. */
+    @Test
+    public void testLookupServiceA() {
+        final AzureStorageCredentialsDetails details = lookupByCredentialsName("a");
+        assertNotNull(details);
+        assertEquals("Account_A", details.getStorageAccountName());
+    }
+
+    /** The credentials name "b" resolves to the details returned by service B. */
+    @Test
+    public void testLookupServiceB() {
+        final AzureStorageCredentialsDetails details = lookupByCredentialsName("b");
+        assertNotNull(details);
+        assertEquals("Account_B", details.getStorageAccountName());
+    }
+
+    /** A lookup with an empty attribute map is expected to throw ProcessException. */
+    @Test(expected = ProcessException.class)
+    public void testLookupMissingCredentialsNameAttribute() {
+        final Map<String, String> noAttributes = new HashMap<>();
+        lookupService.getStorageCredentialsDetails(noAttributes);
+    }
+
+    /** A lookup with a credentials name that no property is registered for is expected to throw ProcessException. */
+    @Test(expected = ProcessException.class)
+    public void testLookupWithCredentialsNameThatDoesNotExist() {
+        lookupByCredentialsName("DOES-NOT-EXIST");
+    }
+
+    /** The lookup service is invalid until a service is registered ("AtLease" in the name is a typo, kept so the test id is unchanged). */
+    @Test
+    public void testCustomValidateAtLeaseOneServiceDefined() throws InitializationException {
+        // add the lookup service to a fresh runner with no services registered, verify not valid
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService(LOOKUP_SERVICE_ID, lookupService);
+        runner.assertNotValid(lookupService);
+
+        // register a service and now verify valid
+        runner.addControllerService(SERVICE_A_ID, serviceA);
+        runner.setProperty(lookupService, "a", SERVICE_A_ID);
+        runner.enableControllerService(lookupService);
+        runner.assertValid(lookupService);
+    }
+
+    /** A property whose value is the lookup service's own identifier is expected to make it invalid. */
+    @Test
+    public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService(LOOKUP_SERVICE_ID, lookupService);
+        runner.setProperty(lookupService, LOOKUP_SERVICE_ID, LOOKUP_SERVICE_ID);
+        runner.assertNotValid(lookupService);
+    }
+
+    /** Queries the lookup service with an attribute map that holds only the given credentials name. */
+    private AzureStorageCredentialsDetails lookupByCredentialsName(String credentialsName) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, credentialsName);
+        return lookupService.getStorageCredentialsDetails(attributes);
+    }
+
+    /** A mock AzureStorageCredentialsService that always returns the AzureStorageCredentialsDetails passed to its constructor. */
+    private static class MockAzureStorageCredentialsService extends AbstractControllerService implements AzureStorageCredentialsService {
+        private final AzureStorageCredentialsDetails fixedDetails;
+
+        MockAzureStorageCredentialsService(AzureStorageCredentialsDetails fixedDetails) {
+            this.fixedDetails = fixedDetails;
+        }
+
+        @Override
+        public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) {
+            return fixedDetails;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt
deleted file mode 100644
index ee13cb7..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt
+++ /dev/null
@@ -1 +0,0 @@
-Hello, World!!
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api-nar/pom.xml
similarity index 81%
copy from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
copy to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api-nar/pom.xml
index 09863c5..a22754b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api-nar/pom.xml
@@ -13,17 +13,19 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-azure-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
         <version>1.10.0-SNAPSHOT</version>
     </parent>
+    <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>nifi-azure-nar</artifactId>
+    <artifactId>nifi-azure-services-api-nar</artifactId>
     <packaging>nar</packaging>
+
     <properties>
         <maven.javadoc.skip>true</maven.javadoc.skip>
         <source.skip>true</source.skip>
@@ -32,16 +34,14 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-azure-processors</artifactId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
             <version>1.10.0-SNAPSHOT</version>
+            <type>nar</type>
         </dependency>
-        
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <artifactId>nifi-azure-services-api</artifactId>
             <version>1.10.0-SNAPSHOT</version>
-            <type>nar</type>
-        </dependency>  
+        </dependency>
     </dependencies>
-
 </project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
similarity index 61%
copy from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
copy to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
index 09863c5..f6638be 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
@@ -13,35 +13,32 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-azure-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
         <version>1.10.0-SNAPSHOT</version>
     </parent>
+    <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>nifi-azure-nar</artifactId>
-    <packaging>nar</packaging>
-    <properties>
-        <maven.javadoc.skip>true</maven.javadoc.skip>
-        <source.skip>true</source.skip>
-    </properties>
+    <artifactId>nifi-azure-services-api</artifactId>
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-azure-processors</artifactId>
-            <version>1.10.0-SNAPSHOT</version>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-storage</artifactId>
+        </dependency>
+        <!-- overriding jackson-core in azure-storage -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
         </dependency>
-        
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-services-api-nar</artifactId>
-            <version>1.10.0-SNAPSHOT</version>
-            <type>nar</type>
-        </dependency>  
+            <artifactId>nifi-api</artifactId>
+        </dependency>
     </dependencies>
-
 </project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails.java
new file mode 100644
index 0000000..e95e486
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import com.microsoft.azure.storage.StorageCredentials;
+
+/** Holder with final fields that pairs a Storage Account Name with a StorageCredentials object. */
+public class AzureStorageCredentialsDetails {
+    private final String storageAccountName;
+    private final StorageCredentials storageCredentials;
+
+    /** Keeps both arguments exactly as passed in; no validation or copying is performed. */
+    public AzureStorageCredentialsDetails(String storageAccountName, StorageCredentials storageCredentials) {
+        this.storageCredentials = storageCredentials;
+        this.storageAccountName = storageAccountName;
+    }
+
+    public String getStorageAccountName() {
+        return this.storageAccountName;
+    }
+
+    public StorageCredentials getStorageCredentials() {
+        return this.storageCredentials;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService.java
new file mode 100644
index 0000000..e281e75
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.controller.ControllerService;
+
+import java.util.Map;
+
+/**
+ * Controller service contract for obtaining the Storage Account Name and Storage Credentials
+ * that are needed to instantiate Azure Storage clients.
+ */
+public interface AzureStorageCredentialsService extends ControllerService {
+
+    /**
+     * Returns the Storage Account Name and the Storage Credentials wrapped in an AzureStorageCredentialsDetails object.
+     * @param attributes attribute map, typically FlowFile attributes; an implementation may use or ignore it
+     * @return AzureStorageCredentialsDetails object holding the account name and the credentials
+     */
+    AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes);
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index cd2dc56..722a5dd 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -25,9 +25,30 @@
     <artifactId>nifi-azure-bundle</artifactId>
     <packaging>pom</packaging>
 
+    <properties>
+        <azure-storage.version>8.4.0</azure-storage.version>
+    </properties>
+
     <modules>
         <module>nifi-azure-processors</module>
         <module>nifi-azure-nar</module>
+        <module>nifi-azure-services-api</module>
+        <module>nifi-azure-services-api-nar</module>
     </modules>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.microsoft.azure</groupId>
+                <artifactId>azure-storage</artifactId>
+                <version>${azure-storage.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>com.fasterxml.jackson.core</groupId>
+                        <artifactId>jackson-core</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
 </project>