You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/10/15 16:10:27 UTC

[nifi] branch main updated: NIFI-10650 Fix demarcator is appended to the end of the FlowFile's content by ConsumeMQTT

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new eecb6bfb38 NIFI-10650 Fix demarcator is appended to the end of the FlowFile's content by ConsumeMQTT
eecb6bfb38 is described below

commit eecb6bfb3842c72727dd65a463b396d567a06e50
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Fri Oct 14 17:19:24 2022 +0200

    NIFI-10650 Fix demarcator is appended to the end of the FlowFile's content by ConsumeMQTT
    
    This closes #6534.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java    | 4 +++-
 .../test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java    | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 849cb17324..c3c299905f 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -431,8 +431,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
             int i = 0;
             while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
                 final ReceivedMqttMessage mqttMessage = mqttQueue.poll();
+                if (i > 0) {
+                    out.write(demarcator);
+                }
                 out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
-                out.write(demarcator);
                 session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
                 i++;
             }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index af830f93d2..9a004af7e2 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -516,7 +516,7 @@ public class TestConsumeMQTT {
         assertEquals(flowFiles.size(), 1);
         assertEquals("{\"name\":\"Apache NiFi\"}\\n"
                         + THIS_IS_NOT_JSON + "\\n"
-                        + "{\"name\":\"Apache NiFi\"}\\n",
+                        + "{\"name\":\"Apache NiFi\"}",
                 new String(flowFiles.get(0).toByteArray()));
 
         final List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);