Posted to issues@nifi.apache.org by "Randy Bovay (JIRA)" <ji...@apache.org> on 2019/01/29 17:16:00 UTC

[jira] [Commented] (NIFI-5117) AMQP Consumer: Error during creation of Flow File results in lost message

    [ https://issues.apache.org/jira/browse/NIFI-5117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16755218#comment-16755218 ] 

Randy Bovay commented on NIFI-5117:
-----------------------------------

This story is fixed by the 1.7.1 changes to the ConsumeAMQP processor, which make it act as a consumer by default.

In fact, we've tested this: in backpressure situations where NiFi is not able to move the message to the queue, a nice error or warning message is created and the message is tagged as unacknowledged in RabbitMQ (confirmed by our Rabbit support team), then pulled back in on subsequent pulls.


We DO have 'Auto-Acknowledge messages' set to False.

> AMQP Consumer: Error during creation of Flow File results in lost message
> -------------------------------------------------------------------------
>
>                 Key: NIFI-5117
>                 URL: https://issues.apache.org/jira/browse/NIFI-5117
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0
>            Reporter: Edward Armes
>            Priority: Major
>
> The AMQP Consumer performs a "basicGet()". The way this basicGet is called results in the message being dequeued from the AMQP queue.
> If a processor instance fails to submit a flow file to the output, for example because of an error in "session.write()" or because the processor is unexpectedly halted before the flow file is created and persisted, the message consumed from the AMQP queue is lost and can't be recovered.
> Reference: [https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#basicGet-java.lang.String-boolean-:]
> A potential fix here would be to:
>  # AMQPConsumer.java: Change the call "basicGet(this.queueName, true)" -> "basicGet(this.queueName, false)"
>  # AMQPConsumer.java: New method that wraps the basicAck() and basicNack() methods, taking a long (the delivery tag) and a boolean (success); if success is true, basicAck() is called, and if it is false, basicNack() with requeue is called
>  # ConsumerAMQP.java: Additional call(s) to "consumer" to invoke the new method as needed on success and on error.
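
The ack/nack wrapper proposed in the second step of the quoted description might look roughly like the sketch below. It is not the code that shipped in NiFi. AckChannel, AckHelper and RecordingChannel are hypothetical names introduced only for illustration. AckChannel stands in for com.rabbitmq.client.Channel so the sketch compiles without the RabbitMQ client; the real basicAck(long, boolean) and basicNack(long, boolean, boolean) take the same arguments but also declare IOException, which the stand-in omits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two com.rabbitmq.client.Channel methods used here.
// The real methods have the same parameters but also declare IOException.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
    void basicNack(long deliveryTag, boolean multiple, boolean requeue);
}

// Hypothetical wrapper: acknowledges on success, otherwise rejects with requeue.
final class AckHelper {
    private final AckChannel channel;

    AckHelper(AckChannel channel) {
        this.channel = channel;
    }

    void acknowledge(long deliveryTag, boolean success) {
        if (success) {
            // Confirm only this delivery (multiple = false).
            channel.basicAck(deliveryTag, false);
        } else {
            // Reject only this delivery and ask the broker to requeue it.
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

// In-memory fake that records calls, used only to exercise the sketch.
final class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag);
    }

    @Override
    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
    }
}
```

For this to have any effect, the message would have to be fetched with basicGet(queueName, false), as in the first step, so that the broker keeps it unacknowledged until acknowledge(...) is called after the flow file has been written.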



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)