Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/12/23 13:05:21 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

exceptionfactory commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r547938223



##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);

Review comment:
       Recommend declaring a static variable for the received counter name and reusing it in the other `adjustCounter()` references.
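       A minimal sketch of what I have in mind (the constant name is only a suggestion):

       ```java
       // Hypothetical constant; reuse it wherever the counter is adjusted.
       private static final String COUNTER_RECORDS_RECEIVED = "Records Received";

       // ...
       session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
       ```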

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = record.getSchema();
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter("Records Received", 1L, false);
+                        }
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));

Review comment:
       Should the `record.count` key be added to the list of `@WritesAttribute` annotations? Recommend declaring it as a static variable if so.
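       Something along these lines, added to the existing `@WritesAttributes` list (the constant name and description text are only suggestions):

       ```java
       // Hypothetical constant for the attribute key written on record FlowFiles.
       public static final String RECORD_COUNT_KEY = "record.count";

       @WritesAttribute(attribute = RECORD_COUNT_KEY, description = "The number of records written to the FlowFile")
       ```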

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -124,6 +148,45 @@
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for received messages")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the data before writing it to a FlowFile")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
+            .name("Add attributes as fields")
+            .description("If using the Records reader/writer and if setting this property to true, default fields "
+                    + "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")

Review comment:
       Is there a reason for not including the `broker` attribute as one of the internal fields that could be added to each record?
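       If it were included, I would expect it to look much like the other fields, e.g. (the `_broker` field name is only an example):

       ```java
       // Hypothetical broker field added alongside the existing MQTT fields.
       fields.add(new RecordField("_broker", RecordFieldType.STRING.getDataType()));
       // ...
       record.setValue("_broker", broker);
       ```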

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);

Review comment:
       With the possibility of multiple messages being sent to failure during this loop, are there any key properties of the `MQTTQueueMessage` that could be logged to indicate which message failed?
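       For example, something along these lines, using the topic as the identifier (only a sketch):

       ```java
       // Sketch: identify the failed message by topic so repeated failures are distinguishable.
       logger.error("Failed to parse message from topic [" + mqttMessage.getTopic()
               + "], sending to the parse failure relationship", e);
       ```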

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
##########
@@ -389,6 +393,227 @@ public void testResizeBuffer() throws Exception {
         flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
     }
 
+    @Test
+    public void testConsumeRecordsWithAddedFields() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        testRunner.addControllerService("record-reader", jsonReader);
+        testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+        testRunner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        testRunner.addControllerService("record-writer", jsonWriter);
+        testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        testRunner.enableControllerService(jsonWriter);
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        testMessage.setRetainFlag(false);
+
+        PublishMessage badMessage = new PublishMessage();
+        badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes()));
+        badMessage.setTopicName("testTopic");
+        badMessage.setDupFlag(false);
+        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        badMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+        internalPublish(badMessage);
+        internalPublish(testMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() == 1);
+        assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+                + "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
+                new String(flowFiles.get(0).toByteArray()));
+
+        List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertTrue(badFlowFiles.size() == 1);
+        assertEquals("ThisIsNotAJSON", new String(badFlowFiles.get(0).toByteArray()));

Review comment:
       Declaring a variable for the invalid JSON string and reusing it both here and when setting the message payload would help make the test easier to maintain and read.
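       Roughly (the constant name is only a suggestion):

       ```java
       // Hypothetical constant shared between publishing the payload and asserting on it.
       private static final String INVALID_JSON = "ThisIsNotAJSON";

       badMessage.setPayload(ByteBuffer.wrap(INVALID_JSON.getBytes()));
       // ...
       assertEquals(INVALID_JSON, new String(badFlowFiles.get(0).toByteArray()));
       ```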

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -124,6 +148,45 @@
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for received messages")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the data before writing it to a FlowFile")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
+            .name("Add attributes as fields")

Review comment:
       This property descriptor is missing a call to `displayName()`.
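       For example (keeping the existing `name()` value for flow compatibility; the remaining builder calls are omitted since they are cut off in the diff):

       ```java
       public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
               .name("Add attributes as fields")
               .displayName("Add attributes as fields")
               .description("If using the Records reader/writer and if setting this property to true, default fields "
                       + "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")
               // ... remaining builder calls unchanged ...
               .build();
       ```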

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = record.getSchema();
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter("Records Received", 1L, false);
+                        }
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            context.yield();
+
+            // we try to add the messages back into the internal queue
+            for(MQTTQueueMessage done : doneList) {
+                try {
+                    mqttQueue.offer(done, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    logger.error("Could not add message back into the internal queue, this could lead to data loss", ex);

Review comment:
       Similar to the queue processing loop, it would be helpful to identify the particular message that failed as part of this error message instead of repeating the same log text for multiple failures.
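       For example (again only a sketch, using the topic to identify the message):

       ```java
       // Sketch: include the topic of the message that could not be re-queued.
       logger.error("Could not add message for topic [" + done.getTopic()
               + "] back into the internal queue, this could lead to data loss", ex);
       ```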

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = record.getSchema();
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter("Records Received", 1L, false);
+                        }
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            context.yield();
+
+            // we try to add the messages back into the internal queue
+            for(MQTTQueueMessage done : doneList) {
+                try {
+                    mqttQueue.offer(done, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    logger.error("Could not add message back into the internal queue, this could lead to data loss", ex);
+                }
+            }
+
+            throw new ProcessException("Could not process data received from the MQTT broker(s).", e);

Review comment:
       Should the `broker` value be included in this message?
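       For example (just a sketch):

       ```java
       // Sketch: include the broker value so the failing connection is identifiable.
       throw new ProcessException("Could not process data received from the MQTT broker: " + broker, e);
       ```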




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org