You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/10/15 16:10:27 UTC
[nifi] branch main updated: NIFI-10650 Fix ConsumeMQTT appending the demarcator to the end of the FlowFile's content
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new eecb6bfb38 NIFI-10650 Fix ConsumeMQTT appending the demarcator to the end of the FlowFile's content
eecb6bfb38 is described below
commit eecb6bfb3842c72727dd65a463b396d567a06e50
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Fri Oct 14 17:19:24 2022 +0200
NIFI-10650 Fix ConsumeMQTT appending the demarcator to the end of the FlowFile's content
This closes #6534.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java | 4 +++-
.../test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java | 2 +-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 849cb17324..c3c299905f 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -431,8 +431,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
int i = 0;
while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
final ReceivedMqttMessage mqttMessage = mqttQueue.poll();
+ if (i > 0) {
+ out.write(demarcator);
+ }
out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
- out.write(demarcator);
session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
i++;
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index af830f93d2..9a004af7e2 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -516,7 +516,7 @@ public class TestConsumeMQTT {
assertEquals(flowFiles.size(), 1);
assertEquals("{\"name\":\"Apache NiFi\"}\\n"
+ THIS_IS_NOT_JSON + "\\n"
- + "{\"name\":\"Apache NiFi\"}\\n",
+ + "{\"name\":\"Apache NiFi\"}",
new String(flowFiles.get(0).toByteArray()));
final List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);