You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/09/26 16:37:10 UTC
[nifi] branch master updated: NIFI-6550: Create controller service for Azure Storage Credentials
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new a4c3ca5 NIFI-6550: Create controller service for Azure Storage Credentials
a4c3ca5 is described below
commit a4c3ca50dd819e7cffb563c46675bc7ab9d6cd5c
Author: Peter Turcsanyi <tu...@cloudera.com>
AuthorDate: Fri Sep 13 22:10:09 2019 +0200
NIFI-6550: Create controller service for Azure Storage Credentials
Added AzureStorageCredentialsService controller service interface
with 2 implementations:
- AzureStorageCredentialsControllerService: holds the credential properties
(account name, account key, SAS token)
- AzureStorageCredentialsControllerServiceLookup: can be used to dynamically
look up another AzureStorageCredentialsService (similar to
DBCPConnectionPoolLookup)
The controller service can be used by the Azure Storage processors:
{List|Fetch|Put|Delete}AzureBlobStorage, {Get|Put}AzureQueueStorage
NIFI-6550: Review changes.
NIFI-6550: Review changes #2.
This closes #3742.
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
nifi-assembly/pom.xml | 6 +
.../java/org/apache/nifi/util/NoOpProcessor.java | 29 ++
.../nifi-azure-bundle/nifi-azure-nar/pom.xml | 2 +-
.../nifi-azure-processors/pom.xml | 18 +-
.../azure/AbstractAzureBlobProcessor.java | 3 +-
.../azure/eventhub/ConsumeAzureEventHub.java | 5 +-
.../azure/storage/ListAzureBlobStorage.java | 4 +-
.../storage/queue/AbstractAzureQueueStorage.java | 190 ++++++------
.../azure/storage/queue/GetAzureQueueStorage.java | 4 +-
.../azure/storage/queue/PutAzureQueueStorage.java | 318 ++++++++++-----------
.../azure/storage/utils/AzureStorageUtils.java | 127 ++++----
.../AzureStorageCredentialsControllerService.java | 103 +++++++
...eStorageCredentialsControllerServiceLookup.java | 141 +++++++++
.../org.apache.nifi.controller.ControllerService | 16 ++
.../azure/AbstractAzureBlobStorageIT.java | 53 ----
.../azure/storage/AbstractAzureBlobStorageIT.java | 58 ++++
.../azure/storage/AbstractAzureStorageIT.java | 101 +++++++
.../processors/azure/storage/AzureTestUtil.java | 105 -------
.../azure/storage/ITDeleteAzureBlobStorage.java | 127 ++++----
.../azure/storage/ITFetchAzureBlobStorage.java | 81 +++---
.../azure/storage/ITListAzureBlobStorage.java | 71 +++--
.../azure/storage/ITPutAzureBlobStorage.java | 72 +++++
.../azure/storage/ITPutAzureStorageBlob.java | 63 ----
.../storage/queue/AbstractAzureQueueStorageIT.java | 61 ++++
.../storage/queue/GetAzureQueueStorageIT.java | 138 ++++-----
.../storage/queue/PutAzureQueueStorageIT.java | 217 +++++++-------
...reStorageUtilsGetStorageCredentialsDetails.java | 164 +++++++++++
...reStorageUtilsValidateCredentialProperties.java | 159 +++++++++++
...stAzureStorageCredentialsControllerService.java | 93 ++++++
...eStorageCredentialsControllerServiceLookup.java | 139 +++++++++
.../src/test/resources/hello.txt | 1 -
.../pom.xml | 22 +-
.../pom.xml | 35 ++-
.../storage/AzureStorageCredentialsDetails.java | 39 +++
.../storage/AzureStorageCredentialsService.java | 35 +++
nifi-nar-bundles/nifi-azure-bundle/pom.xml | 21 ++
36 files changed, 1899 insertions(+), 922 deletions(-)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index f2c5021..8ce38b5 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -429,6 +429,12 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-azure-services-api-nar</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-scripting-nar</artifactId>
<version>1.10.0-SNAPSHOT</version>
<type>nar</type>
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/NoOpProcessor.java b/nifi-mock/src/main/java/org/apache/nifi/util/NoOpProcessor.java
new file mode 100644
index 0000000..eff3bff
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/NoOpProcessor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class NoOpProcessor extends AbstractProcessor {
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
index 09863c5..b61388a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -38,7 +38,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services-api-nar</artifactId>
+ <artifactId>nifi-azure-services-api-nar</artifactId>
<version>1.10.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index f1f7d03..6df6b5b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -45,6 +45,11 @@
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-azure-services-api</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>${azure-eventhubs.version}</version>
@@ -57,13 +62,12 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
- <version>5.2.0</version>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </exclusion>
- </exclusions>
+ </dependency>
+ <!-- overriding jackson-core in azure-storage -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
index 2156b56..afee09f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -56,9 +56,10 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
private static final List<PropertyDescriptor> PROPERTIES = Collections
.unmodifiableList(Arrays.asList(
AzureStorageUtils.CONTAINER,
- AzureStorageUtils.PROP_SAS_TOKEN,
+ AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
+ AzureStorageUtils.PROP_SAS_TOKEN,
BLOB,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index e9bafb0..203dd5b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -48,7 +48,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -91,6 +90,8 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
})
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
+ private static final String FORMAT_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("event-hub-namespace")
.displayName("Event Hub Namespace")
@@ -626,7 +627,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
options.setReceiveTimeOut(Duration.ofMillis(receiveTimeoutMillis));
- final String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, storageAccountName, storageAccountKey);
+ final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey);
final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder(namespaceName, eventHubName, sasName, sasKey);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index bf82029..0d950a6 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -96,9 +96,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
LISTING_STRATEGY,
AzureStorageUtils.CONTAINER,
- AzureStorageUtils.PROP_SAS_TOKEN,
+ AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
+ AzureStorageUtils.PROP_SAS_TOKEN,
PROP_PREFIX,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
ListedEntityTracker.TRACKING_STATE_CACHE,
@@ -113,6 +114,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
@Override
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
+ results.addAll(AzureStorageUtils.validateCredentialProperties(validationContext));
AzureStorageUtils.validateProxySpec(validationContext, results);
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
index caab936..3266b56 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
@@ -1,110 +1,80 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage.queue;
-
-import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageCredentials;
-import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
-import com.microsoft.azure.storage.queue.CloudQueueClient;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
-
- public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
- .name("storage-queue-name")
- .displayName("Queue Name")
- .description("Name of the Azure Storage Queue")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All successfully processed FlowFiles are routed to this relationship")
- .build();
-
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("Unsuccessful operations will be transferred to the failure relationship.")
- .build();
-
- private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
- private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
-
- private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) {
- final String storageAccountName;
- final String storageAccountKey;
- final String sasToken;
- final String connectionString;
-
- if (flowFile == null) {
- storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
- storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
- sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
- } else {
- storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
- storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
- sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
- }
-
- CloudQueueClient cloudQueueClient;
- try {
- if (StringUtils.isNoneBlank(sasToken)) {
- connectionString = String.format(FORMAT_QUEUE_BASE_URI, storageAccountName);
- StorageCredentials storageCredentials = new StorageCredentialsSharedAccessSignature(sasToken);
- cloudQueueClient = new CloudQueueClient(new URI(connectionString), storageCredentials);
- } else {
- connectionString = String.format(FORMAT_QUEUE_CONNECTION_STRING, storageAccountName, storageAccountKey);
- CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
- cloudQueueClient = storageAccount.createCloudQueueClient();
- }
- } catch (IllegalArgumentException | URISyntaxException e) {
- getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
- throw new IllegalArgumentException(e);
- } catch (InvalidKeyException e) {
- getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
- throw new IllegalArgumentException(e);
- }
- return cloudQueueClient;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
+
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-queue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All successfully processed FlowFiles are routed to this relationship")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Unsuccessful operations will be transferred to the failure relationship.")
+ .build();
+
+ private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) throws URISyntaxException {
+ final AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(context, flowFile);
+ final CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(storageCredentialsDetails.getStorageCredentials(), true, null, storageCredentialsDetails.getStorageAccountName());
+ final CloudQueueClient cloudQueueClient = cloudStorageAccount.createCloudQueueClient();
+
+ return cloudQueueClient;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ return AzureStorageUtils.validateCredentialProperties(validationContext);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
index c3a2877..88a27f2 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
@@ -94,8 +94,8 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
.build();
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
- AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
- BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
+ AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN,
+ QUEUE, AUTO_DELETE, BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
index 4172c89..480c41d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
@@ -1,159 +1,159 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage.queue;
-
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.queue.CloudQueue;
-import com.microsoft.azure.storage.queue.CloudQueueClient;
-import com.microsoft.azure.storage.queue.CloudQueueMessage;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import java.io.ByteArrayOutputStream;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-@SeeAlso({GetAzureQueueStorage.class})
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
-@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.")
-public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
-
- public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
- .name("time-to-live")
- .displayName("TTL")
- .description("Maximum time to allow the message to be in the queue. If left empty, the default value of 7 days will be used.")
- .required(false)
- .defaultValue("7 days")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor VISIBILITY_DELAY = new PropertyDescriptor.Builder()
- .name("visibility-delay")
- .displayName("Visibility Delay")
- .description("The length of time during which the message will be invisible, starting when it is added to the queue. " +
- "This value must be greater than or equal to 0 and less than the TTL value.")
- .required(false)
- .defaultValue("0 secs")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
-
- private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
- AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, TTL,
- QUEUE, VISIBILITY_DELAY, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final long startNanos = System.nanoTime();
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- session.exportTo(flowFile, baos);
- final String flowFileContent = baos.toString();
-
- CloudQueueMessage message = new CloudQueueMessage(flowFileContent);
- CloudQueueClient cloudQueueClient;
- CloudQueue cloudQueue;
-
- final int ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
- final int delay = context.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
- final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions(flowFile).getValue().toLowerCase();
-
- try {
- cloudQueueClient = createCloudQueueClient(context, flowFile);
- cloudQueue = cloudQueueClient.getQueueReference(queue);
-
- final OperationContext operationContext = new OperationContext();
- AzureStorageUtils.setProxy(operationContext, context);
-
- cloudQueue.addMessage(message, ttl, delay, null, operationContext);
- } catch (URISyntaxException | StorageException e) {
- getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e});
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- session.transfer(flowFile, REL_SUCCESS);
- final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().send(flowFile, cloudQueue.getUri().toString(), transmissionMillis);
- }
-
- @Override
- public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
- final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
-
- final boolean ttlSet = validationContext.getProperty(TTL).isSet();
- final boolean delaySet = validationContext.getProperty(VISIBILITY_DELAY).isSet();
-
- final int ttl = validationContext.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
-
- if (ttlSet) {
- final int SEVEN_DAYS_TIMEPERIOD_IN_SECS = 604800; // i.e. 7 * 24 * 60 * 60
-
- if (ttl > SEVEN_DAYS_TIMEPERIOD_IN_SECS) {
- problems.add(new ValidationResult.Builder()
- .subject(TTL.getDisplayName())
- .valid(false)
- .explanation(TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days")
- .build());
- }
- }
-
- if (delaySet) {
- int delay = validationContext.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
-
- if (delay > ttl || delay < 0) {
- problems.add(new ValidationResult.Builder()
- .subject(VISIBILITY_DELAY.getDisplayName())
- .valid(false)
- .explanation(VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than " + TTL.getDisplayName())
- .build());
- }
- }
-
- AzureStorageUtils.validateProxySpec(validationContext, problems);
-
- return problems;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Processor that enqueues the content of each incoming FlowFile as a single message
+ * on an Azure Storage queue.
+ *
+ * <p>On success the FlowFile is routed to {@code REL_SUCCESS} and a provenance SEND event
+ * is reported; if the queue reference cannot be built or the storage call fails, the
+ * FlowFile is penalized and routed to {@code REL_FAILURE}.</p>
+ */
+@SeeAlso({GetAzureQueueStorage.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
+@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.")
+public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
+
+    /** Largest TTL accepted by {@link #customValidate}, in seconds (7 * 24 * 60 * 60). */
+    private static final int SEVEN_DAYS_TIMEPERIOD_IN_SECS = 604800;
+
+    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("time-to-live")
+            .displayName("TTL")
+            .description("Maximum time to allow the message to be in the queue. If left empty, the default value of 7 days will be used.")
+            .required(false)
+            .defaultValue("7 days")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor VISIBILITY_DELAY = new PropertyDescriptor.Builder()
+            .name("visibility-delay")
+            .displayName("Visibility Delay")
+            .description("The length of time during which the message will be invisible, starting when it is added to the queue. " +
+                    "This value must be greater than or equal to 0 and less than the TTL value.")
+            .required(false)
+            .defaultValue("0 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+            AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN,
+            TTL, QUEUE, VISIBILITY_DELAY, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Takes one FlowFile from the session and adds its content to the configured queue.
+     *
+     * @param context provides the TTL, visibility delay, queue name, credentials and proxy settings
+     * @param session used to fetch, export and transfer the FlowFile
+     * @throws ProcessException declared by the processor contract; storage and URI errors are
+     *                          handled here by routing the FlowFile to failure
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        // The whole FlowFile content is buffered in memory because the message is built from a String.
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        session.exportTo(flowFile, baos);
+        // Decode with an explicit charset; the no-arg toString() would use the JVM default and
+        // make the enqueued text depend on the platform the processor happens to run on.
+        final String flowFileContent = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+
+        CloudQueueMessage message = new CloudQueueMessage(flowFileContent);
+        CloudQueueClient cloudQueueClient;
+        CloudQueue cloudQueue;
+
+        final int ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final int delay = context.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
+        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
+        // (e.g. a Turkish locale maps 'I' to a dotless 'i').
+        final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions(flowFile).getValue().toLowerCase(Locale.ROOT);
+
+        try {
+            cloudQueueClient = createCloudQueueClient(context, flowFile);
+            cloudQueue = cloudQueueClient.getQueueReference(queue);
+
+            final OperationContext operationContext = new OperationContext();
+            AzureStorageUtils.setProxy(operationContext, context);
+
+            cloudQueue.addMessage(message, ttl, delay, null, operationContext);
+        } catch (URISyntaxException | StorageException e) {
+            getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        session.getProvenanceReporter().send(flowFile, cloudQueue.getUri().toString(), transmissionMillis);
+    }
+
+    /**
+     * Adds processor-specific checks on top of the superclass validation:
+     * <ul>
+     *   <li>an explicitly set TTL must not exceed 7 days;</li>
+     *   <li>an explicitly set visibility delay must be non-negative and strictly less than the TTL;</li>
+     *   <li>the proxy configuration is checked through {@code AzureStorageUtils.validateProxySpec}.</li>
+     * </ul>
+     *
+     * @param validationContext the context holding the property values to validate
+     * @return the validation problems found; empty when the configuration is valid
+     */
+    @Override
+    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean ttlSet = validationContext.getProperty(TTL).isSet();
+        final boolean delaySet = validationContext.getProperty(VISIBILITY_DELAY).isSet();
+
+        final int ttl = validationContext.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
+
+        if (ttlSet && ttl > SEVEN_DAYS_TIMEPERIOD_IN_SECS) {
+            problems.add(new ValidationResult.Builder()
+                    .subject(TTL.getDisplayName())
+                    .valid(false)
+                    .explanation(TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days")
+                    .build());
+        }
+
+        if (delaySet) {
+            final int delay = validationContext.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
+
+            // The delay must be strictly less than the TTL, as stated in the property description
+            // and in the explanation below; a delay equal to the TTL is therefore rejected as well.
+            if (delay >= ttl || delay < 0) {
+                problems.add(new ValidationResult.Builder()
+                        .subject(VISIBILITY_DELAY.getDisplayName())
+                        .valid(false)
+                        .explanation(VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than " + TTL.getDisplayName())
+                        .build());
+            }
+        }
+
+        AzureStorageUtils.validateProxySpec(validationContext, problems);
+
+        return problems;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index 2821258..0e1dfd2 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -19,12 +19,14 @@ package org.apache.nifi.processors.azure.storage.utils;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -32,13 +34,15 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
-import java.net.URI;
import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
public final class AzureStorageUtils {
public static final String BLOCK = "Block";
@@ -60,17 +64,24 @@ public final class AzureStorageUtils {
.sensitive(true)
.build();
+ public static final String ACCOUNT_NAME_BASE_DESCRIPTION =
+ "The storage account name. There are certain risks in allowing the account name to be stored as a flowfile " +
+ "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+ "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+ "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+ "In addition, the provenance repositories may be put on encrypted disk partitions.";
+
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
.name("storage-account-name")
.displayName("Storage Account Name")
- .description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile " +
- "attribute. While it does provide for a more flexible flow by allowing the account name to " +
- "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
- "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
- "In addition, the provenance repositories may be put on encrypted disk partitions.")
+ .description(ACCOUNT_NAME_BASE_DESCRIPTION +
+ " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+ "the preferred way is to configure them through a controller service specified in the Storage Credentials property. " +
+ "The controller service can provide a common/shared configuration for multiple/all Azure processors. Furthermore, the credentials " +
+ "can also be looked up dynamically with the 'Lookup' version of the service.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
+ .required(false)
.sensitive(true)
.build();
@@ -98,9 +109,16 @@ public final class AzureStorageUtils {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- // use HTTPS by default as per MSFT recommendation
- public static final String FORMAT_BLOB_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
- public static final String FORMAT_BASE_URI = "https://%s.blob.core.windows.net";
+ public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+ .name("storage-credentials-service")
+ .displayName("Storage Credentials")
+ .description("The Controller Service used to obtain Azure Storage Credentials. Instead of the processor level properties, " +
+ "the credentials can be configured here through a common/shared controller service, which is the preferred way. " +
+ "The 'Lookup' version of the service can also be used to select the credentials dynamically at runtime " +
+ "based on a FlowFile attribute (if the processor has FlowFile input).")
+ .identifiesControllerService(AzureStorageCredentialsService.class)
+ .required(false)
+ .build();
private AzureStorageUtils() {
// do not instantiate
@@ -111,56 +129,65 @@ public final class AzureStorageUtils {
* @param flowFile An incoming FlowFile can be used for NiFi Expression Language evaluation to derive
* Account Name, Account Key or SAS Token. This can be null if not available.
*/
- public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger, FlowFile flowFile) {
- final String accountName;
- final String accountKey;
- final String sasToken;
-
- if (flowFile == null) {
- accountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
- accountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
- sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
+ public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
+ final AzureStorageCredentialsDetails storageCredentialsDetails = getStorageCredentialsDetails(context, flowFile);
+ final CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(storageCredentialsDetails.getStorageCredentials(), true, null, storageCredentialsDetails.getStorageAccountName());
+ final CloudBlobClient cloudBlobClient = cloudStorageAccount.createCloudBlobClient();
+
+ return cloudBlobClient;
+ }
+
+ public static AzureStorageCredentialsDetails getStorageCredentialsDetails(PropertyContext context, FlowFile flowFile) {
+ final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
+
+ final AzureStorageCredentialsService storageCredentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService.class);
+
+ if (storageCredentialsService != null) {
+ return storageCredentialsService.getStorageCredentialsDetails(attributes);
} else {
- accountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
- accountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
- sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+ return createStorageCredentialsDetails(context, attributes);
}
+ }
+
+ public static AzureStorageCredentialsDetails createStorageCredentialsDetails(PropertyContext context, Map<String, String> attributes) {
+ final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
+ final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
+ final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
- CloudBlobClient cloudBlobClient;
-
- try {
- // sas token and acct name/key have different ways of creating a secure connection (e.g. new StorageCredentialsAccountAndKey didn't work)
- if (StringUtils.isNotBlank(sasToken)) {
- String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BASE_URI, accountName);
- StorageCredentials creds = new StorageCredentialsSharedAccessSignature(sasToken);
- cloudBlobClient = new CloudBlobClient(new URI(storageConnectionString), creds);
- } else {
- String blobConnString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, accountName, accountKey);
- CloudStorageAccount storageAccount = CloudStorageAccount.parse(blobConnString);
- cloudBlobClient = storageAccount.createCloudBlobClient();
- }
- } catch (IllegalArgumentException | URISyntaxException e) {
- logger.error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
- throw new IllegalArgumentException(e);
- } catch (InvalidKeyException e) {
- logger.error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
- throw new IllegalArgumentException(e);
+ if (StringUtils.isBlank(accountName)) {
+ throw new IllegalArgumentException(String.format("'%s' must not be empty.", ACCOUNT_NAME.getDisplayName()));
}
- return cloudBlobClient;
+ StorageCredentials storageCredentials;
+
+ if (StringUtils.isNotBlank(accountKey)) {
+ storageCredentials = new StorageCredentialsAccountAndKey(accountName, accountKey);
+ } else if (StringUtils.isNotBlank(sasToken)) {
+ storageCredentials = new StorageCredentialsSharedAccessSignature(sasToken);
+ } else {
+ throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.", ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+ }
+
+ return new AzureStorageCredentialsDetails(accountName, storageCredentials);
}
public static Collection<ValidationResult> validateCredentialProperties(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
- String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue();
- String acctName = validationContext.getProperty(ACCOUNT_KEY).getValue();
- if ((StringUtils.isBlank(sasToken) && StringUtils.isBlank(acctName))
- || (StringUtils.isNotBlank(sasToken) && StringUtils.isNotBlank(acctName))) {
+ final String storageCredentials = validationContext.getProperty(STORAGE_CREDENTIALS_SERVICE).getValue();
+ final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
+ final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
+ final String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue();
+
+ if (!((StringUtils.isNotBlank(storageCredentials) && StringUtils.isBlank(accountName) && StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken))
+ || (StringUtils.isBlank(storageCredentials) && StringUtils.isNotBlank(accountName) && StringUtils.isNotBlank(accountKey) && StringUtils.isBlank(sasToken))
+ || (StringUtils.isBlank(storageCredentials) && StringUtils.isNotBlank(accountName) && StringUtils.isBlank(accountKey) && StringUtils.isNotBlank(sasToken)))) {
results.add(new ValidationResult.Builder().subject("AzureStorageUtils Credentials")
- .valid(false)
- .explanation("either Azure Account Key or Shared Access Signature required, but not both")
- .build());
+ .valid(false)
+ .explanation("either " + STORAGE_CREDENTIALS_SERVICE.getDisplayName()
+ + ", or " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName()
+ + " or " + ACCOUNT_NAME.getDisplayName() + " with " + PROP_SAS_TOKEN.getDisplayName() + " must be specified")
+ .build());
}
return results;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService.java
new file mode 100644
index 0000000..96d6476
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of AbstractControllerService interface
+ *
+ * @see AbstractControllerService
+ */
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
+@CapabilityDescription("Defines credentials for Azure Storage processors. " +
+ "Uses Account Name with Account Key or Account Name with SAS Token.")
+public class AzureStorageCredentialsControllerService extends AbstractControllerService implements AzureStorageCredentialsService {
+
+ public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+ .name(AzureStorageUtils.ACCOUNT_NAME.getName())
+ .displayName(AzureStorageUtils.ACCOUNT_NAME.getDisplayName())
+ .description(AzureStorageUtils.ACCOUNT_NAME_BASE_DESCRIPTION)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .sensitive(true)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = Collections
+ .unmodifiableList(Arrays.asList(
+ ACCOUNT_NAME,
+ AzureStorageUtils.ACCOUNT_KEY,
+ AzureStorageUtils.PROP_SAS_TOKEN));
+
+ private ConfigurationContext context;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ final String accountKey = validationContext.getProperty(AzureStorageUtils.ACCOUNT_KEY).getValue();
+ final String sasToken = validationContext.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).getValue();
+
+ if (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)) {
+ results.add(new ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
+ .valid(false)
+ .explanation("either " + AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " or " + AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName() + " is required")
+ .build());
+ } else if (StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) {
+ results.add(new ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
+ .valid(false)
+ .explanation("cannot set both " + AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " and " + AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName())
+ .build());
+ }
+
+ return results;
+ }
+
+ @OnEnabled
+ public void onEnabled(ConfigurationContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) {
+ return AzureStorageUtils.createStorageCredentialsDetails(context, attributes);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
new file mode 100644
index 0000000..4899715
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link AzureStorageCredentialsService} that delegates to one of several other
+ * AzureStorageCredentialsService instances registered as dynamic properties. The delegate is chosen
+ * by the value of the {@value #AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE} attribute, which must match
+ * the name of a dynamic property.
+ */
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
+@CapabilityDescription("Provides an AzureStorageCredentialsService that can be used to dynamically select another AzureStorageCredentialsService. " +
+        "This service requires an attribute named 'azure.storage.credentials.name' to be passed in, and will throw an exception if the attribute is missing. " +
+        "The value of 'azure.storage.credentials.name' will be used to select the AzureStorageCredentialsService that has been registered with that name. " +
+        "This will allow multiple AzureStorageCredentialsServices to be defined and registered, and then selected dynamically at runtime by tagging flow files " +
+        "with the appropriate 'azure.storage.credentials.name' attribute.")
+@DynamicProperty(name = "The name to register AzureStorageCredentialsService", value = "The AzureStorageCredentialsService",
+        description = "If 'azure.storage.credentials.name' attribute contains the name of the dynamic property, then the AzureStorageCredentialsService (registered in the value) will be selected.",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class AzureStorageCredentialsControllerServiceLookup extends AbstractControllerService implements AzureStorageCredentialsService {
+
+    public static final String AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE = "azure.storage.credentials.name";
+
+    // Dynamic property name -> registered service; populated in onEnabled, cleared in onDisabled.
+    private volatile Map<String, AzureStorageCredentialsService> serviceMap;
+
+    /**
+     * Describes a dynamic property whose value identifies the service to use for the given name.
+     *
+     * @param propertyDescriptorName the dynamic property name, i.e. the lookup key
+     * @return a descriptor that references an AzureStorageCredentialsService
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        final String description = "The " + AzureStorageCredentialsService.class.getSimpleName() + " to return when "
+                + AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " = '" + propertyDescriptorName + "'";
+
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .description(description)
+                .identifiesControllerService(AzureStorageCredentialsService.class)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .build();
+    }
+
+    /**
+     * Rejects a configuration in which this service references itself, or in which no
+     * dynamic property (and therefore no delegate service) is defined.
+     *
+     * @param context the context holding the configured properties
+     * @return the validation problems found; empty when the configuration is valid
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        final List<ValidationResult> results = new ArrayList<>();
+        final String ownId = this.getIdentifier();
+
+        boolean anyServiceDefined = false;
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            anyServiceDefined |= descriptor.isDynamic();
+
+            // A property value equal to this service's own identifier would make the lookup point at itself.
+            if (ownId.equals(context.getProperty(descriptor).getValue())) {
+                results.add(new ValidationResult.Builder()
+                        .subject(descriptor.getDisplayName())
+                        .explanation("the current service cannot be registered as an " + AzureStorageCredentialsService.class.getSimpleName() + " to lookup")
+                        .valid(false)
+                        .build());
+            }
+        }
+
+        if (!anyServiceDefined) {
+            results.add(new ValidationResult.Builder()
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("at least one " + AzureStorageCredentialsService.class.getSimpleName() + " must be defined via dynamic properties")
+                    .valid(false)
+                    .build());
+        }
+
+        return results;
+    }
+
+    /**
+     * Resolves every dynamic property to its controller service and publishes the
+     * result as an unmodifiable name-to-service map.
+     *
+     * @param context the configuration of this service at the time it is enabled
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final Map<String, AzureStorageCredentialsService> registered = new HashMap<>();
+
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (!descriptor.isDynamic()) {
+                continue;
+            }
+            registered.put(descriptor.getName(),
+                    context.getProperty(descriptor).asControllerService(AzureStorageCredentialsService.class));
+        }
+
+        serviceMap = Collections.unmodifiableMap(registered);
+    }
+
+    /** Drops the name-to-service map when the service is disabled. */
+    @OnDisabled
+    public void onDisabled() {
+        serviceMap = null;
+    }
+
+    /**
+     * Selects the delegate service named by the attributes and returns its credentials.
+     *
+     * @param attributes must contain a non-blank {@value #AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE} entry
+     * @return the credential details supplied by the selected service
+     * @throws ProcessException if the attribute is missing or blank, or no service is registered under its value
+     */
+    @Override
+    public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) throws ProcessException {
+        return lookupAzureStorageCredentialsService(attributes).getStorageCredentialsDetails(attributes);
+    }
+
+    // Maps the lookup attribute to a registered service, failing with a ProcessException otherwise.
+    private AzureStorageCredentialsService lookupAzureStorageCredentialsService(Map<String, String> attributes) {
+        if (!attributes.containsKey(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE)) {
+            throw new ProcessException("Attributes must contain an attribute name '" + AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + "'");
+        }
+
+        final String credentialsName = attributes.get(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE);
+        if (StringUtils.isBlank(credentialsName)) {
+            throw new ProcessException(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " cannot be null or blank");
+        }
+
+        final AzureStorageCredentialsService selected = serviceMap.get(credentialsName);
+        if (selected != null) {
+            return selected;
+        }
+
+        throw new ProcessException("No " + AzureStorageCredentialsService.class.getSimpleName() + " was found for " +
+                AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " '" + credentialsName + "'");
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..659452b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService
+org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java
deleted file mode 100644
index aebace6..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import org.apache.nifi.processors.azure.storage.AzureTestUtil;
-import org.junit.Assert;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-
-public class AbstractAzureBlobStorageIT {
-
- protected final static String SAMPLE_FILE_NAME = "/hello.txt";
- protected final static String SAMPLE_BLOB_NAME = "testing";
-
- protected void uploadBlob(String containerName, String filePath) throws URISyntaxException, StorageException, InvalidKeyException, IOException {
- CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
- CloudBlob blob = container.getBlockBlobReference(SAMPLE_BLOB_NAME);
- blob.uploadFromFile(filePath);
- }
-
- protected String getFileFromResource(String fileName) {
- URI uri = null;
- try {
- uri = this.getClass().getResource(fileName).toURI();
- } catch (URISyntaxException e) {
- Assert.fail("Cannot proceed without File : " + fileName);
- }
-
- return uri.toString();
- }
-
-}
-
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
new file mode 100644
index 0000000..4361de7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.UUID;
+
+/**
+ * Base class for blob storage integration tests. Each test gets its own uniquely
+ * named container, which is created before the test and deleted afterwards.
+ */
+public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT {
+
+    protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
+    protected static final String TEST_BLOB_NAME = "nifi-test-blob";
+
+    /** Container used by the current test; remains {@code null} if set-up fails before it is assigned. */
+    protected CloudBlobContainer container;
+
+    /**
+     * Creates a container named {@code <prefix>-<random UUID>} in the test storage account
+     * and sets the runner's {@code CONTAINER} property to that name.
+     *
+     * @throws Exception if the storage account or the container cannot be obtained/created
+     */
+    @Before
+    public void setUpAzureBlobStorageIT() throws Exception {
+        String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
+        CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient();
+        container = blobClient.getContainerReference(containerName);
+        container.createIfNotExists();
+
+        runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
+    }
+
+    /**
+     * Deletes the per-test container, if one was created.
+     *
+     * @throws Exception if the delete request fails
+     */
+    @After
+    public void tearDownAzureBlobStorageIT() throws Exception {
+        // JUnit still runs @After methods when a @Before method throws, so the container
+        // may never have been assigned; skip clean-up instead of masking the real failure.
+        if (container != null) {
+            container.deleteIfExists();
+        }
+    }
+
+    /**
+     * Uploads the ten-byte payload {@code "0123456789"} as a block blob named
+     * {@link #TEST_BLOB_NAME} into the per-test container.
+     *
+     * @throws Exception if the upload fails
+     */
+    protected void uploadTestBlob() throws Exception {
+        CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
+        byte[] buf = "0123456789".getBytes();
+        InputStream in = new ByteArrayInputStream(buf);
+        // Derive the length from the buffer so it cannot drift from the payload.
+        blob.upload(in, buf.length);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java
new file mode 100644
index 0000000..f5fa2e5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Before;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class for Azure Storage integration tests. It loads the storage account
+ * credentials from a properties file in the user's home directory and prepares a
+ * {@link TestRunner} for the processor class supplied by the subclass.
+ */
+public abstract class AbstractAzureStorageIT {
+
+    /** Credentials read from {@link #CREDENTIALS_FILE} when this class is initialized. */
+    private static final Properties CONFIG;
+
+    private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+
+    // Load the credentials once per JVM. A missing or unreadable file is reported through
+    // JUnit's fail(), i.e. as an AssertionError raised during class initialization.
+    static {
+        final FileInputStream fis;
+        CONFIG = new Properties();
+        try {
+            fis = new FileInputStream(CREDENTIALS_FILE);
+            try {
+                CONFIG.load(fis);
+            } catch (IOException e) {
+                fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+            } finally {
+                FileUtils.closeQuietly(fis);
+            }
+        } catch (FileNotFoundException e) {
+            fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * @return the value of the {@code accountName} entry in the credentials file,
+     *         or {@code null} if the entry is absent
+     */
+    protected static String getAccountName() {
+        return CONFIG.getProperty("accountName");
+    }
+
+    /**
+     * @return the value of the {@code accountKey} entry in the credentials file,
+     *         or {@code null} if the entry is absent
+     */
+    protected static String getAccountKey() {
+        return CONFIG.getProperty("accountKey");
+    }
+
+    /** Runner for the processor under test; re-created before every test method. */
+    protected TestRunner runner;
+
+    /**
+     * Creates a fresh runner for {@link #getProcessorClass()} and configures it with the
+     * account name and account key directly on the processor.
+     */
+    @Before
+    public void setUpAzureStorageIT() {
+        runner = TestRunners.newTestRunner(getProcessorClass());
+
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName());
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
+    }
+
+    /**
+     * @return the processor class the test runner should instantiate
+     */
+    protected abstract Class<? extends Processor> getProcessorClass();
+
+    /**
+     * Builds a storage account handle from the configured account name and key,
+     * independently of the processor under test.
+     *
+     * @return a storage account client using HTTPS endpoints
+     * @throws Exception if the credentials or the account cannot be constructed
+     */
+    protected CloudStorageAccount getStorageAccount() throws Exception {
+        StorageCredentials storageCredentials = new StorageCredentialsAccountAndKey(getAccountName(), getAccountKey());
+        return new CloudStorageAccount(storageCredentials, true);
+    }
+
+    /**
+     * Switches the runner from processor-level credentials to a credentials controller
+     * service: removes the account name/key from the processor, registers and enables an
+     * {@link AzureStorageCredentialsControllerService} carrying the same values, and
+     * references it from the processor's {@code STORAGE_CREDENTIALS_SERVICE} property.
+     *
+     * @throws Exception if the controller service cannot be added to the runner
+     */
+    protected void configureCredentialsService() throws Exception {
+        runner.removeProperty(AzureStorageUtils.ACCOUNT_NAME);
+        runner.removeProperty(AzureStorageUtils.ACCOUNT_KEY);
+
+        AzureStorageCredentialsService credentialsService = new AzureStorageCredentialsControllerService();
+
+        runner.addControllerService("credentials-service", credentialsService);
+
+        runner.setProperty(credentialsService, AzureStorageUtils.ACCOUNT_NAME, getAccountName());
+        runner.setProperty(credentialsService, AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
+
+        // Validate before enabling so a misconfigured service fails here with a clear assertion.
+        runner.assertValid(credentialsService);
+
+        runner.enableControllerService(credentialsService);
+
+        runner.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, credentialsService.getIdentifier());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
deleted file mode 100644
index 6d3a692..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import static org.junit.Assert.fail;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.Iterator;
-import java.util.Properties;
-
-import com.microsoft.azure.storage.queue.CloudQueue;
-import com.microsoft.azure.storage.queue.CloudQueueClient;
-import com.microsoft.azure.storage.queue.CloudQueueMessage;
-import org.apache.nifi.util.file.FileUtils;
-
-import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-
-public class AzureTestUtil {
-
- private static final Properties CONFIG;
-
- private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
- private static final String FORMAT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
-
- public static final String TEST_BLOB_NAME = "testing";
- public static final String TEST_STORAGE_QUEUE = "testqueue";
- public static final String TEST_CONTAINER_NAME_PREFIX = "nifitest";
-
- public static CloudQueue cloudQueue;
-
- static {
- final FileInputStream fis;
- CONFIG = new Properties();
- try {
- fis = new FileInputStream(CREDENTIALS_FILE);
- try {
- CONFIG.load(fis);
- } catch (IOException e) {
- fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
- } finally {
- FileUtils.closeQuietly(fis);
- }
- } catch (FileNotFoundException e) {
- fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
- }
-
- }
-
- public static String getAccountName() {
- return CONFIG.getProperty("accountName");
- }
-
- public static String getAccountKey() {
- return CONFIG.getProperty("accountKey");
- }
-
- public static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException {
- CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient();
- return blobClient.getContainerReference(containerName);
- }
-
- public static CloudQueue getQueue(String queueName) throws URISyntaxException, InvalidKeyException, StorageException {
- CloudQueueClient cloudQueueClient = getStorageAccount().createCloudQueueClient();
- cloudQueue = cloudQueueClient.getQueueReference(queueName);
- return cloudQueue;
- }
-
- private static CloudStorageAccount getStorageAccount() throws URISyntaxException, InvalidKeyException {
- String storageConnectionString = String.format(FORMAT_CONNECTION_STRING, getAccountName(), getAccountKey());
- return CloudStorageAccount.parse(storageConnectionString);
- }
-
- public static int getQueueCount() throws StorageException {
- Iterator<CloudQueueMessage> retrievedMessages = cloudQueue.retrieveMessages(10, 1, null, null).iterator();
- int count = 0;
-
- while (retrievedMessages.hasNext()) {
- retrievedMessages.next();
- count++;
- }
-
- return count;
- }
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java
index b7481d4..0143b41 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java
@@ -1,61 +1,66 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.UUID;
-
-import org.apache.nifi.processors.azure.AbstractAzureBlobStorageIT;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-
-public class ITDeleteAzureBlobStorage extends AbstractAzureBlobStorageIT{
-
- @Test
- public void testDeleteBlob() throws StorageException, URISyntaxException, InvalidKeyException, IOException {
- String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
- CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
- container.createIfNotExists();
-
- uploadBlob(containerName, getFileFromResource(SAMPLE_FILE_NAME));
-
- final TestRunner runner = TestRunners.newTestRunner(DeleteAzureBlobStorage.class);
-
- try {
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
- runner.setProperty(DeleteAzureBlobStorage.BLOB, AzureTestUtil.TEST_BLOB_NAME);
-
- runner.enqueue(new byte[0]);
- runner.run(1);
-
- runner.assertAllFlowFilesTransferred(DeleteAzureBlobStorage.REL_SUCCESS);
-
- } finally {
- container.deleteIfExists();
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.nifi.processor.Processor;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Integration test for {@code DeleteAzureBlobStorage}: uploads the shared test blob,
+ * runs the processor once and checks that the blob is no longer listed.
+ */
+public class ITDeleteAzureBlobStorage extends AbstractAzureBlobStorageIT {
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return DeleteAzureBlobStorage.class;
+    }
+
+    /** Targets the shared test blob and makes sure it exists before each test. */
+    @Before
+    public void setUp() throws Exception {
+        runner.setProperty(DeleteAzureBlobStorage.BLOB, TEST_BLOB_NAME);
+
+        uploadTestBlob();
+    }
+
+    /** Deletes the blob using credentials configured directly on the processor. */
+    @Test
+    public void testDeleteBlob() {
+        deleteAndVerify();
+    }
+
+    /** Deletes the blob using credentials supplied by the controller service. */
+    @Test
+    public void testDeleteBlobUsingCredentialsService() throws Exception {
+        configureCredentialsService();
+
+        deleteAndVerify();
+    }
+
+    /**
+     * Runs the processor once on an empty flow file, then asserts that every flow file
+     * was routed to success and that listing by the test blob name yields nothing.
+     */
+    private void deleteAndVerify() {
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteAzureBlobStorage.REL_SUCCESS);
+
+        final Iterable<ListBlobItem> remaining = container.listBlobs(TEST_BLOB_NAME);
+        final boolean blobStillListed = remaining.iterator().hasNext();
+        assertFalse(blobStillListed);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
index 1810a08..873390c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
@@ -16,63 +16,54 @@
*/
package org.apache.nifi.processors.azure.storage;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
import org.junit.Test;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import java.util.List;
+
+public class ITFetchAzureBlobStorage extends AbstractAzureBlobStorageIT {
+
+ /** Selects {@code FetchAzureBlobStorage} as the processor under test. */
+ @Override
+ protected Class<? extends Processor> getProcessorClass() {
+ return FetchAzureBlobStorage.class;
+ }
-public class ITFetchAzureBlobStorage {
+ /**
+ * Sets the processor's BLOB property to the shared test blob name and uploads
+ * that blob so each test has something to fetch.
+ */
+ @Before
+ public void setUp() throws Exception {
+ runner.setProperty(FetchAzureBlobStorage.BLOB, TEST_BLOB_NAME);
+
+ uploadTestBlob();
+ }
@Test
- public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
- String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
- CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
- container.createIfNotExists();
+ public void testFetchBlob() throws Exception {
+ runner.assertValid();
+ runner.enqueue(new byte[0]);
+ runner.run();
- CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME);
- byte[] buf = "0123456789".getBytes();
- InputStream in = new ByteArrayInputStream(buf);
- blob.upload(in, 10);
+ assertResult();
+ }
- final TestRunner runner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
+ @Test
+ public void testFetchBlobUsingCredentialService() throws Exception {
+ configureCredentialsService();
- try {
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
- runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
+ runner.assertValid();
+ runner.enqueue(new byte[0]);
+ runner.run();
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME);
- attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME);
- attributes.put("azure.blobtype", AzureStorageUtils.BLOCK);
- runner.enqueue(new byte[0], attributes);
- runner.run();
+ assertResult();
+ }
- runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
- List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
- for (MockFlowFile flowFile : flowFilesForRelationship) {
- flowFile.assertContentEquals("0123456789".getBytes());
- flowFile.assertAttributeEquals("azure.length", "10");
- }
- } finally {
- container.deleteIfExists();
+ /**
+ * Asserts that exactly one flow file was routed to success and that every flow file
+ * on that relationship carries the content "0123456789" and an "azure.length"
+ * attribute of "10".
+ */
+ private void assertResult() throws Exception {
+ runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
+ List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
+ for (MockFlowFile flowFile : flowFilesForRelationship) {
+ flowFile.assertContentEquals("0123456789".getBytes());
+ flowFile.assertAttributeEquals("azure.length", "10");
 }
 }
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
index 311cf71..9806225 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
@@ -16,55 +16,52 @@
*/
package org.apache.nifi.processors.azure.storage;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.UUID;
-
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
import org.junit.Test;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import java.util.concurrent.TimeUnit;
+
+public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT {
-public class ITListAzureBlobStorage {
+ /** Selects {@code ListAzureBlobStorage} as the processor under test. */
+ @Override
+ protected Class<? extends Processor> getProcessorClass() {
+ return ListAzureBlobStorage.class;
+ }
+
+ /**
+ * Uploads the shared test blob, then pauses for the duration stored under
+ * TimeUnit.SECONDS in ListAzureBlobStorage.LISTING_LAG_MILLIS.
+ * NOTE(review): presumably this lets the blob age past the lister's lag window so a
+ * single run picks it up - confirm against the list processor.
+ */
+ @Before
+ public void setUp() throws Exception {
+ uploadTestBlob();
+
+ Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS));
+ }
@Test
- public void testListsAzureBlobStorageContent() throws InvalidKeyException, StorageException, URISyntaxException, IOException {
- String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
- CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
- container.createIfNotExists();
+ public void testListBlobs() {
+ runner.assertValid();
+ runner.run(1);
- CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME);
- byte[] buf = "0123456789".getBytes();
- InputStream in = new ByteArrayInputStream(buf);
- blob.upload(in, 10);
+ assertResult();
+ }
- final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage());
+ @Test
+ public void testListBlobsUsingCredentialService() throws Exception {
+ configureCredentialsService();
- try {
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
+ runner.assertValid();
+ runner.run(1);
- // requires multiple runs to deal with List processor checking
- runner.run(3);
+ assertResult();
+ }
- runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
- runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
+ private void assertResult() {
+ runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
- for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
- entry.assertAttributeEquals("azure.length", "10");
- entry.assertAttributeEquals("mime.type", "application/octet-stream");
- }
- } finally {
- container.deleteIfExists();
+ for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
+ entry.assertAttributeEquals("azure.length", "10");
+ entry.assertAttributeEquals("mime.type", "application/octet-stream");
}
}
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
new file mode 100644
index 0000000..e006c2c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for {@code PutAzureBlobStorage}: sends a ten-byte flow file through
+ * the processor and checks both the outgoing flow file and the resulting blob listing.
+ */
+public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return PutAzureBlobStorage.class;
+    }
+
+    /** Directs uploads to the shared test blob name. */
+    @Before
+    public void setUp() {
+        runner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
+    }
+
+    /** Uploads using credentials configured directly on the processor. */
+    @Test
+    public void testPutBlob() throws Exception {
+        putAndVerify();
+    }
+
+    /** Uploads using credentials supplied by the controller service. */
+    @Test
+    public void testPutBlobUsingCredentialsService() throws Exception {
+        configureCredentialsService();
+
+        putAndVerify();
+    }
+
+    /**
+     * Runs the processor on the payload "0123456789", then asserts that exactly one flow
+     * file reached success with unchanged content and an "azure.length" of "10", and that
+     * listing by the test blob name returns at least one item.
+     */
+    private void putAndVerify() throws Exception {
+        runner.assertValid();
+        runner.enqueue("0123456789".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
+        final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
+        for (final MockFlowFile uploadedFlowFile : succeeded) {
+            uploadedFlowFile.assertContentEquals("0123456789".getBytes());
+            uploadedFlowFile.assertAttributeEquals("azure.length", "10");
+        }
+
+        final Iterable<ListBlobItem> listed = container.listBlobs(TEST_BLOB_NAME);
+        assertTrue(listed.iterator().hasNext());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
deleted file mode 100644
index bfa1d4a..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-
-public class ITPutAzureStorageBlob {
-
- @Test
- public void testPuttingBlob() throws IOException, InvalidKeyException, StorageException, URISyntaxException {
- String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
- CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
- container.createIfNotExists();
-
- final TestRunner runner = TestRunners.newTestRunner(new PutAzureBlobStorage());
-
- try {
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
- runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
-
- runner.enqueue("0123456789".getBytes());
- runner.run();
-
- runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
- List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
- for (MockFlowFile flowFile : flowFilesForRelationship) {
- flowFile.assertContentEquals("0123456789".getBytes());
- flowFile.assertAttributeEquals("azure.length", "10");
- }
- } finally {
- container.deleteIfExists();
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorageIT.java
new file mode 100644
index 0000000..43657d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorageIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.processors.azure.storage.AbstractAzureStorageIT;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Iterator;
+import java.util.UUID;
+
+public abstract class AbstractAzureQueueStorageIT extends AbstractAzureStorageIT {
+
+ protected static final String TEST_QUEUE_NAME_PREFIX = "nifi-test-queue";
+
+ protected CloudQueue cloudQueue;
+
+ /** Creates a uniquely named queue for the test and sets its name as the processor's QUEUE property. */
+ @Before
+ public void setUpAzureQueueStorageIT() throws Exception {
+ final String uniqueQueueName = TEST_QUEUE_NAME_PREFIX + "-" + UUID.randomUUID();
+ final CloudQueueClient queueClient = getStorageAccount().createCloudQueueClient();
+ cloudQueue = queueClient.getQueueReference(uniqueQueueName);
+ cloudQueue.createIfNotExists();
+
+ runner.setProperty(AbstractAzureQueueStorage.QUEUE, uniqueQueueName);
+ }
+
+ /** Deletes the queue created by the set-up method, if it still exists. */
+ @After
+ public void tearDownAzureQueueStorageIT() throws Exception {
+ cloudQueue.deleteIfExists();
+ }
+
+ /** Counts the messages returned by a single retrieveMessages(10, 1, null, null) call on the test queue. */
+ protected int getMessageCount() throws Exception {
+ final Iterator<CloudQueueMessage> messages = cloudQueue.retrieveMessages(10, 1, null, null).iterator();
+ int retrievedCount = 0;
+ for (; messages.hasNext(); messages.next()) {
+ retrievedCount++;
+ }
+ return retrievedCount;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
index 1711bbd..a446f35 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
@@ -17,133 +17,119 @@
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
-import org.apache.nifi.processors.azure.storage.AzureTestUtil;
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
import java.util.List;
-public class GetAzureQueueStorageIT {
+public class GetAzureQueueStorageIT extends AbstractAzureQueueStorageIT {
- private final TestRunner runner = TestRunners.newTestRunner(GetAzureQueueStorage.class);
- private static CloudQueue cloudQueue;
+ @Override
+ protected Class<? extends Processor> getProcessorClass() {
+ return GetAzureQueueStorage.class;
+ }
- @BeforeClass
- public static void setup() throws InvalidKeyException, StorageException, URISyntaxException {
- cloudQueue = AzureTestUtil.getQueue(AzureTestUtil.TEST_STORAGE_QUEUE);
- cloudQueue.createIfNotExists();
+ @Before
+ public void setUp() throws StorageException {
+ cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 1"), 604800, 0, null, null);
+ cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 2"), 604800, 0, null, null);
+ cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 3"), 604800, 0, null, null);
}
@Test
- public void testGetWithAutoDeleteFalse() throws StorageException, InterruptedException {
- cloudQueue.clear();
- insertDummyMessages();
-
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
- runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
- runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
- runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
-
+ public void testSimpleGet() throws Exception {
+ runner.assertValid();
runner.run(1);
- final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
- Assert.assertFalse(mockFlowFiles.isEmpty());
-
- Thread.sleep(1500);
- cloudQueue.downloadAttributes();
- Assert.assertEquals(3, cloudQueue.getApproximateMessageCount());
+ assertResult(0);
}
@Test
- public void testGetWithELAndAutoDeleteTrue() throws StorageException, InterruptedException {
- cloudQueue.clear();
- insertDummyMessages();
+ public void testSimpleGetWithCredentialsService() throws Exception {
+ configureCredentialsService();
+ runner.assertValid();
+ runner.run(1);
+
+ assertResult(0);
+ }
+
+ @Test
+ public void testSimpleGetWithEL() throws Exception {
runner.setValidateExpressionUsage(true);
- runner.setVariable("account.name", AzureTestUtil.getAccountName());
- runner.setVariable("account.key", AzureTestUtil.getAccountKey());
- runner.setVariable("queue.name", AzureTestUtil.TEST_STORAGE_QUEUE);
+ runner.setVariable("account.name", getAccountName());
+ runner.setVariable("account.key", getAccountKey());
+ runner.setVariable("queue.name", cloudQueue.getName());
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
runner.setProperty(GetAzureQueueStorage.QUEUE, "${queue.name}");
- runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
- runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "true");
- runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
+ runner.assertValid();
runner.run(1);
- final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
- Assert.assertFalse(mockFlowFiles.isEmpty());
+ assertResult(0);
+ }
- Thread.sleep(1500);
- cloudQueue.downloadAttributes();
- Assert.assertEquals(0, cloudQueue.getApproximateMessageCount());
+ @Test
+ public void testGetWithAutoDeleteFalse() throws Exception {
+ runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
+
+ runner.assertValid();
+ runner.run(1);
+
+ assertResult(3);
}
@Test
- public void testGetWithVisibilityTimeout() throws StorageException, InterruptedException {
- cloudQueue.clear();
- insertDummyMessages();
-
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
- runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
+ public void testGetWithVisibilityTimeout() throws Exception {
runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
+ runner.assertValid();
runner.run(1);
- final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
- Assert.assertFalse(mockFlowFiles.isEmpty());
- Assert.assertEquals(0, AzureTestUtil.getQueueCount());
+ runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
+ Assert.assertEquals(0, getMessageCount());
Thread.sleep(1500);
- Assert.assertEquals(3, AzureTestUtil.getQueueCount());
+ Assert.assertEquals(3, getMessageCount());
}
@Test
- public void testGetWithBatchSize() throws StorageException {
- cloudQueue.clear();
- insertDummyMessages();
-
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
+ public void testGetWithBatchSize() throws Exception {
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "2");
- runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "true");
- runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
+ runner.assertValid();
runner.run(1);
+
runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 2);
+ cloudQueue.downloadAttributes();
+ Assert.assertEquals(1, cloudQueue.getApproximateMessageCount());
runner.run(1);
- runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
+ runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
+ cloudQueue.downloadAttributes();
+ Assert.assertEquals(0, cloudQueue.getApproximateMessageCount());
}
- private static void insertDummyMessages() throws StorageException {
- cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 1"), 604800, 0, null, null);
- cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 2"), 604800, 0, null, null);
- cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 3"), 604800, 0, null, null);
- }
+ private void assertResult(int expectedMessageCountInQueue) throws Exception {
+ runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
- @AfterClass
- public static void cleanup() throws StorageException {
- cloudQueue.deleteIfExists();
+ List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
+ int i = 1;
+ for (MockFlowFile mockFlowFile : mockFlowFiles) {
+ mockFlowFile.assertContentEquals("Dummy Message " + i++);
+ }
+
+ cloudQueue.downloadAttributes();
+ Assert.assertEquals(expectedMessageCountInQueue, cloudQueue.getApproximateMessageCount());
}
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
index e02f16d..7d6fe9d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
@@ -1,118 +1,99 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage.queue;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.queue.CloudQueue;
-import org.apache.nifi.processors.azure.storage.AzureTestUtil;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-
-public class PutAzureQueueStorageIT {
-
- private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class);
- private static CloudQueue cloudQueue;
-
- @BeforeClass
- public static void setup() throws InvalidKeyException, StorageException, URISyntaxException {
- cloudQueue = AzureTestUtil.getQueue(AzureTestUtil.TEST_STORAGE_QUEUE);
- cloudQueue.createIfNotExists();
- }
-
- @Test
- public void testSimplePut() throws InvalidKeyException, StorageException, URISyntaxException {
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
-
- runner.enqueue("Dummy message");
- runner.run(1);
-
- runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
- }
-
- @Test
- public void testSimplePutWithEL() throws StorageException, URISyntaxException, InvalidKeyException {
- runner.setValidateExpressionUsage(true);
-
- runner.setVariable("account.name", AzureTestUtil.getAccountName());
- runner.setVariable("account.key", AzureTestUtil.getAccountKey());
- runner.setVariable("queue.name", AzureTestUtil.TEST_STORAGE_QUEUE);
-
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
- runner.setProperty(PutAzureQueueStorage.QUEUE, "${queue.name}");
-
- runner.enqueue("Dummy message");
- runner.run(1);
-
- runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
- }
-
- @Test
- public void testPutWithTTL() throws StorageException, InterruptedException {
- cloudQueue.clear();
-
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
- runner.setProperty(PutAzureQueueStorage.TTL, "2 secs");
-
- runner.enqueue("Dummy message");
- runner.run();
-
- runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
- Assert.assertEquals(1, AzureTestUtil.getQueueCount());
-
- Thread.sleep(2400);
- Assert.assertEquals(0, AzureTestUtil.getQueueCount());
- }
-
- @Test
- public void testPutWithVisibilityDelay() throws StorageException, InterruptedException {
- cloudQueue.clear();
-
- cloudQueue.clear();
-
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
- runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
- runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "2 secs");
-
- runner.enqueue("Dummy message");
- runner.run(1);
-
- runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
- Assert.assertEquals(0, AzureTestUtil.getQueueCount());
-
- Thread.sleep(2400);
- Assert.assertEquals(1, AzureTestUtil.getQueueCount());
- }
-
- @AfterClass
- public static void cleanup() throws StorageException {
- cloudQueue.deleteIfExists();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PutAzureQueueStorageIT extends AbstractAzureQueueStorageIT {
+
+ /** Returns the processor class; presumably the base class builds the test runner from it. */
+ @Override
+ protected Class<? extends Processor> getProcessorClass() {
+ return PutAzureQueueStorage.class;
+ }
+
+ /** Puts a message without any test-specific processor configuration. */
+ @Test
+ public void testSimplePut() {
+ putDummyMessageAndAssertSuccess();
+ }
+
+ /** Puts a message after calling configureCredentialsService(). */
+ @Test
+ public void testSimplePutWithCredentialsService() throws Exception {
+ configureCredentialsService();
+
+ putDummyMessageAndAssertSuccess();
+ }
+
+ /** Puts a message with account name, account key and queue name supplied through Expression Language variables. */
+ @Test
+ public void testSimplePutWithEL() {
+ runner.setValidateExpressionUsage(true);
+
+ runner.setVariable("account.name", getAccountName());
+ runner.setVariable("account.key", getAccountKey());
+ runner.setVariable("queue.name", cloudQueue.getName());
+
+ runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
+ runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
+ runner.setProperty(PutAzureQueueStorage.QUEUE, "${queue.name}");
+
+ putDummyMessageAndAssertSuccess();
+ }
+
+ /** With TTL set to "2 secs", expects one counted message right after the put and none after the wait. */
+ @Test
+ public void testPutWithTTL() throws Exception {
+ runner.setProperty(PutAzureQueueStorage.TTL, "2 secs");
+
+ putDummyMessageAndAssertSuccess();
+ Assert.assertEquals(1, getMessageCount());
+
+ // Wait longer than the configured 2 second TTL before counting again.
+ Thread.sleep(2400);
+ Assert.assertEquals(0, getMessageCount());
+ }
+
+ /** With VISIBILITY_DELAY set to "2 secs", expects no counted message right after the put and one after the wait. */
+ @Test
+ public void testPutWithVisibilityDelay() throws Exception {
+ runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "2 secs");
+
+ putDummyMessageAndAssertSuccess();
+ Assert.assertEquals(0, getMessageCount());
+
+ // Wait longer than the configured 2 second visibility delay before counting again.
+ Thread.sleep(2400);
+ Assert.assertEquals(1, getMessageCount());
+ }
+
+ /**
+ * Steps shared by every test: asserts that the runner is valid, enqueues one flow file with the content
+ * "Dummy message", runs the processor once and asserts that exactly one flow file went to REL_SUCCESS.
+ */
+ private void putDummyMessageAndAssertSuccess() {
+ runner.assertValid();
+ runner.enqueue("Dummy message");
+ runner.run(1);
+
+ runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
new file mode 100644
index 0000000..bdc360b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.core.Base64;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockProcessContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests AzureStorageUtils.getStorageCredentialsDetails() with credentials set on the processor or on a controller service. */
+public class TestAzureStorageUtilsGetStorageCredentialsDetails {
+
+ private static final String CREDENTIALS_SERVICE_VALUE = "CredentialsService";
+ private static final String ACCOUNT_NAME_VALUE = "AccountName";
+ private static final String ACCOUNT_KEY_VALUE = Base64.encode("AccountKey".getBytes());
+ private static final String SAS_TOKEN_VALUE = "SasToken";
+
+ private MockProcessContext processContext;
+
+ @Before
+ public void setUp() {
+ Processor processor = new ListAzureBlobStorage();
+ processContext = new MockProcessContext(processor);
+ }
+
+ @Test
+ public void testAccountNameAndAccountKeyConfiguredOnProcessor() {
+ configureProcessorProperties(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, null);
+ AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+
+ assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
+ }
+
+ @Test
+ public void testAccountNameAndSasTokenConfiguredOnProcessor() {
+ configureProcessorProperties(ACCOUNT_NAME_VALUE, null, SAS_TOKEN_VALUE);
+ AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+
+ assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
+ }
+
+ @Test
+ public void testAccountNameAndAccountKeyConfiguredOnControllerService() {
+ configureControllerService(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, null);
+ AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+
+ assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
+ }
+
+ @Test
+ public void testAccountNameAndSasTokenConfiguredOnControllerService() {
+ configureControllerService(ACCOUNT_NAME_VALUE, null, SAS_TOKEN_VALUE);
+ AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+
+ assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testAccountNameMissingConfiguredOnProcessor() {
+ configureProcessorProperties(null, ACCOUNT_KEY_VALUE, null);
+ AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testAccountKeyAndSasTokenMissingConfiguredOnProcessor() {
+ configureProcessorProperties(ACCOUNT_NAME_VALUE, null, null);
+ AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testAccountNameMissingConfiguredOnControllerService() {
+ configureControllerService(null, ACCOUNT_KEY_VALUE, null);
+ AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testAccountKeyAndSasTokenMissingConfiguredOnControllerService() {
+ configureControllerService(ACCOUNT_NAME_VALUE, null, null);
+ AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+ }
+
+ /**
+ * Sets the given credential values as properties on the processor context.
+ * A null argument leaves the corresponding property unset.
+ */
+ private void configureProcessorProperties(String accountName, String accountKey, String sasToken) {
+ if (accountName != null) {
+ processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, accountName);
+ }
+ if (accountKey != null) {
+ processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, accountKey);
+ }
+ if (sasToken != null) {
+ processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);
+ }
+ }
+
+ /** Enables a credentials controller service with the given non-null values and references it from the processor context. */
+ private void configureControllerService(String accountName, String accountKey, String sasToken) {
+ AzureStorageCredentialsControllerService credentialsService = new AzureStorageCredentialsControllerService();
+ // Only non-null values are put into the service's property map.
+ Map<PropertyDescriptor, String> properties = new HashMap<>();
+ if (accountName != null) {
+ properties.put(AzureStorageUtils.ACCOUNT_NAME, accountName);
+ }
+ if (accountKey != null) {
+ properties.put(AzureStorageUtils.ACCOUNT_KEY, accountKey);
+ }
+ if (sasToken != null) {
+ properties.put(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);
+ }
+ // onEnabled() is called directly with a mock configuration context built from those properties.
+ MockConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+ credentialsService.onEnabled(configurationContext);
+ // Add the service to the process context and set the credentials service property to the same value.
+ processContext.addControllerService(credentialsService, CREDENTIALS_SERVICE_VALUE);
+ processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_VALUE);
+ }
+
+ /** Asserts the account name and that the credentials are account-and-key credentials holding ACCOUNT_NAME_VALUE and ACCOUNT_KEY_VALUE. */
+ private void assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails storageCredentialsDetails) {
+ assertEquals(ACCOUNT_NAME_VALUE, storageCredentialsDetails.getStorageAccountName());
+ assertTrue(storageCredentialsDetails.getStorageCredentials() instanceof StorageCredentialsAccountAndKey);
+ StorageCredentialsAccountAndKey storageCredentials = (StorageCredentialsAccountAndKey) storageCredentialsDetails.getStorageCredentials();
+ assertEquals(ACCOUNT_NAME_VALUE, storageCredentials.getAccountName());
+ assertEquals(ACCOUNT_KEY_VALUE, storageCredentials.exportBase64EncodedKey());
+ }
+
+ /** Asserts the account name and that the credentials are shared access signature credentials holding SAS_TOKEN_VALUE. */
+ private void assertStorageCredentialsDetailsAccountNameAndSasToken(AzureStorageCredentialsDetails storageCredentialsDetails) {
+ assertEquals(ACCOUNT_NAME_VALUE, storageCredentialsDetails.getStorageAccountName());
+ assertTrue(storageCredentialsDetails.getStorageCredentials() instanceof StorageCredentialsSharedAccessSignature);
+ StorageCredentialsSharedAccessSignature storageCredentials = (StorageCredentialsSharedAccessSignature) storageCredentialsDetails.getStorageCredentials();
+ assertEquals(SAS_TOKEN_VALUE, storageCredentials.getToken());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java
new file mode 100644
index 0000000..95bd130
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockValidationContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests AzureStorageUtils.validateCredentialProperties() for different combinations of credential properties. */
+public class TestAzureStorageUtilsValidateCredentialProperties {
+
+ private static final String CREDENTIALS_SERVICE_VALUE = "CredentialsService";
+ private static final String ACCOUNT_NAME_VALUE = "AccountName";
+ private static final String ACCOUNT_KEY_VALUE = "AccountKey";
+ private static final String SAS_TOKEN_VALUE = "SasToken";
+
+ // Both contexts are recreated by setUp() before every test.
+ private MockProcessContext processContext;
+ private MockValidationContext validationContext;
+
+ /** Builds a process context for a ListAzureBlobStorage processor and a validation context on top of it. */
+ @Before
+ public void setUp() {
+ final Processor listProcessor = new ListAzureBlobStorage();
+ processContext = new MockProcessContext(listProcessor);
+ validationContext = new MockValidationContext(processContext);
+ }
+
+ /** Only the credentials service is set: no validation error expected. */
+ @Test
+ public void testValidWithCredentialsService() {
+ configureCredentialsService();
+
+ assertValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+ }
+
+ /** Account name plus account key: no validation error expected. */
+ @Test
+ public void testValidWithAccountNameAndAccountKey() {
+ configureAccountName();
+ configureAccountKey();
+
+ assertValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+ }
+
+ /** Account name plus SAS token: no validation error expected. */
+ @Test
+ public void testValidWithAccountNameAndSasToken() {
+ configureAccountName();
+ configureSasToken();
+
+ assertValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+ }
+
+ /** No credential property set at all: validation error expected. */
+ @Test
+ public void testNotValidBecauseNothingSpecified() {
+ assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+ }
+
+ /** Credentials service together with account name: validation error expected. */
+ @Test
+ public void testNotValidBecauseBothCredentialsServiceAndAccountNameSpecified() {
+ configureCredentialsService();
+ configureAccountName();
+
+ assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+ }
+
+ /** Credentials service together with account key: validation error expected. */
+ @Test
+ public void testNotValidBecauseBothCredentialsServiceAndAccountKeySpecified() {
+ configureCredentialsService();
+ configureAccountKey();
+
+ assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+ }
+
+ /** Credentials service together with SAS token: validation error expected. */
+ @Test
+ public void testNotValidBecauseBothCredentialsServiceAndSasTokenSpecified() {
+ configureCredentialsService();
+ configureSasToken();
+
+ assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+ }
+
+ /** Account name without account key or SAS token: validation error expected. */
+ @Test
+ public void testNotValidBecauseAccountNameSpecifiedWithoutAccountKeyOrSasToken() {
+ configureAccountName();
+
+ assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+ }
+
+ /** Account name with both account key and SAS token: validation error expected. */
+ @Test
+ public void testNotValidBecauseAccountNameSpecifiedWithBothAccountKeyAndSasToken() {
+ configureAccountName();
+ configureAccountKey();
+ configureSasToken();
+
+ assertNotValid(AzureStorageUtils.validateCredentialProperties(validationContext));
+ }
+
+ /** Sets the credentials service property to CREDENTIALS_SERVICE_VALUE; no service instance is added here. */
+ private void configureCredentialsService() {
+ processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_VALUE);
+ }
+
+ /** Sets the account name property on the process context. */
+ private void configureAccountName() {
+ processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
+ }
+
+ /** Sets the account key property on the process context. */
+ private void configureAccountKey() {
+ processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
+ }
+
+ /** Sets the SAS token property on the process context. */
+ private void configureSasToken() {
+ processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, SAS_TOKEN_VALUE);
+ }
+
+ /** Passes only when the given collection of validation results is empty. */
+ private void assertValid(Collection<ValidationResult> validationResults) {
+ assertTrue("There should be no validation error", validationResults.isEmpty());
+ }
+
+ /** Passes only when the given collection of validation results is not empty. */
+ private void assertNotValid(Collection<ValidationResult> validationResults) {
+ assertFalse("There should be validation error", validationResults.isEmpty());
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService.java
new file mode 100644
index 0000000..18fbed8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Validation tests for {@link AzureStorageCredentialsControllerService}: each test sets a combination of
+ * account name, account key and SAS token on the service and checks whether the test runner considers
+ * the service valid.
+ */
+public class TestAzureStorageCredentialsControllerService {
+
+    private static final String SERVICE_ID = "credentials-service";
+
+    private static final String ACCOUNT_NAME_VALUE = "AccountName";
+    private static final String ACCOUNT_KEY_VALUE = "AccountKey";
+    private static final String SAS_TOKEN_VALUE = "SasToken";
+
+    private TestRunner testRunner;
+    private AzureStorageCredentialsService service;
+
+    /**
+     * Creates a fresh runner and registers a new, unconfigured credentials service with it.
+     */
+    @Before
+    public void setUp() throws InitializationException {
+        service = new AzureStorageCredentialsControllerService();
+        testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
+        testRunner.addControllerService(SERVICE_ID, service);
+    }
+
+    /** Account name plus account key is expected to be valid. */
+    @Test
+    public void testValidWithAccountNameAndAccountKey() {
+        configureAccountName();
+        configureAccountKey();
+
+        testRunner.assertValid(service);
+    }
+
+    /** Account name plus SAS token is expected to be valid. */
+    @Test
+    public void testValidWithAccountNameAndSasToken() {
+        configureAccountName();
+        configureSasToken();
+
+        testRunner.assertValid(service);
+    }
+
+    /** An account key without an account name is expected to be invalid. */
+    @Test
+    public void testNotValidBecauseAccountNameMissing() {
+        configureAccountKey();
+
+        testRunner.assertNotValid(service);
+    }
+
+    /** An account name with neither account key nor SAS token is expected to be invalid. */
+    @Test
+    public void testNotValidBecauseAccountKeyAndSasTokenMissing() {
+        configureAccountName();
+
+        testRunner.assertNotValid(service);
+    }
+
+    /** Supplying both an account key and a SAS token is expected to be invalid. */
+    @Test
+    public void testNotValidBecauseBothAccountKeyAndSasTokenSpecified() {
+        configureAccountName();
+        configureAccountKey();
+        configureSasToken();
+
+        testRunner.assertNotValid(service);
+    }
+
+    /** Sets the service's account name property to the test value. */
+    private void configureAccountName() {
+        testRunner.setProperty(service, AzureStorageCredentialsControllerService.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
+    }
+
+    /** Sets the service's account key property to the test value. */
+    private void configureAccountKey() {
+        testRunner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
+    }
+
+    /** Sets the service's SAS token property to the test value. */
+    private void configureSasToken() {
+        testRunner.setProperty(service, AzureStorageUtils.PROP_SAS_TOKEN, SAS_TOKEN_VALUE);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup.java
new file mode 100644
index 0000000..72471bf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for {@link AzureStorageCredentialsControllerServiceLookup}: lookups keyed by the credentials-name
+ * attribute, the failure cases for a missing or unknown name, and the service's custom validation.
+ */
+public class TestAzureStorageCredentialsControllerServiceLookup {
+
+    private MockAzureStorageCredentialsService serviceA;
+    private MockAzureStorageCredentialsService serviceB;
+
+    private AzureStorageCredentialsControllerServiceLookup lookupService;
+    private TestRunner runner;
+
+    /**
+     * Registers two mock credentials services ("service-a" and "service-b") and a lookup service that maps
+     * the dynamic properties "a" and "b" to them, then enables all three.
+     */
+    @Before
+    public void setup() throws InitializationException {
+        serviceA = new MockAzureStorageCredentialsService(new AzureStorageCredentialsDetails("Account_A", null));
+        serviceB = new MockAzureStorageCredentialsService(new AzureStorageCredentialsDetails("Account_B", null));
+
+        lookupService = new AzureStorageCredentialsControllerServiceLookup();
+
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+        final String serviceAIdentifier = "service-a";
+        runner.addControllerService(serviceAIdentifier, serviceA);
+
+        final String serviceBIdentifier = "service-b";
+        runner.addControllerService(serviceBIdentifier, serviceB);
+
+        runner.addControllerService("lookup-service", lookupService);
+        runner.setProperty(lookupService, "a", serviceAIdentifier);
+        runner.setProperty(lookupService, "b", serviceBIdentifier);
+
+        runner.enableControllerService(serviceA);
+        runner.enableControllerService(serviceB);
+        runner.enableControllerService(lookupService);
+    }
+
+    /** Looking up the name "a" must return the details held by service A. */
+    @Test
+    public void testLookupServiceA() {
+        final AzureStorageCredentialsDetails storageCredentialsDetails =
+                lookupService.getStorageCredentialsDetails(attributesForCredentialsName("a"));
+        assertNotNull(storageCredentialsDetails);
+        assertEquals("Account_A", storageCredentialsDetails.getStorageAccountName());
+    }
+
+    /** Looking up the name "b" must return the details held by service B. */
+    @Test
+    public void testLookupServiceB() {
+        final AzureStorageCredentialsDetails storageCredentialsDetails =
+                lookupService.getStorageCredentialsDetails(attributesForCredentialsName("b"));
+        assertNotNull(storageCredentialsDetails);
+        assertEquals("Account_B", storageCredentialsDetails.getStorageAccountName());
+    }
+
+    /** An attribute map without the credentials-name attribute must cause a ProcessException. */
+    @Test(expected = ProcessException.class)
+    public void testLookupMissingCredentialsNameAttribute() {
+        final Map<String,String> attributes = new HashMap<>();
+        lookupService.getStorageCredentialsDetails(attributes);
+    }
+
+    /** A credentials name that no dynamic property maps to must cause a ProcessException. */
+    @Test(expected = ProcessException.class)
+    public void testLookupWithCredentialsNameThatDoesNotExist() {
+        lookupService.getStorageCredentialsDetails(attributesForCredentialsName("DOES-NOT-EXIST"));
+    }
+
+    /**
+     * The lookup service must be invalid while no service is registered with it and valid once one is.
+     */
+    @Test
+    public void testCustomValidateAtLeastOneServiceDefined() throws InitializationException {
+        // enable lookup service with no services registered, verify not valid
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService("lookup-service", lookupService);
+        runner.assertNotValid(lookupService);
+
+        final String serviceAIdentifier = "service-a";
+        runner.addControllerService(serviceAIdentifier, serviceA);
+
+        // register a service and now verify valid
+        runner.setProperty(lookupService, "a", serviceAIdentifier);
+        runner.enableControllerService(lookupService);
+        runner.assertValid(lookupService);
+    }
+
+    /** A dynamic property whose value is the lookup service's own identifier must be rejected. */
+    @Test
+    public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService("lookup-service", lookupService);
+        runner.setProperty(lookupService, "lookup-service", "lookup-service");
+        runner.assertNotValid(lookupService);
+    }
+
+    /**
+     * Builds a fresh attribute map containing only the credentials-name attribute.
+     *
+     * @param credentialsName value to store under the credentials-name attribute
+     * @return a new mutable map with that single entry
+     */
+    private static Map<String, String> attributesForCredentialsName(final String credentialsName) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, credentialsName);
+        return attributes;
+    }
+
+    /**
+     * A mock AzureStorageCredentialsService that will always return the passed in AzureStorageCredentialsDetails.
+     */
+    private static class MockAzureStorageCredentialsService extends AbstractControllerService implements AzureStorageCredentialsService {
+
+        // Never reassigned after construction.
+        private final AzureStorageCredentialsDetails storageCredentialsDetails;
+
+        MockAzureStorageCredentialsService(AzureStorageCredentialsDetails storageCredentialsDetails) {
+            this.storageCredentialsDetails = storageCredentialsDetails;
+        }
+
+        /** Ignores the attributes and returns the details supplied at construction. */
+        @Override
+        public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) {
+            return storageCredentialsDetails;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt
deleted file mode 100644
index ee13cb7..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt
+++ /dev/null
@@ -1 +0,0 @@
-Hello, World!!
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api-nar/pom.xml
similarity index 81%
copy from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
copy to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api-nar/pom.xml
index 09863c5..a22754b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api-nar/pom.xml
@@ -13,17 +13,19 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
<version>1.10.0-SNAPSHOT</version>
</parent>
+ <modelVersion>4.0.0</modelVersion>
- <artifactId>nifi-azure-nar</artifactId>
+ <artifactId>nifi-azure-services-api-nar</artifactId>
<packaging>nar</packaging>
+
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
@@ -32,16 +34,14 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-azure-processors</artifactId>
+ <artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.10.0-SNAPSHOT</version>
+ <type>nar</type>
</dependency>
-
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services-api-nar</artifactId>
+ <artifactId>nifi-azure-services-api</artifactId>
<version>1.10.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
+ </dependency>
</dependencies>
-
</project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
similarity index 61%
copy from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
copy to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
index 09863c5..f6638be 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
@@ -13,35 +13,32 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
<version>1.10.0-SNAPSHOT</version>
</parent>
+ <modelVersion>4.0.0</modelVersion>
- <artifactId>nifi-azure-nar</artifactId>
- <packaging>nar</packaging>
- <properties>
- <maven.javadoc.skip>true</maven.javadoc.skip>
- <source.skip>true</source.skip>
- </properties>
+ <artifactId>nifi-azure-services-api</artifactId>
<dependencies>
<dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-azure-processors</artifactId>
- <version>1.10.0-SNAPSHOT</version>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-storage</artifactId>
+ </dependency>
+ <!-- overriding jackson-core in azure-storage -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
</dependency>
-
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services-api-nar</artifactId>
- <version>1.10.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
</dependencies>
-
</project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails.java
new file mode 100644
index 0000000..e95e486
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import com.microsoft.azure.storage.StorageCredentials;
+
+/**
+ * Holder pairing a storage account name with the {@link StorageCredentials} for that account.
+ * Both values are stored exactly as passed to the constructor and cannot be changed afterwards.
+ */
+public class AzureStorageCredentialsDetails {
+
+    private final String accountName;
+    private final StorageCredentials credentials;
+
+    /**
+     * @param storageAccountName name of the storage account; stored without validation
+     * @param storageCredentials credentials for the account; stored without validation
+     */
+    public AzureStorageCredentialsDetails(String storageAccountName, StorageCredentials storageCredentials) {
+        accountName = storageAccountName;
+        credentials = storageCredentials;
+    }
+
+    /**
+     * @return the storage account name given at construction
+     */
+    public String getStorageAccountName() {
+        return this.accountName;
+    }
+
+    /**
+     * @return the storage credentials given at construction
+     */
+    public StorageCredentials getStorageCredentials() {
+        return this.credentials;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService.java
new file mode 100644
index 0000000..e281e75
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.controller.ControllerService;
+
+import java.util.Map;
+
+/**
+ * AzureStorageCredentialsService interface to support getting Storage Account Name and Storage Credentials
+ * used for instantiating Azure Storage clients.
+ * Both values are returned together, wrapped in an {@link AzureStorageCredentialsDetails} object.
+ */
+public interface AzureStorageCredentialsService extends ControllerService {
+
+ /**
+ * Get AzureStorageCredentialsDetails object which contains the Storage Account Name and the Storage Credentials
+ *
+ * @param attributes FlowFile attributes (typically); how (or whether) the map is used is up to the
+ * implementation. NOTE(review): whether a null map is acceptable is not specified here; confirm
+ * against the implementations.
+ * @return AzureStorageCredentialsDetails object
+ */
+ AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes);
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index cd2dc56..722a5dd 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -25,9 +25,30 @@
<artifactId>nifi-azure-bundle</artifactId>
<packaging>pom</packaging>
+ <properties>
+ <azure-storage.version>8.4.0</azure-storage.version>
+ </properties>
+
<modules>
<module>nifi-azure-processors</module>
<module>nifi-azure-nar</module>
+ <module>nifi-azure-services-api</module>
+ <module>nifi-azure-services-api-nar</module>
</modules>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-storage</artifactId>
+ <version>${azure-storage.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
</project>