You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2017/05/02 18:40:59 UTC

[2/5] nifi git commit: NIFI-1833 - Addressed issues from PR review.

NIFI-1833 - Addressed issues from PR review.

Addressed dependency issues from the review.
Addressed a checkstyle issue.
Review: reworded the descriptions.
Review: implemented the reset condition logic.
Review: dropped static qualifier from method signatures, not required really
Review: removed sys.out, inlined a single method to get access to the ProcessContext.getName()
Switched to HTTPS as per MSFT recommendation. Some DRY. Dropped cruft.
Addressing review suggestions from 4/5
Review: documentation improvements
Review: documentation improvements

This closes #1636.

Signed-off-by: Bryan Rosander <br...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f30c8169
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f30c8169
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f30c8169

Branch: refs/heads/master
Commit: f30c8169abe8705911b0cdac18aa90f3a217dc78
Parents: 3488a16
Author: Andrew Grande <ap...@gmail.com>
Authored: Tue Apr 4 14:57:21 2017 -0400
Committer: Bryan Rosander <br...@apache.org>
Committed: Tue May 2 14:39:37 2017 -0400

----------------------------------------------------------------------
 .../nifi-azure-bundle/nifi-azure-nar/pom.xml    |  2 +-
 .../nifi-azure-processors/pom.xml               |  1 +
 .../azure/AbstractAzureBlobProcessor.java       |  6 +-
 .../azure/AbstractAzureProcessor.java           | 12 ++--
 .../nifi/processors/azure/AzureConstants.java   |  3 +
 .../azure/storage/FetchAzureBlobStorage.java    | 20 +++---
 .../azure/storage/ListAzureBlobStorage.java     | 73 ++++++++------------
 .../azure/storage/PutAzureBlobStorage.java      | 40 +++++------
 .../azure/storage/utils/BlobInfo.java           |  2 +-
 .../additionalDetails.html                      | 39 +++++++++++
 .../additionalDetails.html                      | 39 +++++++++++
 .../additionalDetails.html                      | 39 +++++++++++
 .../azure/storage/AbstractAzureIT.java          | 35 ++++------
 .../azure/storage/ITFetchAzureBlobStorage.java  |  3 +-
 14 files changed, 205 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
index f75bb7f..e6c3c9b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -38,7 +38,7 @@
         
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <artifactId>nifi-standard-nar</artifactId>
             <type>nar</type>
         </dependency>  
     </dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 8330bcc..9b4f28b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -85,6 +85,7 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-standard-processors</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
index 82eae12..2026711 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -27,13 +27,13 @@ public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor
 
     public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
-    
-    public static final List<PropertyDescriptor> properties = Collections
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections
             .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
