You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Peter Gyori (Jira)" <ji...@apache.org> on 2021/11/23 15:36:00 UTC

[jira] [Updated] (NIFI-9410) ConsumeMQTT does not work in stateless flows

     [ https://issues.apache.org/jira/browse/NIFI-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Peter Gyori updated NIFI-9410:
------------------------------
    Description: 
When ConsumeMQTT is executed in stateless environment, the {{transferQueue()}} method gets into an endless loop ({{{}while (!mqttQueue.isEmpty()){}}}). At the end of each iteration of this loop
{code:java}
session.commitAsync(() -> mqttQueue.remove(mqttMessage));{code}
is supposed to remove the message from the queue. but since the commit is async, the method is already processing the same message in the next iteration by the time it would execute. The session cannot commit so it does a rollback, and the message is never removed from the queue.

The endless loop consumes all resources and NiFi needs to be restarted to break the loop.

  was:
When ConsumeMQTT is executed in stateless environment, the {{transferQueue()}} method gets into an endless loop ({{{}while (!mqttQueue.isEmpty()){}}}). At the end of each iteration of this loop

 
{code:java}
session.commitAsync(() -> mqttQueue.remove(mqttMessage));{code}
 

is supposed to remove the message from the queue. but since the commit is async, the method is already processing the same message in the next iteration by the time it would execute. The session cannot commit so it does a rollback, and the message is never removed from the queue.

The endless loop consumes all resources and NiFi needs to be restarted to break the loop.


> ConsumeMQTT does not work in stateless flows
> --------------------------------------------
>
>                 Key: NIFI-9410
>                 URL: https://issues.apache.org/jira/browse/NIFI-9410
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Peter Gyori
>            Assignee: Peter Gyori
>            Priority: Major
>
> When ConsumeMQTT is executed in stateless environment, the {{transferQueue()}} method gets into an endless loop ({{{}while (!mqttQueue.isEmpty()){}}}). At the end of each iteration of this loop
> {code:java}
> session.commitAsync(() -> mqttQueue.remove(mqttMessage));{code}
> is supposed to remove the message from the queue. but since the commit is async, the method is already processing the same message in the next iteration by the time it would execute. The session cannot commit so it does a rollback, and the message is never removed from the queue.
> The endless loop consumes all resources and NiFi needs to be restarted to break the loop.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)