You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by zenfenan <gi...@git.apache.org> on 2018/04/06 15:43:13 UTC

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

GitHub user zenfenan opened a pull request:

    https://github.com/apache/nifi/pull/2611

    NIFI-5015: Implemented Azure Queue Storage processors

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zenfenan/nifi NIFI-5015

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2611
    
----
commit 42e01874c44923d6b27955a7e39a18007c67940c
Author: zenfenan <si...@...>
Date:   2018-04-02T02:18:37Z

    NIFI-5015: Implemented Azure Queue Storage processors

----


---

[GitHub] nifi issue #2611: NIFI-5015: Implemented Azure Queue Storage processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2611
  
    Thanks for the update @zenfenan - it LGTM, +1, merging to master


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182946668
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successfully processed FlowFiles are routed to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Unsuccessful operations will be transferred to the failure relationship.")
    +            .build();
    +
    +    private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
    +    private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
    --- End diff --
    
    The end point won't change unless Azure wants to break backwards compatibility.. Just kidding. 
    
    I don't think the end  point will change since it is what the `azure-storage` SDKs use. However, it might be possible to property'lize one thing, the connection mode. The SDKs, I believe, support both `http` as well as `https` but since https is recommended both in the Azure docs as well as in general, I went with https by default. Also the blob processors use https.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182947890
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successfully processed FlowFiles are routed to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Unsuccessful operations will be transferred to the failure relationship.")
    +            .build();
    +
    +    private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
    --- End diff --
    
    I don't think the connection string takes any other properties other than `AccountName`, `AccountKey`, `DefaultEndpointsProtocol`, `EndpointSuffix` (for Azure China & Azure Government). You are aware about any?


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182948627
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    +
    +            for (final CloudQueueMessage message : cloudQueueMessages) {
    +                try {
    +                    cloudQueue.deleteMessage(message);
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
    +                            new Object[] {message.getMessageId(), e});
    +                }
    +            }
    +        }
    +
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +
    +        final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
    +
    +        //Azure Queue Storage supports only a max of 32 messages to be retrieved in a single invocation
    +        if (batchSize > 32) {
    +            problems.add(new ValidationResult.Builder()
    +                                             .valid(false)
    +                                             .subject(BATCH_SIZE.getDisplayName())
    +                                             .explanation("only up to 32 messages can be retrieved at a time")
    +                                             .build()
    +            );
    +        }
    +
    +        final int visibilityTimeout = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +
    +        if (visibilityTimeout <= 0) {
    --- End diff --
    
    My bad.. I'll fix it.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182836448
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    --- End diff --
    
    how does this work in a clustered approach? should this processor only run on primary node? if yes, could be indicated in the doc/processor description.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r183125399
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    --- End diff --
    
    Checked it and in clustered scenarios it works as expected. Only one node at a time retrieves the message. Tested with varying message counts as well different number of nodes.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182835251
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successfully processed FlowFiles are routed to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Unsuccessful operations will be transferred to the failure relationship.")
    +            .build();
    +
    +    private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
    +    private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
    +
    +    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
    +
    +    protected CloudQueue cloudQueue;
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(AzureStorageUtils.validateCredentialProperties(validationContext));
    +
    +        final String queueName = validationContext.getProperty(QUEUE).evaluateAttributeExpressions().getValue();
    +        if(!StringUtils.isAllLowerCase(queueName)) {
    --- End diff --
    
    instead of doing a custom validate this could be forced to lower case directly in the code, no?


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182958368
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    +
    +            for (final CloudQueueMessage message : cloudQueueMessages) {
    +                try {
    +                    cloudQueue.deleteMessage(message);
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
    +                            new Object[] {message.getMessageId(), e});
    +                }
    +            }
    +        }
    +
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +
    +        final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
    +
    +        //Azure Queue Storage supports only a max of 32 messages to be retrieved in a single invocation
    +        if (batchSize > 32) {
    +            problems.add(new ValidationResult.Builder()
    +                                             .valid(false)
    +                                             .subject(BATCH_SIZE.getDisplayName())
    +                                             .explanation("only up to 32 messages can be retrieved at a time")
    +                                             .build()
    +            );
    +        }
    +
    +        final int visibilityTimeout = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +
    +        if (visibilityTimeout <= 0) {
    --- End diff --
    
    No.. `VISIBILITY_TIMEOUT` is using `TIME_PERIOD_VALIDATOR` as expected.. this custom validation is to check if the validated time period (in seconds) is `0` or less. (less may not happen) The reason why it is checked: visibility timeout of 0 secs will throw an error in runtime.. Not sure if that's a bug in the storage SDK.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r183125143
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    --- End diff --
    
    Done


---

[GitHub] nifi issue #2611: NIFI-5015: Implemented Azure Queue Storage processors

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on the issue:

    https://github.com/apache/nifi/pull/2611
  
    @pvillard31 Mind taking a look at this? 


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182836230
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    --- End diff --
    
    should we clearly indicate that in case of issues we could create duplicates? (in case we committed the session but didn't delete the messages from the queue)


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182834919
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    --- End diff --
    
    shouldn't it be with the variable registry scope?


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182836816
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    +
    +            for (final CloudQueueMessage message : cloudQueueMessages) {
    +                try {
    +                    cloudQueue.deleteMessage(message);
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
    +                            new Object[] {message.getMessageId(), e});
    +                }
    +            }
    +        }
    +
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +
    +        final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
    +
    +        //Azure Queue Storage supports only a max of 32 messages to be retrieved in a single invocation
    +        if (batchSize > 32) {
    +            problems.add(new ValidationResult.Builder()
    +                                             .valid(false)
    +                                             .subject(BATCH_SIZE.getDisplayName())
    +                                             .explanation("only up to 32 messages can be retrieved at a time")
    +                                             .build()
    +            );
    +        }
    +
    +        final int visibilityTimeout = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +
    +        if (visibilityTimeout <= 0) {
    --- End diff --
    
    You can specify in the property that you want a positive integer validator instead of creating a custom validator


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182965345
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    --- End diff --
    
    A note in the capability description of the processor sounds good to me.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182948045
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successfully processed FlowFiles are routed to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Unsuccessful operations will be transferred to the failure relationship.")
    +            .build();
    +
    +    private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
    +    private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
    +
    +    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
    +
    +    protected CloudQueue cloudQueue;
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(AzureStorageUtils.validateCredentialProperties(validationContext));
    +
    +        final String queueName = validationContext.getProperty(QUEUE).evaluateAttributeExpressions().getValue();
    +        if(!StringUtils.isAllLowerCase(queueName)) {
    --- End diff --
    
    That's what I thought because that's the simplest thing to do. However I wanted to emphasis to the user about the same. I din't know any other way other than letting the validator do its thing.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182946138
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    --- End diff --
    
    Any particular reason why it shouldn't be expected to be present as a flowfile attribute? It is not sensitive so I thought it can be read from both FlowFile as well as Variable Registry


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182964837
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    --- End diff --
    
    I don't see where you're accessing this property using flow files. In the abstract class, you're doing: 
    ````java
    final String queueName = validationContext.getProperty(QUEUE).evaluateAttributeExpressions().getValue();
    ````
    in the ``@OnScheduled`` method. So you're not using the flow files. Am I missing something?


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182948251
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    --- End diff --
    
    Good point. Where should we add it to? Documentation ?


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182835135
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successfully processed FlowFiles are routed to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Unsuccessful operations will be transferred to the failure relationship.")
    +            .build();
    +
    +    private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
    --- End diff --
    
    should it be configurable in case user wants to provide more parameters? with dynamic properties?


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182836695
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    +
    +            for (final CloudQueueMessage message : cloudQueueMessages) {
    +                try {
    +                    cloudQueue.deleteMessage(message);
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
    +                            new Object[] {message.getMessageId(), e});
    +                }
    +            }
    +        }
    +
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +
    +        final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
    --- End diff --
    
    I believe you can set a range validator directly in the property (IIRC it's in CommonValidator or something like that)


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2611


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r183126762
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    --- End diff --
    
    You were right. Removed `@OnScheduled` not just because of this reason alone but `ACCOUNT_NAME` and `ACCOUNT_KEY` or `SAS_TOKEN` properties use `ExpressionLanguageScope.FLOWFILES` so that could be a potential bug since with that method dint get flowfile as a parameter before. Now it has been changed.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r183125717
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    +
    +            for (final CloudQueueMessage message : cloudQueueMessages) {
    +                try {
    +                    cloudQueue.deleteMessage(message);
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
    +                            new Object[] {message.getMessageId(), e});
    +                }
    +            }
    +        }
    +
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +
    +        final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
    --- End diff --
    
    Done. It is `createLongValidator` although I feel it could have been better named in order to be easily found while developing time itself :)


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182835025
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successfully processed FlowFiles are routed to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Unsuccessful operations will be transferred to the failure relationship.")
    +            .build();
    +
    +    private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
    +    private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
    --- End diff --
    
    are we sure this will never change? could be something provided by the user?


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182948346
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    --- End diff --
    
    You got me.. Forgot to do a thorough check on that. Will do that and update.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182948377
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueueMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Collections;
    +import java.util.Arrays;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({PutAzureQueueStorage.class})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
    +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
    +        "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
    +        @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
    +        @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
    +        @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
    +})
    +public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
    +
    +    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
    +            .name("auto-delete-messages")
    +            .displayName("Auto Delete Messages")
    +            .description("Specifies whether the received message is to be automatically deleted from the queue.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("batch-size")
    +            .displayName("Batch Size")
    +            .description("The number of messages to be retrieved from the queue.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("32")
    +            .build();
    +
    +    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("visibility-timeout")
    +            .displayName("Visibility Timeout")
    +            .description("The duration during which the retrieved message should be invisible to other consumers.")
    +            .required(true)
    +            .defaultValue("30 secs")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
    +            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
    +            BATCH_SIZE, VISIBILITY_TIMEOUT));
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +
    +        final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
    +
    +        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
    +
    +        try {
    +            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
    +        } catch (final StorageException e) {
    +            getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
    +            context.yield();
    +            return;
    +        }
    +
    +        final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
    +
    +        for (final CloudQueueMessage message : cloudQueueMessages) {
    +            FlowFile flowFile = session.create();
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +
    +            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
    +            attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
    +            attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
    +            attributes.put("azure.queue.messageId", message.getMessageId());
    +            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +            flowFile = session.write(flowFile, out -> {
    +                try {
    +                    out.write(message.getMessageContentAsByte());
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
    +                    context.yield();
    +                }
    +            });
    +
    +            session.transfer(flowFile, REL_SUCCESS);
    +            session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
    +        }
    +
    +        if(autoDelete) {
    +            session.commit();
    +
    +            for (final CloudQueueMessage message : cloudQueueMessages) {
    +                try {
    +                    cloudQueue.deleteMessage(message);
    +                } catch (StorageException e) {
    +                    getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
    +                            new Object[] {message.getMessageId(), e});
    +                }
    +            }
    +        }
    +
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +
    +        final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
    --- End diff --
    
    Will take a look.


---

[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2611#discussion_r182965161
  
    --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.azure.storage.queue;
    +
    +import com.microsoft.azure.storage.CloudStorageAccount;
    +import com.microsoft.azure.storage.StorageCredentials;
    +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
    +import com.microsoft.azure.storage.StorageException;
    +import com.microsoft.azure.storage.queue.CloudQueue;
    +import com.microsoft.azure.storage.queue.CloudQueueClient;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.security.InvalidKeyException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
    +            .name("storage-cloudQueue-name")
    +            .displayName("Queue Name")
    +            .description("Name of the Azure Storage Queue")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successfully processed FlowFiles are routed to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Unsuccessful operations will be transferred to the failure relationship.")
    +            .build();
    +
    +    private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
    --- End diff --
    
    No, it's just in case some are added and we don't want to update the code. But we can leave as-is.


---