You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "ASF subversion and git services (Jira)" <ji...@apache.org> on 2020/03/11 10:00:19 UTC

[jira] [Commented] (NIFI-7050) ConsumeJMS is not yielded in case of exception

    [ https://issues.apache.org/jira/browse/NIFI-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056811#comment-17056811 ] 

ASF subversion and git services commented on NIFI-7050:
-------------------------------------------------------

Commit 89d8b877f9adc3f474983c4dec45bf4586a6baef in nifi's branch refs/heads/master from Gardella Juan Pablo
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=89d8b87 ]

NIFI-7050 ConsumeJMS is not yielded in case of exception

This closes #4004.

Signed-off-by: Peter Turcsanyi <tu...@apache.org>


> ConsumeJMS is not yielded in case of exception
> ----------------------------------------------
>
>                 Key: NIFI-7050
>                 URL: https://issues.apache.org/jira/browse/NIFI-7050
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 1.10.0
>            Reporter: Gardella Juan Pablo
>            Assignee: Gardella Juan Pablo
>            Priority: Minor
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> If any exception happens when ConsumerJMS tries to read messages, the process tries again immediately. 
> {code:java}
>   try {
>             consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
>                 @Override
>                 public void accept(final JMSResponse response) {
>                     if (response == null) {
>                         return;
>                     }
>                     FlowFile flowFile = processSession.create();
>                     flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
>                     final Map<String, String> jmsHeaders = response.getMessageHeaders();
>                     final Map<String, String> jmsProperties = response.getMessageProperties();
>                     flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
>                     flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
>                     flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
>                     processSession.getProvenanceReporter().receive(flowFile, destinationName);
>                     processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
>                     processSession.transfer(flowFile, REL_SUCCESS);
>                     processSession.commit();
>                 }
>             });
>         } catch(Exception e) {
>             consumer.setValid(false);
>             throw e; // for backward compatibility with exception handling in flows
>         }
>     }
> {code}
> It should call {{context.yield}} in exception block. Notice [PublishJMS|https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java#L166] is yielded in the same scenario. It is requires to do in the ConsumeJMS processor only.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)