Posted to issues@nifi.apache.org by olegz <gi...@git.apache.org> on 2016/09/15 13:39:20 UTC

[GitHub] nifi pull request #1022: NIFI-2774 changed the default ACK mode to CLIENT

GitHub user olegz opened a pull request:

    https://github.com/apache/nifi/pull/1022

    NIFI-2774 changed the default ACK mode to CLIENT

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/olegz/nifi NIFI-2774

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1022
    
----
commit a9a4d2efefe99ecefa14f2b502a13c34545f19b5
Author: Oleg Zhurakousky <ol...@suitcase.io>
Date:   2016-09-15T13:33:27Z

    NIFI-2774 changed the default ACK mode to CLIENT

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #1022: NIFI-2774 changed the default ACK mode to CLIENT

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/1022
  
    
    Other.commit
    Nifi.commit
    
    This means at most once.  Loss is possible but dupes are not.
    
    
    Nifi.commit
    Other.commit
    
    This means at least once.  Dupes are possible but loss is not.
    
    Default mode should always be at least once.
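
The two orderings above can be illustrated with a small simulation. This is only a sketch under stated assumptions: `CommitOrderDemo`, `Order`, and `deliver` are hypothetical names invented for illustration, not NiFi or JMS APIs, and the "crash" is simply a flag that skips the second commit. The model also assumes the other system redelivers any message it has not seen acknowledged.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of two commit points: the other system ("Other") and NiFi. */
class CommitOrderDemo {

    /** Which side commits first. */
    enum Order { OTHER_THEN_NIFI, NIFI_THEN_OTHER }

    /**
     * Delivers one message, optionally crashing between the two commits,
     * then lets the other system redeliver anything it has not seen acknowledged.
     * Returns the copies of the message that ended up stored on the NiFi side.
     */
    static List<String> deliver(Order order, boolean crashBetweenCommits) {
        List<String> nifiStore = new ArrayList<>();
        boolean otherCommitted = false;
        String message = "m1";

        if (order == Order.OTHER_THEN_NIFI) {
            otherCommitted = true;              // Other.commit
            if (!crashBetweenCommits) {
                nifiStore.add(message);         // Nifi.commit
            }
        } else {
            nifiStore.add(message);             // Nifi.commit
            if (!crashBetweenCommits) {
                otherCommitted = true;          // Other.commit
            }
        }

        // Assumption: after a restart, an unacknowledged message is redelivered.
        if (!otherCommitted) {
            nifiStore.add(message);
            otherCommitted = true;
        }
        return nifiStore;
    }

    public static void main(String[] args) {
        for (Order order : Order.values()) {
            System.out.println(order + " with crash -> " + deliver(order, true));
        }
    }
}
```

In this model a crash with `OTHER_THEN_NIFI` leaves the NiFi side empty (the message is lost), while a crash with `NIFI_THEN_OTHER` leaves two copies (a duplicate). Without a crash, both orderings store exactly one copy.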



[GitHub] nifi pull request #1022: NIFI-2774 changed the default ACK mode to CLIENT

Posted by ckmcd <gi...@git.apache.org>.
Github user ckmcd commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1022#discussion_r79008301
  
    --- Diff: nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java ---
    @@ -78,22 +77,24 @@
          */
         @Override
         protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException {
    -        final JMSResponse response = this.targetResource.consume();
    -        if (response != null){
    -            FlowFile flowFile = processSession.create();
    -            flowFile = processSession.write(flowFile, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    out.write(response.getMessageBody());
    -                }
    -            });
    -            Map<String, Object> jmsHeaders = response.getMessageHeaders();
    -            flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
    -            processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
    -            processSession.transfer(flowFile, REL_SUCCESS);
    -        } else {
    -            context.yield();
    -        }
    +        this.targetResource.consume(response -> {
    +            if (response != null) {
    +                FlowFile flowFile = processSession.create();
    +                flowFile = processSession.write(flowFile, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        out.write(response.getMessageBody());
    +                    }
    +                });
    +                Map<String, Object> jmsHeaders = response.getMessageHeaders();
    +                flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
    +                processSession.getProvenanceReporter().receive(flowFile,
    +                        context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
    +                processSession.transfer(flowFile, REL_SUCCESS);
    --- End diff --
    
    Question: doesn't the session need to be explicitly committed here, before the message gets acknowledged, in order to complete the chain of custody? Otherwise, I believe there is a small window in which, if NiFi crashed before the scheduler committed the session, the acknowledged message could be lost.



[GitHub] nifi pull request #1022: NIFI-2774 changed the default ACK mode to CLIENT

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1022#discussion_r79015458
  
    
    @ckmcd good catch. Indeed, that window exists, and we need to commit the session within the callback, before the message ack. Will address.
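
The ordering discussed here (commit the session inside the callback, acknowledge only afterwards) can be sketched as follows. This is a minimal illustration, not the actual change: `AckAfterCommitDemo`, `consume`, and `run` are hypothetical names, the list of strings stands in for the real session and JMS message, and the sketch assumes the consumer acknowledges only when the callback returns normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model: the message is acknowledged only after the callback has committed. */
class AckAfterCommitDemo {

    /**
     * Hypothetical stand-in for a consumer in CLIENT acknowledge mode.
     * The acknowledgement happens only if the callback returns without throwing.
     */
    static void consume(String body, Consumer<String> callback, List<String> log) {
        callback.accept(body);   // processor creates, transfers and commits
        log.add("ack");          // reached only when the callback succeeded
    }

    /** Runs one delivery and returns the ordered list of steps that occurred. */
    static List<String> run(boolean commitFails) {
        List<String> log = new ArrayList<>();
        try {
            consume("payload", body -> {
                log.add("transfer");
                if (commitFails) {
                    // Simulated failure before the session commit completes.
                    throw new IllegalStateException("commit failed");
                }
                log.add("commit");   // session committed inside the callback
            }, log);
        } catch (IllegalStateException e) {
            log.add("not acked");    // message stays unacknowledged, so it can be redelivered
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

With this ordering, a failure before the commit leaves the message unacknowledged, so the worst case is a redelivery rather than a loss.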



[GitHub] nifi pull request #1022: NIFI-2774 changed the default ACK mode to CLIENT

Posted by olegz <gi...@git.apache.org>.
Github user olegz closed the pull request at:

    https://github.com/apache/nifi/pull/1022



[GitHub] nifi issue #1022: NIFI-2774 changed the default ACK mode to CLIENT

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1022
  
    Closing this PR because of the complexity of fixing merge conflicts caused by changes that came in with #1027. Will submit a new one shortly.



[GitHub] nifi issue #1022: NIFI-2774 changed the default ACK mode to CLIENT

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1022
  
    @ckmcd PR comments addressed, with a few minor improvements.

