You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by zenfenan <gi...@git.apache.org> on 2018/04/06 15:43:13 UTC
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
GitHub user zenfenan opened a pull request:
https://github.com/apache/nifi/pull/2611
NIFI-5015: Implemented Azure Queue Storage processors
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?
### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zenfenan/nifi NIFI-5015
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/nifi/pull/2611.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2611
----
commit 42e01874c44923d6b27955a7e39a18007c67940c
Author: zenfenan <si...@...>
Date: 2018-04-02T02:18:37Z
NIFI-5015: Implemented Azure Queue Storage processors
----
---
[GitHub] nifi issue #2611: NIFI-5015: Implemented Azure Queue Storage processors
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:
https://github.com/apache/nifi/pull/2611
Thanks for the update @zenfenan - it LGTM, +1, merging to master
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182946668
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All successfully processed FlowFiles are routed to this relationship")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Unsuccessful operations will be transferred to the failure relationship.")
+ .build();
+
+ private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+ private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
--- End diff --
The end point won't change unless Azure wants to break backwards compatibility.. Just kidding.
I don't think the end point will change since it is what the `azure-storage` SDKs use. However, it might be possible to property'lize one thing, the connection mode. The SDKs, I believe, support both `http` as well as `https` but since https is recommended both in the Azure docs as well as in general, I went with https by default. Also the blob processors use https.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182947890
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All successfully processed FlowFiles are routed to this relationship")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Unsuccessful operations will be transferred to the failure relationship.")
+ .build();
+
+ private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
--- End diff --
I don't think the connection string takes any other properties other than `AccountName`, `AccountKey`, `DefaultEndpointsProtocol`, `EndpointSuffix` (for Azure China & Azure Government). You are aware about any?
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182948627
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ try {
+ cloudQueue.deleteMessage(message);
+ } catch (StorageException e) {
+ getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
+ new Object[] {message.getMessageId(), e});
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+ final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
+
+ //Azure Queue Storage supports only a max of 32 messages to be retrieved in a single invocation
+ if (batchSize > 32) {
+ problems.add(new ValidationResult.Builder()
+ .valid(false)
+ .subject(BATCH_SIZE.getDisplayName())
+ .explanation("only up to 32 messages can be retrieved at a time")
+ .build()
+ );
+ }
+
+ final int visibilityTimeout = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+
+ if (visibilityTimeout <= 0) {
--- End diff --
My bad.. I'll fix it.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182836448
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
--- End diff --
how does this work in a clustered approach? should this processor only run on primary node? if yes, could be indicated in the doc/processor description.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r183125399
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
--- End diff --
Checked it and in clustered scenarios it works as expected. Only one node at a time retrieves the message. Tested with varying message counts as well different number of nodes.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182835251
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All successfully processed FlowFiles are routed to this relationship")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Unsuccessful operations will be transferred to the failure relationship.")
+ .build();
+
+ private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+ private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
+
+ private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+ protected CloudQueue cloudQueue;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>(AzureStorageUtils.validateCredentialProperties(validationContext));
+
+ final String queueName = validationContext.getProperty(QUEUE).evaluateAttributeExpressions().getValue();
+ if(!StringUtils.isAllLowerCase(queueName)) {
--- End diff --
instead of doing a custom validate this could be forced to lower case directly in the code, no?
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182958368
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ try {
+ cloudQueue.deleteMessage(message);
+ } catch (StorageException e) {
+ getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
+ new Object[] {message.getMessageId(), e});
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+ final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
+
+ //Azure Queue Storage supports only a max of 32 messages to be retrieved in a single invocation
+ if (batchSize > 32) {
+ problems.add(new ValidationResult.Builder()
+ .valid(false)
+ .subject(BATCH_SIZE.getDisplayName())
+ .explanation("only up to 32 messages can be retrieved at a time")
+ .build()
+ );
+ }
+
+ final int visibilityTimeout = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+
+ if (visibilityTimeout <= 0) {
--- End diff --
No.. `VISIBILITY_TIMEOUT` is using `TIME_PERIOD_VALIDATOR` as expected.. this custom validation is to check if the validated time period (in seconds) is `0` or less. (less may not happen) The reason why it is checked: visibility timeout of 0 secs will throw an error in runtime.. Not sure if that's a bug in the storage SDK.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r183125143
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
--- End diff --
Done
---
[GitHub] nifi issue #2611: NIFI-5015: Implemented Azure Queue Storage processors
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on the issue:
https://github.com/apache/nifi/pull/2611
@pvillard31 Mind taking a look at this?
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182836230
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
--- End diff --
should we clearly indicate that in case of issues we could create duplicates? (in case we committed the session but didn't delete the messages from the queue)
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182834919
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
--- End diff --
shouldn't it be with the variable registry scope?
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182836816
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ try {
+ cloudQueue.deleteMessage(message);
+ } catch (StorageException e) {
+ getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
+ new Object[] {message.getMessageId(), e});
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+ final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
+
+ //Azure Queue Storage supports only a max of 32 messages to be retrieved in a single invocation
+ if (batchSize > 32) {
+ problems.add(new ValidationResult.Builder()
+ .valid(false)
+ .subject(BATCH_SIZE.getDisplayName())
+ .explanation("only up to 32 messages can be retrieved at a time")
+ .build()
+ );
+ }
+
+ final int visibilityTimeout = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+
+ if (visibilityTimeout <= 0) {
--- End diff --
You can specify in the property that you want a positive integer validator instead of creating a custom validator
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182965345
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
--- End diff --
A note in the capability description of the processor sounds good to me.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182948045
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All successfully processed FlowFiles are routed to this relationship")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Unsuccessful operations will be transferred to the failure relationship.")
+ .build();
+
+ private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+ private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
+
+ private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+ protected CloudQueue cloudQueue;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>(AzureStorageUtils.validateCredentialProperties(validationContext));
+
+ final String queueName = validationContext.getProperty(QUEUE).evaluateAttributeExpressions().getValue();
+ if(!StringUtils.isAllLowerCase(queueName)) {
--- End diff --
That's what I thought because that's the simplest thing to do. However I wanted to emphasis to the user about the same. I din't know any other way other than letting the validator do its thing.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182946138
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
--- End diff --
Any particular reason why it shouldn't be expected to be present as a flowfile attribute? It is not sensitive so I thought it can be read from both FlowFile as well as Variable Registry
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182964837
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
--- End diff --
I don't see where you're accessing this property using flow files. In the abstract class, you're doing:
````java
final String queueName = validationContext.getProperty(QUEUE).evaluateAttributeExpressions().getValue();
````
in the ``@OnScheduled`` method. So you're not using the flow files. Am I missing something?
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182948251
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
--- End diff --
Good point. Where should we add it to? Documentation ?
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182835135
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All successfully processed FlowFiles are routed to this relationship")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Unsuccessful operations will be transferred to the failure relationship.")
+ .build();
+
+ private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
--- End diff --
should it be configurable in case user wants to provide more parameters? with dynamic properties?
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182836695
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ try {
+ cloudQueue.deleteMessage(message);
+ } catch (StorageException e) {
+ getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
+ new Object[] {message.getMessageId(), e});
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+ final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
--- End diff --
I believe you can set a range validator directly in the property (IIRC it's in CommonValidator or something like that)
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/nifi/pull/2611
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r183126762
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
--- End diff --
You were right. Removed `@OnScheduled` not just because of this reason alone but `ACCOUNT_NAME` and `ACCOUNT_KEY` or `SAS_TOKEN` properties use `ExpressionLanguageScope.FLOWFILES` so that could be a potential bug since with that method dint get flowfile as a parameter before. Now it has been changed.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r183125717
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ try {
+ cloudQueue.deleteMessage(message);
+ } catch (StorageException e) {
+ getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
+ new Object[] {message.getMessageId(), e});
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+ final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
--- End diff --
Done. It is `createLongValidator` although I feel it could have been better named in order to be easily found while developing time itself :)
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182835025
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All successfully processed FlowFiles are routed to this relationship")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Unsuccessful operations will be transferred to the failure relationship.")
+ .build();
+
+ private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+ private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
--- End diff --
are we sure this will never change? could be something provided by the user?
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182948346
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
--- End diff --
You got me.. Forgot to do a thorough check on that. Will do that and update.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182948377
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
+ "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
+ @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
+ @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
+ @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("auto-delete-messages")
+ .displayName("Auto Delete Messages")
+ .description("Specifies whether the received message is to be automatically deleted from the queue.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The number of messages to be retrieved from the queue.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("32")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("visibility-timeout")
+ .displayName("Visibility Timeout")
+ .description("The duration during which the retrieved message should be invisible to other consumers.")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+ BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+ try {
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+ } catch (final StorageException e) {
+ getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
+ context.yield();
+ return;
+ }
+
+ final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+ attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
+ attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
+ attributes.put("azure.queue.messageId", message.getMessageId());
+ attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, out -> {
+ try {
+ out.write(message.getMessageContentAsByte());
+ } catch (StorageException e) {
+ getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
+ context.yield();
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
+ }
+
+ if(autoDelete) {
+ session.commit();
+
+ for (final CloudQueueMessage message : cloudQueueMessages) {
+ try {
+ cloudQueue.deleteMessage(message);
+ } catch (StorageException e) {
+ getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
+ new Object[] {message.getMessageId(), e});
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+ final int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
--- End diff --
Will take a look.
---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2611#discussion_r182965161
--- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("storage-cloudQueue-name")
+ .displayName("Queue Name")
+ .description("Name of the Azure Storage Queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All successfully processed FlowFiles are routed to this relationship")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Unsuccessful operations will be transferred to the failure relationship.")
+ .build();
+
+ private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
--- End diff --
No, it's just in case some are added and we don't want to update the code. But we can leave as-is.
---