+        return PROPERTIES;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
index 5ab1f8b..c95ee99 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
@@ -32,8 +32,8 @@ import com.microsoft.azure.storage.CloudStorageAccount;
 
 public abstract class AbstractAzureProcessor extends AbstractProcessor {
 
-    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
-    protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any failed fetches will be transferred to the failure relation.").build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build();
     public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
 
     protected CloudStorageAccount createStorageConnection(ProcessContext context) {
@@ -49,7 +49,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor {
     }
 
     private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) {
-        final String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", accountName, accountKey);
+        final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);
         try {
             return createStorageAccountFromConnectionString(storageConnectionString);
         } catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) {
@@ -65,13 +65,11 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor {
      * @return The newly created CloudStorageAccount object
      *
      */
-    protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
+    private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
         CloudStorageAccount storageAccount;
         try {
             storageAccount = CloudStorageAccount.parse(storageConnectionString);
-        } catch (IllegalArgumentException | URISyntaxException e) {
-            throw e;
-        } catch (InvalidKeyException e) {
+        } catch (IllegalArgumentException | URISyntaxException | InvalidKeyException e) {
             throw e;
         }
         return storageAccount;

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
index eaa234c..9a51030 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
@@ -32,6 +32,9 @@ public final class AzureConstants {
     public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
 
+    // use HTTPS by default as per MSFT recommendation
+    public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+
     private AzureConstants() {
         // do not instantiate
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
index 2229cfd..163a962 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.azure.storage;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,13 +30,13 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
 import org.apache.nifi.processors.azure.AzureConstants;
 
@@ -49,12 +48,14 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer;
 
 @Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
 @CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
+@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class })
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @WritesAttributes({
     @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
 })
 public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
-    public static final List<PropertyDescriptor> PROPERTIES = Collections
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections
             .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
 
     @Override
@@ -84,14 +85,11 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
 
             // TODO - we may be able do fancier things with ranges and
             // distribution of download over threads, investigate
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(OutputStream os) throws IOException {
-                    try {
-                        blob.download(os);
-                    } catch (StorageException e) {
-                        throw new IOException(e);
-                    }
+            flowFile = session.write(flowFile, os -> {
+                try {
+                    blob.download(os);
+                } catch (StorageException e) {
+                    throw new IOException(e);
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index f4a793b..f8a6c4d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -46,12 +46,10 @@ import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
 import org.apache.nifi.processors.standard.AbstractListProcessor;
 
 import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.StorageUri;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
-import com.microsoft.azure.storage.blob.BlobRequestOptions;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -60,14 +58,18 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
 
 @TriggerSerially
 @Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
-@SeeAlso({ FetchAzureBlobStorage.class })
+@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
 @CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
-        @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
-        @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
-        @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
-        @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"),
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
+        @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
+        @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
+        @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
+        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
+        @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
+        @WritesAttribute(attribute = "lang", description = "Language code for the content"),
         @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
 @Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
         + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
@@ -76,7 +78,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
     private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true).required(false).build();
 
-    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -106,8 +108,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
 
     @Override
     protected boolean isListingResetNecessary(final PropertyDescriptor property) {
-        // TODO - implement
-        return false;
+        // re-list if configuration changed, but not when security keys are rolled (not included in the condition)
+        return PREFIX.equals(property)
+                   || AzureConstants.ACCOUNT_NAME.equals(property)
+                   || AzureConstants.CONTAINER.equals(property);
     }
 
     @Override
@@ -128,10 +132,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
             CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
             CloudBlobContainer container = blobClient.getContainerReference(containerName);
 
-            BlobRequestOptions blobRequestOptions = null;
-            OperationContext operationContext = null;
-
-            for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) {
+            for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) {
                 if (blob instanceof CloudBlob) {
                     CloudBlob cloudBlob = (CloudBlob) blob;
                     BlobProperties properties = cloudBlob.getProperties();
@@ -154,40 +155,26 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
         return listing;
     }
 
-    protected static CloudStorageAccount createStorageConnection(ProcessContext context) {
+    protected CloudStorageAccount createStorageConnection(ProcessContext context) {
         final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
         final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
-        final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey);
+        final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);
         try {
-            return createStorageAccountFromConnectionString(storageConnectionString);
+
+            CloudStorageAccount storageAccount;
+            try {
+                storageAccount = CloudStorageAccount.parse(storageConnectionString);
+            } catch (IllegalArgumentException | URISyntaxException e) {
+                getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
+                throw e;
+            } catch (InvalidKeyException e) {
+                getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
+                throw e;
+            }
+            return storageAccount;
         } catch (InvalidKeyException | URISyntaxException e) {
             throw new IllegalArgumentException(e);
         }
     }
 
-    /**
-     * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
-     *
-     * @param storageConnectionString
-     *            Connection string for the storage service or the emulator
-     * @return The newly created CloudStorageAccount object
-     *
-     */
-    private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
-
-        CloudStorageAccount storageAccount;
-        try {
-            storageAccount = CloudStorageAccount.parse(storageConnectionString);
-        } catch (IllegalArgumentException | URISyntaxException e) {
-            System.out.println("\nConnection string specifies an invalid URI.");
-            System.out.println("Please confirm the connection string is in the Azure connection string format.");
-            throw e;
-        } catch (InvalidKeyException e) {
-            System.out.println("\nConnection string specifies an invalid key.");
-            System.out.println("Please confirm the AccountName and AccountKey in the connection string are valid.");
-            throw e;
-        }
-        return storageAccount;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index 1327a0b..e03bc25 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -35,7 +35,6 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
 import org.apache.nifi.processors.azure.AzureConstants;
 
@@ -50,13 +49,12 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer;
 @SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
 @CapabilityDescription("Puts content into an Azure Storage Blob")
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
-        @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"),
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
+        @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
         @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
         @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
         @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
-        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
-        @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
+        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob")})
 public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
 
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@@ -80,21 +78,23 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
 
             final Map<String, String> attributes = new HashMap<>();
             long length = flowFile.getSize();
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream rawIn) throws IOException {
-                    final InputStream in = new BufferedInputStream(rawIn);
-                    try {
-                        blob.upload(in, length);
-                        BlobProperties properties = blob.getProperties();
-                        attributes.put("azure.container", containerName);
-                        attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
-                        attributes.put("azure.etag", properties.getEtag());
-                        attributes.put("azure.length", String.valueOf(length));
-                        attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
-                    } catch (StorageException | URISyntaxException e) {
-                        throw new IOException(e);
-                    }
+            session.read(flowFile, rawIn -> {
+                InputStream in = rawIn;
+                if (!(in instanceof BufferedInputStream)) {
+                    // do not double-wrap
+                    in = new BufferedInputStream(rawIn);
+                }
+
+                try {
+                    blob.upload(in, length);
+                    BlobProperties properties = blob.getProperties();
+                    attributes.put("azure.container", containerName);
+                    attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
+                    attributes.put("azure.etag", properties.getEtag());
+                    attributes.put("azure.length", String.valueOf(length));
+                    attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
+                } catch (StorageException | URISyntaxException e) {
+                    throw new IOException(e);
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
index d429878..6907d94 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
@@ -159,7 +159,7 @@ public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEnt
         return etag.compareTo(o.etag);
     }
 
-    protected BlobInfo(final Builder builder) {
+    private BlobInfo(final Builder builder) {
         this.primaryUri = builder.primaryUri;
         this.secondaryUri = builder.secondaryUri;
         this.contentType = builder.contentType;

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html
new file mode 100644
index 0000000..b4b8e3b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html
@@ -0,0 +1,39 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>FetchAzureBlobStorage Processor</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+
+<h2>Apache NiFi Azure Processors</h2>
+
+<h3>Important Security Note</h3>
+<p>
+    There are certain risks in allowing the account name and key to be stored as FlowFile
+    attributes. While it does provide for a more flexible flow by allowing the account name and key
+    to be fetched dynamically from the FlowFile attributes, care must be taken to restrict access to
+    the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
+    In addition, the provenance repositories may be put on encrypted disk partitions.
+</p>
+<p>
+    <a href="#" onclick="history.back()">Return to the previous page</a>
+</p>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
new file mode 100644
index 0000000..76e8775
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
@@ -0,0 +1,39 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ListAzureBlobStorage Processor</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+
+<h2>Apache NiFi Azure Processors</h2>
+
+<h3>Important Security Note</h3>
+<p>
+    There are certain risks in allowing the account name and key to be stored as FlowFile
+    attributes. While it does provide for a more flexible flow by allowing the account name and key
+    to be fetched dynamically from the FlowFile attributes, care must be taken to restrict access to
+    the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
+    In addition, the provenance repositories may be put on encrypted disk partitions.
+</p>
+<p>
+    <a href="#" onclick="history.back()">Return to the previous page</a>
+</p>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
new file mode 100644
index 0000000..0a7ff35
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
@@ -0,0 +1,39 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutAzureBlobStorage Processor</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+
+<h2>Apache NiFi Azure Processors</h2>
+
+<h3>Important Security Note</h3>
+<p>
+    There are certain risks in allowing the account name and key to be stored as FlowFile
+    attributes. While it does provide for a more flexible flow by allowing the account name and key
+    to be fetched dynamically from the FlowFile attributes, care must be taken to restrict access to
+    the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
+    In addition, the provenance repositories may be put on encrypted disk partitions.
+</p>
+<p>
+    <a href="#" onclick="history.back()">Return to the previous page</a>
+</p>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
index 34702eb..91a8c73 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
@@ -16,7 +16,17 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
-import static org.junit.Assert.fail;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -25,19 +35,7 @@ import java.net.URISyntaxException;
 import java.security.InvalidKeyException;
 import java.util.Properties;
 
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
-import com.microsoft.azure.storage.blob.ListBlobItem;
-import com.microsoft.azure.storage.table.CloudTable;
-import com.microsoft.azure.storage.table.CloudTableClient;
+import static org.junit.Assert.fail;
 
 public abstract class AbstractAzureIT {
     protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
@@ -90,17 +88,10 @@ public abstract class AbstractAzureIT {
     }
 
     protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException {
-        String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
+        String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey());
         CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
         CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
         return blobClient.getContainerReference(TEST_CONTAINER_NAME);
     }
 
-    protected static CloudTable getTable() throws InvalidKeyException, URISyntaxException, StorageException {
-        String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
-        CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
-        CloudTableClient tableClient = storageAccount.createCloudTableClient();
-        return tableClient.getTableReference(TEST_TABLE_NAME);
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
index 1e8a8f7..7dc8830 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.processors.azure.AbstractAzureProcessor;
 import org.apache.nifi.processors.azure.AzureConstants;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -51,7 +52,7 @@ public class ITFetchAzureBlobStorage extends AbstractAzureIT {
         runner.enqueue(new byte[0], attributes);
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1);
         List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
         for (MockFlowFile flowFile : flowFilesForRelationship) {
             flowFile.assertContentEquals("0123456789".getBytes());