Posted to issues@nifi.apache.org by jtstorck <gi...@git.apache.org> on 2017/04/30 00:54:06 UTC

[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

GitHub user jtstorck opened a pull request:

    https://github.com/apache/nifi/pull/1719

    NIFI-1833 Updates to Azure Storage Processor PR to fix dependencies and integration tests

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [X] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [X] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jtstorck/nifi NIFI-1833

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1719.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1719
    
----
commit 946e592fdc16780e6acb9024f9619e2fa949182a
Author: Simon Elliston Ball <si...@simonellistonball.com>
Date:   2016-05-01T23:35:34Z

    NIFI-1833 - Azure Storage processors

commit 57e9e3e9e9a803f809070083fdb1e70b47a0187d
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-04T18:57:21Z

    Addressed dependency issues from the review.

commit 51e0a002e9045e78f864de31ee629b2efc223e9a
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-04T19:39:17Z

    Addressed a checkstyle issue.

commit ee6ef788e4c984889c8a799870b6ecbc818a8096
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-04T22:47:01Z

    Review: reworded the descriptions.

commit 68639f15aa6487f13be0eca83eb3d4db53bad76d
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-04T22:54:20Z

    Review: implemented the reset condition logic.

commit 5f4c3813aac3c17fae42dd946426d92c76a14909
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-04T23:01:41Z

    Review: dropped static qualifier from method signatures, not required really

commit 065bf332160fc98fefac62df9b140122b23e9fe2
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-04T23:16:00Z

    Review: removed sys.out, inlined a single method to get access to the ProcessContext.getName()

commit 066f62cf0a50ba9c465803dab5bdaa7f8f660ec8
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-04T23:28:42Z

    Switched to HTTPS as per MSFT recommendation. Some DRY. Dropped cruft.

commit 23989e7367f60da04e111751d325089435d26729
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-05T17:03:34Z

    Addressing review suggestions from 4/5

commit 4693cf0e0d574528b610d284fb47a181a7611dbd
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-10T17:30:39Z

    Review: documentation improvements

commit 08668a1056c1ad5a6fb1c1c7d734789c1c35273a
Author: Andrew Grande <ap...@gmail.com>
Date:   2017-04-10T17:59:15Z

    Review: documentation improvements

commit 7a288047b4632fb0c20285c6d36d40a949002770
Author: Jeff Storck <jt...@gmail.com>
Date:   2017-04-30T00:23:35Z

    NIFI-1833 Moved AbstractListProcessor.java, EntityListing.java, and ListableEntity.java from nifi-standard-processors into nifi-processor-utils
    Moved TestAbstractListProcessor.java into nifi-processor-utils
    Set nifi-azure-nar's nar dependency back to nifi-standard-services-api-nar
    Fixed failing integration tests (ITFetchAzureBlobStorage.java, ITListAzureBlobStorage.java, and ITPutAzureStorageBlob.java) and refactored them to be able to run in parallel

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by jtstorck <gi...@git.apache.org>.
GitHub user jtstorck commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114141372
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +public final class AzureConstants {
    +    public static final String BLOCK = "Block";
    +    public static final String PAGE = "Page";
    +
    +    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("Storage Account Key").description("The storage account key")
    --- End diff --
    
    Will add displayName.



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
GitHub user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114135508
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +
    +public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor {
    +
    +    public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    displayName missing
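
The PR checklist above distinguishes `.name` (programmatic access) from `.displayName`. As a rough illustration of that split, here is a minimal, self-contained sketch. `Descriptor` and its `Builder` are hypothetical stand-ins written for this example, not NiFi's `PropertyDescriptor`, and the lowercase key `"blob"` is an assumed value, not the one used in the PR.

```java
import java.util.Objects;

public class DisplayNameSketch {

    // Hypothetical stand-in for a property descriptor; NOT NiFi's PropertyDescriptor.
    static final class Descriptor {
        final String name;          // stable key used programmatically
        final String displayName;   // label shown to users
        final String description;

        private Descriptor(Builder b) {
            this.name = Objects.requireNonNull(b.name, "name is required");
            // In this sketch, the label falls back to the key when none is given.
            this.displayName = b.displayName != null ? b.displayName : b.name;
            this.description = b.description;
        }

        static final class Builder {
            private String name;
            private String displayName;
            private String description;

            Builder name(String name) { this.name = name; return this; }
            Builder displayName(String displayName) { this.displayName = displayName; return this; }
            Builder description(String description) { this.description = description; return this; }
            Descriptor build() { return new Descriptor(this); }
        }
    }

    // Assumed values: key "blob" for stored configuration, label "Blob" for display.
    static final Descriptor BLOB = new Descriptor.Builder()
            .name("blob")
            .displayName("Blob")
            .description("The filename of the blob")
            .build();

    public static void main(String[] args) {
        System.out.println(BLOB.name + " -> " + BLOB.displayName);
    }
}
```

Keeping the key and the label separate means the label can later be reworded without touching the key that saved configurations refer to.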



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
GitHub user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114136443
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage;
    +
    +import java.io.IOException;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.EnumSet;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.AzureConstants;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
    +import org.apache.nifi.processor.util.list.AbstractListProcessor;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.StorageUri;
    +import com.microsoft.azure.storage.blob.BlobListingDetails;
    +import com.microsoft.azure.storage.blob.BlobProperties;
    +import com.microsoft.azure.storage.blob.CloudBlob;
    +import com.microsoft.azure.storage.blob.CloudBlobClient;
    +import com.microsoft.azure.storage.blob.CloudBlobContainer;
    +import com.microsoft.azure.storage.blob.CloudBlockBlob;
    +import com.microsoft.azure.storage.blob.ListBlobItem;
    +
    +@TriggerSerially
    +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
    +@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
    +@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
    +        @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
    +        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
    +        @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
    +        @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
    +        @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
    +        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
    +        @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
    +        @WritesAttribute(attribute = "lang", description = "Language code for the content"),
    +        @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
    +@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
    +        + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
    +public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
    +
    +    private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    displayName is missing here as well.



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by jtstorck <gi...@git.apache.org>.
GitHub user jtstorck commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114352442
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java ---
    @@ -103,10 +107,15 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                 session.transfer(flowFile, REL_SUCCESS);
                 final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                 session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
    -
    -        } catch (IllegalArgumentException | URISyntaxException | StorageException e1) {
    -            flowFile = session.penalize(flowFile);
    -            session.transfer(flowFile, REL_FAILURE);
    +        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
    +            if (e instanceof ProcessException && storedException.get() == null) {
    +                throw (ProcessException) e;
    +            } else {
    +                Exception failureException = Optional.ofNullable(storedException.get()).map(x -> x).orElse(e);
    --- End diff --
    
    Agreed, I will remove it.
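
The reply does not say what "it" refers to. Assuming it is the identity `.map(x -> x)` on the last quoted line, the sketch below shows that dropping the call does not change which exception is selected. The class and method names are hypothetical, and the `AtomicReference<Exception>` type for `storedException` is an assumption, since its declaration is not part of the quoted diff.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class FailureExceptionSketch {

    // Form quoted in the diff: map(x -> x) returns an Optional with the same value.
    static Exception withIdentityMap(AtomicReference<Exception> storedException, Exception e) {
        return Optional.ofNullable(storedException.get()).map(x -> x).orElse(e);
    }

    // Same selection without the no-op map: prefer the stored exception, else the caught one.
    static Exception withoutIdentityMap(AtomicReference<Exception> storedException, Exception e) {
        return Optional.ofNullable(storedException.get()).orElse(e);
    }

    public static void main(String[] args) {
        Exception caught = new IllegalArgumentException("caught");
        Exception stored = new IllegalStateException("stored");

        AtomicReference<Exception> empty = new AtomicReference<>();
        AtomicReference<Exception> filled = new AtomicReference<>(stored);

        // Both forms pick the same exception in both situations.
        System.out.println(withIdentityMap(empty, caught) == withoutIdentityMap(empty, caught));
        System.out.println(withIdentityMap(filled, caught) == withoutIdentityMap(filled, caught));
    }
}
```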



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by jtstorck <gi...@git.apache.org>.
GitHub user jtstorck commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114141342
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +
    +public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor {
    +
    +    public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    Will add displayName.



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
GitHub user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114136630
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage;
    +
    +import java.io.IOException;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.EnumSet;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.AzureConstants;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
    +import org.apache.nifi.processor.util.list.AbstractListProcessor;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.StorageUri;
    +import com.microsoft.azure.storage.blob.BlobListingDetails;
    +import com.microsoft.azure.storage.blob.BlobProperties;
    +import com.microsoft.azure.storage.blob.CloudBlob;
    +import com.microsoft.azure.storage.blob.CloudBlobClient;
    +import com.microsoft.azure.storage.blob.CloudBlobContainer;
    +import com.microsoft.azure.storage.blob.CloudBlockBlob;
    +import com.microsoft.azure.storage.blob.ListBlobItem;
    +
    +@TriggerSerially
    +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
    +@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
    +@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
    +        @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
    +        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
    +        @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
    +        @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
    +        @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
    +        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
    +        @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
    +        @WritesAttribute(attribute = "lang", description = "Language code for the content"),
    +        @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
    +@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
    +        + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
    +public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
    +
    +    private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true).required(false).build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(BlobInfo entity, ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put("azure.etag", entity.getEtag());
    +        attributes.put("azure.primaryUri", entity.getPrimaryUri());
    +        attributes.put("azure.secondaryUri", entity.getSecondaryUri());
    +        attributes.put("azure.blobname", entity.getName());
    +        attributes.put("azure.blobtype", entity.getBlobType());
    +        attributes.put("azure.length", String.valueOf(entity.getLength()));
    +        attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp()));
    +        attributes.put("mime.type", entity.getContentType());
    +        attributes.put("lang", entity.getContentLanguage());
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        // re-list if configuration changed, but not when security keys are rolled (not included in the condition)
    +        return PREFIX.equals(property)
    +                   || AzureConstants.ACCOUNT_NAME.equals(property)
    +                   || AzureConstants.CONTAINER.equals(property);
    +    }
    +
    +    @Override
    +    protected Scope getStateScope(final ProcessContext context) {
    +        return Scope.CLUSTER;
    --- End diff --
    
    The @Stateful annotation at the top of the class indicates that both local and cluster scope are supported, but getStateScope always returns Scope.CLUSTER.



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by jtstorck <gi...@git.apache.org>.
GitHub user jtstorck commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114141280
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml ---
    @@ -32,11 +32,35 @@ language governing permissions and limitations under the License. -->
                 <artifactId>nifi-utils</artifactId>
             </dependency>
             <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-ssl-context-service-api</artifactId>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.avro</groupId>
    +            <artifactId>avro</artifactId>
    +        </dependency>
    +        <dependency>
                 <groupId>com.microsoft.azure</groupId>
                 <artifactId>azure-eventhubs</artifactId>
                 <version>0.9.0</version>
             </dependency>
             <dependency>
    +            <groupId>com.fasterxml.jackson.core</groupId>
    +            <artifactId>jackson-core</artifactId>
    +            <version>2.8.6</version>
    +        </dependency>
    +        <!--<dependency>
    --- End diff --
    
    Most likely. I'll take a look at it.



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by jtstorck <gi...@git.apache.org>.
GitHub user jtstorck commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114170142
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage;
    +
    +import java.io.IOException;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.EnumSet;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.AzureConstants;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
    +import org.apache.nifi.processor.util.list.AbstractListProcessor;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.StorageUri;
    +import com.microsoft.azure.storage.blob.BlobListingDetails;
    +import com.microsoft.azure.storage.blob.BlobProperties;
    +import com.microsoft.azure.storage.blob.CloudBlob;
    +import com.microsoft.azure.storage.blob.CloudBlobClient;
    +import com.microsoft.azure.storage.blob.CloudBlobContainer;
    +import com.microsoft.azure.storage.blob.CloudBlockBlob;
    +import com.microsoft.azure.storage.blob.ListBlobItem;
    +
    +@TriggerSerially
    +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
    +@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
    +@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
    +        @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
    +        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
    +        @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
    +        @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
    +        @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
    +        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
    +        @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
    +        @WritesAttribute(attribute = "lang", description = "Language code for the content"),
    +        @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
    +@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
    +        + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
    +public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
    +
    +    private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true).required(false).build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(BlobInfo entity, ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put("azure.etag", entity.getEtag());
    +        attributes.put("azure.primaryUri", entity.getPrimaryUri());
    +        attributes.put("azure.secondaryUri", entity.getSecondaryUri());
    +        attributes.put("azure.blobname", entity.getName());
    +        attributes.put("azure.blobtype", entity.getBlobType());
    +        attributes.put("azure.length", String.valueOf(entity.getLength()));
    +        attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp()));
    +        attributes.put("mime.type", entity.getContentType());
    +        attributes.put("lang", entity.getContentLanguage());
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        // re-list if configuration changed, but not when security keys are rolled (not included in the condition)
    +        return PREFIX.equals(property)
    +                   || AzureConstants.ACCOUNT_NAME.equals(property)
    +                   || AzureConstants.CONTAINER.equals(property);
    +    }
    +
    +    @Override
    +    protected Scope getStateScope(final ProcessContext context) {
    +        return Scope.CLUSTER;
    --- End diff --
    
    I talked to Mark Payne, and since the cluster scope is explicitly used, he recommended removing the local scope from the annotation, so I'll make that update.
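A sketch of what declaring only the cluster scope might look like. To stay compilable without NiFi on the classpath, `Scope` and `Stateful` below are local stand-ins for `org.apache.nifi.components.state.Scope` and `org.apache.nifi.annotation.behavior.Stateful`, and `ClusterScopedLister` is a hypothetical class, not the processor from the PR.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

// Local stand-in for org.apache.nifi.components.state.Scope (assumption for this sketch).
enum Scope { LOCAL, CLUSTER }

// Local stand-in for org.apache.nifi.annotation.behavior.Stateful (assumption for this sketch).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Stateful {
    Scope[] scopes();
    String description();
}

// Hypothetical processor: the annotation lists only the scope that is actually used.
@Stateful(scopes = { Scope.CLUSTER }, description = "Stores the timestamp of the newest listed blob.")
class ClusterScopedLister {

    // Mirrors the getStateScope() override in the quoted diff, which always returns CLUSTER.
    Scope getStateScope() {
        return Scope.CLUSTER;
    }

    // True when the declared scopes are exactly the scope returned at runtime.
    static boolean declaredScopesMatch() {
        Stateful stateful = ClusterScopedLister.class.getAnnotation(Stateful.class);
        Scope[] used = { new ClusterScopedLister().getStateScope() };
        return Arrays.equals(stateful.scopes(), used);
    }

    public static void main(String[] args) {
        System.out.println(declaredScopesMatch());
    }
}
```

Keeping the declared scopes in line with what `getStateScope()` returns avoids documenting a local scope that the processor never uses.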


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114228274
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java ---
    @@ -106,10 +110,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                 session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
     
    -        } catch (IllegalArgumentException | URISyntaxException | StorageException e) {
    -            getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, e);
    -            flowFile = session.penalize(flowFile);
    -            session.transfer(flowFile, REL_FAILURE);
    +        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
    +            if (e instanceof ProcessException && storedException.get() == null) {
    +                throw (ProcessException) e;
    +            } else {
    +                Exception failureException = Optional.ofNullable(storedException.get()).map(x -> x).orElse(e);
    --- End diff --
    
    @jtstorck I don't think you need the .map() here. The identity mapping changes nothing, and orElse() already returns the stored value when it is present and falls back to e when it is absent.
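A minimal, self-contained sketch of the point above: an identity `map(x -> x)` leaves an `Optional` unchanged, so `orElse()` alone yields the same result. The class `FailureSelection`, its method names, and the assumption that `storedException` is an `AtomicReference<Exception>` are illustrative and not taken from the PR.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of the PR: chooses which exception to report.
final class FailureSelection {

    // Mirrors the quoted diff line, including the identity map().
    static Exception withMap(AtomicReference<Exception> stored, Exception caught) {
        return Optional.ofNullable(stored.get()).map(x -> x).orElse(caught);
    }

    // Same selection without map(): stored exception if present, otherwise the caught one.
    static Exception withoutMap(AtomicReference<Exception> stored, Exception caught) {
        return Optional.ofNullable(stored.get()).orElse(caught);
    }

    public static void main(String[] args) {
        Exception caught = new IllegalStateException("caught");
        AtomicReference<Exception> empty = new AtomicReference<>();
        AtomicReference<Exception> filled = new AtomicReference<>(new RuntimeException("stored"));

        // Both variants pick the same exception object in both situations.
        System.out.println(withMap(empty, caught) == withoutMap(empty, caught));
        System.out.println(withMap(filled, caught) == withoutMap(filled, caught));
    }
}
```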



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by jtstorck <gi...@git.apache.org>.
Github user jtstorck commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114352461
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java ---
    @@ -106,10 +110,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                 session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
     
    -        } catch (IllegalArgumentException | URISyntaxException | StorageException e) {
    -            getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, e);
    -            flowFile = session.penalize(flowFile);
    -            session.transfer(flowFile, REL_FAILURE);
    +        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
    +            if (e instanceof ProcessException && storedException.get() == null) {
    +                throw (ProcessException) e;
    +            } else {
    +                Exception failureException = Optional.ofNullable(storedException.get()).map(x -> x).orElse(e);
    --- End diff --
    
    Agreed, I will remove it.



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114135293
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml ---
    @@ -32,11 +32,35 @@ language governing permissions and limitations under the License. -->
                 <artifactId>nifi-utils</artifactId>
             </dependency>
             <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-ssl-context-service-api</artifactId>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.avro</groupId>
    +            <artifactId>avro</artifactId>
    +        </dependency>
    +        <dependency>
                 <groupId>com.microsoft.azure</groupId>
                 <artifactId>azure-eventhubs</artifactId>
                 <version>0.9.0</version>
             </dependency>
             <dependency>
    +            <groupId>com.fasterxml.jackson.core</groupId>
    +            <artifactId>jackson-core</artifactId>
    +            <version>2.8.6</version>
    +        </dependency>
    +        <!--<dependency>
    --- End diff --
    
    If this isn't needed, can it be removed?



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114135754
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +public final class AzureConstants {
    +    public static final String BLOCK = "Block";
    +    public static final String PAGE = "Page";
    +
    +    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("Storage Account Key").description("The storage account key")
    --- End diff --
    
    displayName missing for these properties
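To illustrate the distinction the checklist draws between `.name` (programmatic access) and `.displayName`, here is a simplified sketch. `Descriptor` and its `Builder` are hypothetical stand-ins written for this example, not NiFi's `PropertyDescriptor`; the key `storage-account-key` and the fallback to `name` when no display name is set are assumptions of the sketch.

```java
// Hypothetical, simplified stand-in for a property descriptor (not NiFi's class).
final class Descriptor {
    final String name;        // stable identifier used programmatically
    final String displayName; // label intended for people

    private Descriptor(String name, String displayName) {
        this.name = name;
        this.displayName = displayName;
    }

    static final class Builder {
        private String name;
        private String displayName;

        Builder name(String name) {
            this.name = name;
            return this;
        }

        Builder displayName(String displayName) {
            this.displayName = displayName;
            return this;
        }

        Descriptor build() {
            // Assumption for this sketch: fall back to name when no displayName is given.
            return new Descriptor(name, displayName == null ? name : displayName);
        }
    }

    public static void main(String[] args) {
        Descriptor accountKey = new Builder()
                .name("storage-account-key")
                .displayName("Storage Account Key")
                .build();
        System.out.println(accountKey.name + " / " + accountKey.displayName);
    }
}
```

Separating the two lets the label be reworded later without changing the identifier that existing configuration refers to.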



[GitHub] nifi issue #1719: NIFI-1833 Updates to Azure Storage Processor PR to fix dep...

Posted by brosander <gi...@git.apache.org>.
Github user brosander commented on the issue:

    https://github.com/apache/nifi/pull/1719
  
    +1 Merged



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by jtstorck <gi...@git.apache.org>.
Github user jtstorck commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114141863
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage;
    +
    +import java.io.IOException;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.EnumSet;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.AzureConstants;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
    +import org.apache.nifi.processor.util.list.AbstractListProcessor;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.StorageUri;
    +import com.microsoft.azure.storage.blob.BlobListingDetails;
    +import com.microsoft.azure.storage.blob.BlobProperties;
    +import com.microsoft.azure.storage.blob.CloudBlob;
    +import com.microsoft.azure.storage.blob.CloudBlobClient;
    +import com.microsoft.azure.storage.blob.CloudBlobContainer;
    +import com.microsoft.azure.storage.blob.CloudBlockBlob;
    +import com.microsoft.azure.storage.blob.ListBlobItem;
    +
    +@TriggerSerially
    +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
    +@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
    +@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
    +        @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
    +        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
    +        @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
    +        @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
    +        @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
    +        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
    +        @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
    +        @WritesAttribute(attribute = "lang", description = "Language code for the content"),
    +        @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
    +@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
    +        + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
    +public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
    +
    +    private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true).required(false).build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(BlobInfo entity, ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put("azure.etag", entity.getEtag());
    +        attributes.put("azure.primaryUri", entity.getPrimaryUri());
    +        attributes.put("azure.secondaryUri", entity.getSecondaryUri());
    +        attributes.put("azure.blobname", entity.getName());
    +        attributes.put("azure.blobtype", entity.getBlobType());
    +        attributes.put("azure.length", String.valueOf(entity.getLength()));
    +        attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp()));
    +        attributes.put("mime.type", entity.getContentType());
    +        attributes.put("lang", entity.getContentLanguage());
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        // re-list if configuration changed, but not when security keys are rolled (not included in the condition)
    +        return PREFIX.equals(property)
    +                   || AzureConstants.ACCOUNT_NAME.equals(property)
    +                   || AzureConstants.CONTAINER.equals(property);
    +    }
    +
    +    @Override
    +    protected Scope getStateScope(final ProcessContext context) {
    +        return Scope.CLUSTER;
    --- End diff --
    
    It forces cluster scope, which still works locally if there is no cluster state management.  Is there a different way that you'd like this to be addressed?  Take a look at: https://github.com/apache/nifi/pull/1636#discussion_r109812402



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by jtstorck <gi...@git.apache.org>.
Github user jtstorck commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114141390
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage;
    +
    +import java.io.IOException;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.EnumSet;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.AzureConstants;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
    +import org.apache.nifi.processor.util.list.AbstractListProcessor;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.StorageUri;
    +import com.microsoft.azure.storage.blob.BlobListingDetails;
    +import com.microsoft.azure.storage.blob.BlobProperties;
    +import com.microsoft.azure.storage.blob.CloudBlob;
    +import com.microsoft.azure.storage.blob.CloudBlobClient;
    +import com.microsoft.azure.storage.blob.CloudBlobContainer;
    +import com.microsoft.azure.storage.blob.CloudBlockBlob;
    +import com.microsoft.azure.storage.blob.ListBlobItem;
    +
    +@TriggerSerially
    +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
    +@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
    +@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
    +        @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
    +        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
    +        @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
    +        @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
    +        @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
    +        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
    +        @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
    +        @WritesAttribute(attribute = "lang", description = "Language code for the content"),
    +        @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
    +@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
    +        + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
    +public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
    +
    +    private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    Will add displayName.



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114169124
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage;
    +
    +import java.io.IOException;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.EnumSet;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.AzureConstants;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
    +import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
    +import org.apache.nifi.processor.util.list.AbstractListProcessor;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.StorageUri;
    +import com.microsoft.azure.storage.blob.BlobListingDetails;
    +import com.microsoft.azure.storage.blob.BlobProperties;
    +import com.microsoft.azure.storage.blob.CloudBlob;
    +import com.microsoft.azure.storage.blob.CloudBlobClient;
    +import com.microsoft.azure.storage.blob.CloudBlobContainer;
    +import com.microsoft.azure.storage.blob.CloudBlockBlob;
    +import com.microsoft.azure.storage.blob.ListBlobItem;
    +
    +@TriggerSerially
    +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
    +@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
    +@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
    +        @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
    +        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
    +        @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
    +        @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
    +        @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
    +        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
    +        @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
    +        @WritesAttribute(attribute = "lang", description = "Language code for the content"),
    +        @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
    +@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
    +        + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
    +public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
    +
    +    private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true).required(false).build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(BlobInfo entity, ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put("azure.etag", entity.getEtag());
    +        attributes.put("azure.primaryUri", entity.getPrimaryUri());
    +        attributes.put("azure.secondaryUri", entity.getSecondaryUri());
    +        attributes.put("azure.blobname", entity.getName());
    +        attributes.put("azure.blobtype", entity.getBlobType());
    +        attributes.put("azure.length", String.valueOf(entity.getLength()));
    +        attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp()));
    +        attributes.put("mime.type", entity.getContentType());
    +        attributes.put("lang", entity.getContentLanguage());
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        // re-list if configuration changed, but not when security keys are rolled (not included in the condition)
    +        return PREFIX.equals(property)
    +                   || AzureConstants.ACCOUNT_NAME.equals(property)
    +                   || AzureConstants.CONTAINER.equals(property);
    +    }
    +
    +    @Override
    +    protected Scope getStateScope(final ProcessContext context) {
    +        return Scope.CLUSTER;
    --- End diff --
    
    That's fine, my understanding of the way we handle state was (and probably still is 😛) lacking.  I tried it locally and it seems to work as expected.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114168730
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage;
    +
    +import java.io.IOException;
    +import java.net.URISyntaxException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
    +import org.apache.nifi.processors.azure.AzureConstants;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.blob.CloudBlob;
    +import com.microsoft.azure.storage.blob.CloudBlobClient;
    +import com.microsoft.azure.storage.blob.CloudBlobContainer;
    +
    +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
    +@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
    +@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class })
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
    +})
    +public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
    +
    +    private static final List<PropertyDescriptor> PROPERTIES = Collections
    +            .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final long startNanos = System.nanoTime();
    +
    +        String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
    +        String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        try {
    +            CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
    +            CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
    +            CloudBlobContainer container = blobClient.getContainerReference(containerName);
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +            final CloudBlob blob = container.getBlockBlobReference(blobPath);
    +
    +            // TODO - we may be able do fancier things with ranges and
    +            // distribution of download over threads, investigate
    +            flowFile = session.write(flowFile, os -> {
    +                try {
    +                    blob.download(os);
    +                } catch (StorageException e) {
    +                    throw new IOException(e);
    --- End diff --
    
    Since we wrap the StorageException in an IOException here, we should probably either add IOException to the catch block below or use an AtomicReference to propagate the original exception instead of wrapping it in an IOException.  When I deliberately used a blob name that didn't exist, the flow file was not routed to failure but was instead retried over and over.
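
    A minimal sketch of the AtomicReference option, using plain Java only. FakeStorageException, StreamCallback, download and fetch are hypothetical stand-ins for illustration, not NiFi or Azure SDK types; the point is only that the callback records the original exception so the caller can decide how to route the flow file.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

public class StoredExceptionSketch {

    /** Hypothetical stand-in for the SDK's storage exception. */
    static class FakeStorageException extends Exception {
        FakeStorageException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a stream callback that may only throw IOException. */
    interface StreamCallback {
        void process(OutputStream out) throws IOException;
    }

    /** Hypothetical download that fails when the blob name starts with "missing". */
    static void download(String blobName, OutputStream out) throws FakeStorageException, IOException {
        if (blobName.startsWith("missing")) {
            throw new FakeStorageException("blob not found: " + blobName);
        }
        out.write(blobName.getBytes(StandardCharsets.UTF_8));
    }

    /** Returns a string describing the routing decision (assumed names, for illustration). */
    static String fetch(String blobName) {
        final AtomicReference<Exception> storedException = new AtomicReference<>();
        final StreamCallback callback = out -> {
            try {
                download(blobName, out);
            } catch (FakeStorageException e) {
                // Record the real cause; the callback signature only permits IOException.
                storedException.set(e);
                throw new IOException(e);
            }
        };
        try {
            callback.process(new ByteArrayOutputStream());
            return "success";
        } catch (IOException e) {
            // Prefer the recorded cause over the wrapper when one was stored.
            final Exception cause = storedException.get() != null ? storedException.get() : e;
            return "failure: " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetch("data.csv"));
        System.out.println(fetch("missing.csv"));
    }
}
```

    In the processor itself, the "failure" branch would correspond to penalizing the flow file and transferring it to the failure relationship rather than letting the session roll back and retry.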



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1719



[GitHub] nifi issue #1719: NIFI-1833 Updates to Azure Storage Processor PR to fix dep...

Posted by jtstorck <gi...@git.apache.org>.
Github user jtstorck commented on the issue:

    https://github.com/apache/nifi/pull/1719
  
    Since these are important security notices regarding Azure account key and name values allowed to be passed in as attributes, should they be presented in the main documentation of the processor, rather than additional details?



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114228190
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java ---
    @@ -103,10 +107,15 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                 session.transfer(flowFile, REL_SUCCESS);
                 final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                 session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
    -
    -        } catch (IllegalArgumentException | URISyntaxException | StorageException e1) {
    -            flowFile = session.penalize(flowFile);
    -            session.transfer(flowFile, REL_FAILURE);
    +        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
    +            if (e instanceof ProcessException && storedException.get() == null) {
    +                throw (ProcessException) e;
    +            } else {
    +                Exception failureException = Optional.ofNullable(storedException.get()).map(x -> x).orElse(e);
    --- End diff --
    
    @jtstorck I don't think you need the .map() here.  [orElse()](https://docs.oracle.com/javase/8/docs/api/java/util/Optional.html#orElse-T-) already returns the wrapped value when it is present and only falls back to its argument when the Optional is empty.
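
    A small self-contained check of that point, assuming nothing beyond java.util.Optional. The class and method names (OrElseSketch, pick, pickWithMap) are made up for illustration; the two methods should select the same exception in every case.

```java
import java.util.Optional;

public class OrElseSketch {

    /** Picks the stored exception when one exists, otherwise the caught one. */
    static Exception pick(Exception stored, Exception caught) {
        return Optional.ofNullable(stored).orElse(caught);
    }

    /** Same selection with the identity map() from the diff; the map() step changes nothing. */
    static Exception pickWithMap(Exception stored, Exception caught) {
        return Optional.ofNullable(stored).map(x -> x).orElse(caught);
    }

    public static void main(String[] args) {
        final Exception stored = new IllegalStateException("stored");
        final Exception caught = new RuntimeException("caught");

        // A stored exception is present, so both variants return it.
        System.out.println(pick(stored, caught) == pickWithMap(stored, caught));
        // Nothing stored, so both variants fall back to the caught exception.
        System.out.println(pick(null, caught) == pickWithMap(null, caught));
    }
}
```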



[GitHub] nifi pull request #1719: NIFI-1833 Updates to Azure Storage Processor PR to ...

Posted by brosander <gi...@git.apache.org>.
Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1719#discussion_r114169894
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.net.URISyntaxException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
    +import org.apache.nifi.processors.azure.AzureConstants;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.blob.BlobProperties;
    +import com.microsoft.azure.storage.blob.CloudBlob;
    +import com.microsoft.azure.storage.blob.CloudBlobClient;
    +import com.microsoft.azure.storage.blob.CloudBlobContainer;
    +
    +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
    +@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
    +@CapabilityDescription("Puts content into an Azure Storage Blob")
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
    +        @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
    +        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
    +        @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
    +        @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
    +        @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob")})
    +public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
    +
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final long startNanos = System.nanoTime();
    +
    +        String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        try {
    +            CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
    +            CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
    +            CloudBlobContainer container = blobClient.getContainerReference(containerName);
    +
    +            CloudBlob blob = container.getBlockBlobReference(blobPath);
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +            long length = flowFile.getSize();
    +            session.read(flowFile, rawIn -> {
    +                InputStream in = rawIn;
    +                if (!(in instanceof BufferedInputStream)) {
    +                    // do not double-wrap
    +                    in = new BufferedInputStream(rawIn);
    +                }
    +
    +                try {
    +                    blob.upload(in, length);
    +                    BlobProperties properties = blob.getProperties();
    +                    attributes.put("azure.container", containerName);
    +                    attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
    +                    attributes.put("azure.etag", properties.getEtag());
    +                    attributes.put("azure.length", String.valueOf(length));
    +                    attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
    +                } catch (StorageException | URISyntaxException e) {
    +                    throw new IOException(e);
    --- End diff --
    
    Same as the comment on FetchAzureBlobStorage.  We should either add IOException to the catch block below or propagate the actual exception, so that flow files are properly routed to the failure relationship.

