Posted to issues@nifi.apache.org by pvillard31 <gi...@git.apache.org> on 2017/07/05 08:32:05 UTC

[GitHub] nifi pull request #1977: NIFI-515 - DeleteSQS and PutSQS should offer batch ...

GitHub user pvillard31 opened a pull request:

    https://github.com/apache/nifi/pull/1977

    NIFI-515 - DeleteSQS and PutSQS should offer batch processing

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pvillard31/nifi NIFI-515-temp

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1977.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1977
    
----
commit 87812b4dbcd6f7caec8d70e8805c53c59bb81a5b
Author: Pierre Villard <pi...@gmail.com>
Date:   2017-07-03T15:10:53Z

    NIFI-515 - DeleteSQS and PutSQS should offer batch processing

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1977: NIFI-515 - DeleteSQS and PutSQS should offer batch ...

Posted by jzonthemtn <gi...@git.apache.org>.
Github user jzonthemtn commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1977#discussion_r125632070
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java ---
    @@ -108,43 +108,57 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
             request.setQueueUrl(queueUrl);
     
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final List<FlowFile> flowFiles = session.get(new UrlFlowFileFilter(batchSize, queueUrl, context));
    +        flowFiles.add(flowFile);
    +
             final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
     
    -        final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
    -        entry.setId(flowFile.getAttribute("uuid"));
    -        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    -        session.exportTo(flowFile, baos);
    -        final String flowFileContent = baos.toString();
    -        entry.setMessageBody(flowFileContent);
    +        for(FlowFile flowFileItem : flowFiles) {
     
    -        final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    +            final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
    +            entry.setId(flowFileItem.getAttribute("uuid"));
    +            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +            session.exportTo(flowFileItem, baos);
    +            final String flowFileContent = baos.toString();
    +            entry.setMessageBody(flowFileContent);
     
    -        for (final PropertyDescriptor descriptor : userDefinedProperties) {
    -            final MessageAttributeValue mav = new MessageAttributeValue();
    -            mav.setDataType("String");
    -            mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
    -            messageAttributes.put(descriptor.getName(), mav);
    -        }
    +            final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
     
    -        entry.setMessageAttributes(messageAttributes);
    -        entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
    -        entries.add(entry);
    +            for (final PropertyDescriptor descriptor : userDefinedProperties) {
    +                final MessageAttributeValue mav = new MessageAttributeValue();
    +                mav.setDataType("String");
    +                mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFileItem).getValue());
    +                messageAttributes.put(descriptor.getName(), mav);
    +            }
    +
    +            entry.setMessageAttributes(messageAttributes);
    +            entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
    +            entries.add(entry);
    +
    +        }
     
             request.setEntries(entries);
     
             try {
                 client.sendMessageBatch(request);
             } catch (final Exception e) {
    -            getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e});
    -            flowFile = session.penalize(flowFile);
    -            session.transfer(flowFile, REL_FAILURE);
    +            getLogger().error("Failed to send {} messages to Amazon SQS due to {}; routing to failure", new Object[]{flowFiles.size(), e});
    --- End diff --
    
    There could be a mix of successful and failed messages in the `SendMessageBatchResult`. Does that affect how the FlowFiles in the session are transferred?


---

[GitHub] nifi pull request #1977: NIFI-515 - DeleteSQS and PutSQS should offer batch ...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1977#discussion_r125633660
  
    
    You're right, I forgot that the result contains an individual response for each entry in the request. I will update the PR. Thanks @jzonthemtn


---

[GitHub] nifi issue #1977: NIFI-515 - DeleteSQS and PutSQS should offer batch process...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/1977
  
    @jzonthemtn - I just pushed a commit to address your comment. Basically, I parse the batch result to separate the failed entries from the successful ones. I also added a unit test for that case.
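
The splitting described in this comment can be illustrated with a minimal sketch. It is not the code from the PR: `BatchResultSplitter`, `Split`, and `split` are hypothetical names, and plain strings stand in for FlowFiles and for batch entry ids (the diff sets each entry id to the FlowFile's `uuid` attribute). In the real processor the failed ids would presumably be read from the `SendMessageBatchResult`, and each group would be transferred to the matching relationship.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of NiFi or the AWS SDK.
public class BatchResultSplitter {

    /** Holds the entry ids destined for each relationship. */
    public static final class Split {
        public final List<String> success = new ArrayList<>();
        public final List<String> failure = new ArrayList<>();
    }

    /**
     * Partitions the ids that were sent in one batch request.
     *
     * @param sentIds   ids of all entries in the request, in send order
     * @param failedIds ids that the batch result reported as failed
     */
    public static Split split(List<String> sentIds, Set<String> failedIds) {
        Split split = new Split();
        for (String id : sentIds) {
            if (failedIds.contains(id)) {
                split.failure.add(id);   // would be routed to failure
            } else {
                split.success.add(id);   // would be routed to success
            }
        }
        return split;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("uuid-1", "uuid-2", "uuid-3");
        Set<String> failed = new HashSet<>(Arrays.asList("uuid-2"));
        Split s = split(sent, failed);
        // prints: success=[uuid-1, uuid-3] failure=[uuid-2]
        System.out.println("success=" + s.success + " failure=" + s.failure);
    }
}
```

The point of the sketch is that a batch call can return normally while some entries have failed, so a `try`/`catch` around the call is not enough on its own to decide where each FlowFile goes.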


---

[GitHub] nifi issue #1977: NIFI-515 - DeleteSQS and PutSQS should offer batch process...

Posted by jzonthemtn <gi...@git.apache.org>.
Github user jzonthemtn commented on the issue:

    https://github.com/apache/nifi/pull/1977
  
    @pvillard31 That sounds great!


---

[GitHub] nifi issue #1977: NIFI-515 - DeleteSQS and PutSQS should offer batch process...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/1977
  
    Hey @jzonthemtn - sorry for the late answer, I've been busy lately. I just rebased this PR against master. If it still looks good to you, I'm sure we can find someone willing to merge it.


---

[GitHub] nifi issue #1977: NIFI-515 - DeleteSQS and PutSQS should offer batch process...

Posted by jzonthemtn <gi...@git.apache.org>.
Github user jzonthemtn commented on the issue:

    https://github.com/apache/nifi/pull/1977
  
    @pvillard31 Wondering if there's still community interest in this one? It would be useful for me at least. I can help with the merge conflicts if there is.


---