You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/13 18:43:12 UTC

[nifi] 08/22: NIFI-9410: Fix for ConsumeMQTT processor in stateless environment

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 96af7a692cbf3c945a2a1dc5096c121b2e22c5dd
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Tue Nov 23 17:04:00 2021 +0100

    NIFI-9410: Fix for ConsumeMQTT processor in stateless environment
    
    Signed-off-by: Peter Gyori <pe...@gmail.com>
    
    NIFI-9410: Added displayName to the QoS processor property
    
    Signed-off-by: Peter Gyori <pe...@gmail.com>
---
 .../main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 2755585..7c5a7ff 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -145,7 +145,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
     public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
             .name("Quality of Service(QoS)")
-            .description("The Quality of Service(QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.")
+            .displayName("Quality of Service (QoS)")
+            .description("The Quality of Service (QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.")
             .required(true)
             .defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
             .allowableValues(
@@ -387,7 +388,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
             return;
         }
 
-        if(context.getProperty(RECORD_READER).isSet()) {
+        if (context.getProperty(RECORD_READER).isSet()) {
             transferQueueRecord(context, session);
         } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
             transferQueueDemarcator(context, session);
@@ -440,7 +441,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
             session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
             session.transfer(messageFlowfile, REL_MESSAGE);
-            session.commitAsync(() -> mqttQueue.remove(mqttMessage));
+            session.commitAsync();
+            mqttQueue.remove(mqttMessage);
         }
     }