Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/12/22 16:23:30 UTC

[GitHub] [nifi] pvillard31 opened a new pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

pvillard31 opened a new pull request #4738:
URL: https://github.com/apache/nifi/pull/4738


   This also adds the option to define a delimiter instead of a record reader/writer so that multiple messages can be appended into a single flow file. This greatly increases the performance of the processor when the use case requires high throughput in terms of events per second.
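
   For illustration only (not part of the PR text): the demarcator mode could be exercised in a unit test roughly as sketched below, assuming `MESSAGE_DEMARCATOR` is exposed as a public `PropertyDescriptor` like `RECORD_READER`/`RECORD_WRITER`; broker and topic configuration is omitted.

       // sketch of configuring the new demarcator option in a test
       final TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
       // append all queued MQTT payloads into a single flow file, separated by newlines
       runner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\n");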
   
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   _Enables X functionality; fixes bug NIFI-YYYY._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that the format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pvillard31 commented on pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#issuecomment-764515288


   Thanks for the review @turcsanyip - I think I addressed your comments; let me know if there is anything else.





[GitHub] [nifi] pvillard31 commented on a change in pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r551542640



##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = record.getSchema();
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter("Records Received", 1L, false);
+                        }
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            context.yield();
+
+            // we try to add the messages back into the internal queue
+            for(MQTTQueueMessage done : doneList) {
+                try {
+                    mqttQueue.offer(done, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    logger.error("Could not add message back into the internal queue, this could lead to data loss", ex);

Review comment:
       The only option I see is to log the payload, but this would raise security concerns IMO. I can change the loop so that the log message is emitted only once and reports how many messages we could not re-insert into the queue.
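
       A minimal sketch of that change (illustrative only, not the merged code; it assumes the `mqttQueue`, `doneList`, and `logger` fields used in the surrounding diff):

           int failedRequeues = 0;
           for (final MQTTQueueMessage done : doneList) {
               try {
                   // offer() returns false if the message could not be re-inserted within the timeout
                   if (!mqttQueue.offer(done, 1, TimeUnit.SECONDS)) {
                       failedRequeues++;
                   }
               } catch (final InterruptedException ex) {
                   Thread.currentThread().interrupt();
                   failedRequeues++;
               }
           }
           if (failedRequeues > 0) {
               // log once, with a count, instead of once per message
               logger.error("Could not re-insert {} message(s) into the internal queue, this could lead to data loss",
                       new Object[]{failedRequeues});
           }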







[GitHub] [nifi] pvillard31 commented on pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#issuecomment-754200530


   Thanks for the review @exceptionfactory and @markap14 - I made some changes following your recommendations.





[GitHub] [nifi] pvillard31 commented on a change in pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r551539908



##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);

Review comment:
       Not really, we just have the payload, QoS and topic.







[GitHub] [nifi] asfgit closed pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4738:
URL: https://github.com/apache/nifi/pull/4738


   





[GitHub] [nifi] exceptionfactory commented on a change in pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r547938223



##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);

Review comment:
       Recommend declaring a static variable for the received counter name and reusing it in the other `adjustCounter()` calls.
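
       A sketch of that suggestion (the constant name is an assumption, not taken from the merged code):

           // shared counter name, reused by both the demarcator and record paths
           public static final String COUNTER_RECORDS_RECEIVED = "Records Received";

           // wherever the counter is incremented:
           session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);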

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = record.getSchema();
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter("Records Received", 1L, false);
+                        }
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));

Review comment:
       Should the `record.count` key be added to the list of `WritesAttribute` annotations? Recommend declaring it as a static variable if so.
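
       For example, that could look like the following fragments (constant name and description text are assumptions, not the merged code):

           // constant declared once in ConsumeMQTT
           public static final String RECORD_COUNT_KEY = "record.count";

           // entry added to the class-level @WritesAttributes list
           @WritesAttribute(attribute = ConsumeMQTT.RECORD_COUNT_KEY,
                   description = "The number of records received in the resulting FlowFile")

           // and reused where the attribute is set
           attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount()));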

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -124,6 +148,45 @@
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for received messages")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the data before writing it to a FlowFile")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
+            .name("Add attributes as fields")
+            .description("If using the Records reader/writer and if setting this property to true, default fields "
+                    + "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")

Review comment:
       Is there a reason for not including the `broker` attribute as one of the internal fields that could be added to each record?
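
       For illustration, adding it would presumably mirror the existing fields (a hypothetical `BROKER_FIELD_KEY` constant is assumed here, alongside the `_topic`/`_qos` ones in the diff):

           // when building the write schema:
           fields.add(new RecordField(BROKER_FIELD_KEY, RecordFieldType.STRING.getDataType()));
           // when populating each record:
           record.setValue(BROKER_FIELD_KEY, broker);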

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);

Review comment:
       With the possibility of multiple messages being sent to failure during this loop, are there any key properties of the `MQTTQueueMessage` that could be logged to indicate which message failed?
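
       For example, the topic and QoS could be included in the log statement (a sketch, assuming the `ComponentLog` variant that accepts an argument array and a throwable):

           logger.error("Failed to parse message received on topic {} (QoS {}), sending to the parse failure relationship",
                   new Object[]{mqttMessage.getTopic(), mqttMessage.getQos()}, e);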

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
##########
@@ -389,6 +393,227 @@ public void testResizeBuffer() throws Exception {
         flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
     }
 
+    @Test
+    public void testConsumeRecordsWithAddedFields() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        testRunner.addControllerService("record-reader", jsonReader);
+        testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+        testRunner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        testRunner.addControllerService("record-writer", jsonWriter);
+        testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        testRunner.enableControllerService(jsonWriter);
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        testMessage.setRetainFlag(false);
+
+        PublishMessage badMessage = new PublishMessage();
+        badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes()));
+        badMessage.setTopicName("testTopic");
+        badMessage.setDupFlag(false);
+        badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        badMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+        internalPublish(badMessage);
+        internalPublish(testMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() == 1);
+        assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+                + "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
+                new String(flowFiles.get(0).toByteArray()));
+
+        List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertTrue(badFlowFiles.size() == 1);
+        assertEquals("ThisIsNotAJSON", new String(badFlowFiles.get(0).toByteArray()));

Review comment:
       Declaring a variable for the invalid JSON string and reusing it both here and when setting the message payload would help make the test easier to maintain and read.
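
       A sketch of that refactoring (the constant name is illustrative):

           private static final String INVALID_JSON_PAYLOAD = "ThisIsNotAJSON";

           // when publishing the test message:
           badMessage.setPayload(ByteBuffer.wrap(INVALID_JSON_PAYLOAD.getBytes()));

           // when asserting on the parse-failure FlowFile:
           assertEquals(INVALID_JSON_PAYLOAD, new String(badFlowFiles.get(0).toByteArray()));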

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -124,6 +148,45 @@
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for received messages")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the data before writing it to a FlowFile")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
+            .name("Add attributes as fields")

Review comment:
       This property descriptor is missing a `displayName()` field.
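
       One possible shape of the fix (the machine-readable name and display name shown here are assumptions, not the merged values):

           public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
                   .name("add-attributes-as-fields")
                   .displayName("Add attributes as fields")
                   .description("If using the Records reader/writer and if setting this property to true, default fields "
                           + "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")
                   // remaining builder calls unchanged from the PR
                   .build();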

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = record.getSchema();
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter("Records Received", 1L, false);
+                        }
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            context.yield();
+
+            // we try to add the messages back into the internal queue
+            for(MQTTQueueMessage done : doneList) {
+                try {
+                    mqttQueue.offer(done, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    logger.error("Could not add message back into the internal queue, this could lead to data loss", ex);

Review comment:
       Similar to the queue processing loop, it would be helpful to indicate the particular message that failed as part of this error message instead of repeating the same message for multiple failures.

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -322,14 +425,203 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter("Records Received", 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = record.getSchema();
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter("Records Received", 1L, false);
+                        }
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            context.yield();
+
+            // we try to add the messages back into the internal queue
+            for(MQTTQueueMessage done : doneList) {
+                try {
+                    mqttQueue.offer(done, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    logger.error("Could not add message back into the internal queue, this could lead to data loss", ex);
+                }
+            }
+
+            throw new ProcessException("Could not process data received from the MQTT broker(s).", e);

Review comment:
       Should the `broker` value be included in this message?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pvillard31 commented on pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#issuecomment-764515288


   Thanks for the review @turcsanyip - I think I addressed your comments, let me know if there is something else.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] turcsanyip commented on a change in pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r561332768



##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());

Review comment:
       There is no separator character between the broker and the topic prefix (e.g. `tcp://myhost:1883mytopic`).
   A `'/'` could be added before the topic prefix.
   It could be changed in the existing `transferQueue()` method too.
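   A minimal sketch of the suggested fix, assuming the existing `broker`, `topicPrefix` and `topicFilter` fields and a hypothetical `buildTransitUri` helper name:

```java
// Hypothetical helper: inserts a '/' between the broker URI and the topic path
// so the transit URI reads e.g. tcp://myhost:1883/mytopic instead of tcp://myhost:1883mytopic.
private String buildTransitUri(final String topic) {
    final String separator = broker.endsWith("/") ? "" : "/";
    return broker + separator + topic;
}

// Usage in transferQueueDemarcator() (and in the existing transferQueue() method):
// session.getProvenanceReporter().receive(messageFlowfile, buildTransitUri(topicPrefix + topicFilter));
```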

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {

Review comment:
       Emptying the whole queue seems a bit non-deterministic to me, because the queue is being written at the same time by the receiver thread.
   Wouldn't it be useful to define a maximum number of messages that may be fetched in one go (either a constant or a processor property)?
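   A rough sketch of the bounded-drain idea, using a hypothetical `MAX_MESSAGES_PER_FLOWFILE` constant (it could just as well be a processor property) and the surrounding fields of the processor:

```java
// Class-level constant (hypothetical); could equally be exposed as a processor property.
private static final int MAX_MESSAGES_PER_FLOWFILE = 10_000;

// Inside transferQueueDemarcator():
messageFlowfile = session.append(messageFlowfile, out -> {
    int drained = 0;
    MQTTQueueMessage mqttMessage;
    // poll() returns null once the queue is empty, so the loop stops either
    // when the queue drains or when the cap is reached, even if the receiver
    // thread keeps adding new messages.
    while (drained < MAX_MESSAGES_PER_FLOWFILE && (mqttMessage = mqttQueue.poll()) != null) {
        out.write(mqttMessage.getPayload());
        out.write(demarcator);
        drained++;
        session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
    }
});
```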

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
+
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if(!isWriterInitialized) {
+                                final RecordSchema recordSchema = record.getSchema();
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                    if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new ArrayList<>();
+                                        fields.addAll(writeSchema.getFields());
+
+                                        fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
+                                        fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+                                        fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
+
+                                        writeSchema = new SimpleRecordSchema(fields);
+                                    }
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
+                                    transferFailure(session, mqttMessage);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                    record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
+                                    record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
+                                    record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
+                                    record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
+                                }
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                doneList.add(mqttMessage);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
+                                transferFailure(session, mqttMessage);
+                                continue;
+                            }
+
+                            session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+                        }
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the parse failure relationship", e);
+                        transferFailure(session, mqttMessage);
+                        continue;
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the parse failure relationship", e);
+                    transferFailure(session, mqttMessage);
+                    continue;
+                }
+            }
+
+            if(writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            context.yield();
+
+            // we try to add the messages back into the internal queue
+            int numberOfMessages = 0;
+            for(MQTTQueueMessage done : doneList) {
+                try {
+                    mqttQueue.offer(done, 1, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    numberOfMessages++;
+                    if(getLogger().isDebugEnabled()) {
+                        logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex);
+                    }
+                }
+            }
+            if(numberOfMessages > 0) {
+                logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[] {numberOfMessages});
+            }
+
+            throw new ProcessException("Could not process data received from the MQTT broker(s): " + broker, e);
+        } finally {
+            closeWriter(writer);
+        }
+
+        if(recordCount.get() == 0) {
+            session.remove(flowFile);
+            return;
+        }
+
+        session.putAllAttributes(flowFile, attributes);
+        session.getProvenanceReporter().receive(flowFile, broker);

Review comment:
       The topic info could be added to the transit URI here as well, as in the demarcator method.

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();
+                final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();

Review comment:
       `null` is not checked in the other two `transferQueue*` methods. Should it be checked there too or is it unnecessary here?
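   For reference, the same guard applied in the demarcator loop could look roughly like this (a sketch only; whether a null payload should be skipped entirely or written as an empty entry is the open question):

```java
while (!mqttQueue.isEmpty()) {
    final MQTTQueueMessage mqttMessage = mqttQueue.poll();
    // Assumption: a null payload is written as an empty entry so the demarcator
    // count still matches the number of messages consumed.
    final byte[] payload = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
    out.write(payload);
    out.write(demarcator);
    session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
}
```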

##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -334,14 +443,210 @@ public void process(final OutputStream out) throws IOException {
             if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
                 logger.warn(new StringBuilder("FlowFile ")
                         .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" for Mqtt message ")
+                        .append(" for MQTT message ")
                         .append(mqttMessage)
                         .append(" had already been removed from queue, possible duplication of flow files")
                         .toString());
             }
         }
     }
 
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
+
+        FlowFile messageFlowfile = session.create();
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
+
+
+        messageFlowfile = session.append(messageFlowfile, out -> {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                out.write(mqttMessage.getPayload());
+                out.write(demarcator);
+                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
+            }
+        });
+
+        session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
+        session.transfer(messageFlowfile, REL_MESSAGE);
+        session.commit();
+    }
+
+    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+        FlowFile messageFlowfile = session.create();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+        attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+        attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+        messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(mqttMessage.getPayload());
+            }
+        });
+
+        String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+        session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+    }
+
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            while (!mqttQueue.isEmpty()) {
+                final MQTTQueueMessage mqttMessage = mqttQueue.take();

Review comment:
       Is there a specific reason why `take()` is used here, while `poll()` is used in the other two `transferQueue*` methods?
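   For comparison, a `poll()`-based version of the loop (sketch only): unlike `take()`, `poll()` never blocks and simply returns `null` when the queue is empty, which matches the `isEmpty()` pre-check used in the other two methods:

```java
MQTTQueueMessage mqttMessage;
// poll() returns null immediately on an empty queue, so the loop cannot block
// the processor thread and no InterruptedException handling is needed.
while ((mqttMessage = mqttQueue.poll()) != null) {
    final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
    // ... record parsing and writing as in the method above ...
}
```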




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] asfgit closed pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4738:
URL: https://github.com/apache/nifi/pull/4738


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] markap14 commented on pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
markap14 commented on pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#issuecomment-754049914


   Thanks for the improvement @pvillard31. I think this will give us MUCH better performance for any MQTT-related use cases. @exceptionfactory had a few comments, but otherwise I'm a +1.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pvillard31 commented on pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#issuecomment-755511518


   Squashed and rebased against main to fix the conflicts caused by the two other PRs related to this processor that were merged today.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] exceptionfactory commented on pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#issuecomment-754329946


   @pvillard31 Thanks for the feedback and changes, looks good.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pvillard31 commented on a change in pull request #4738: NIFI-7890 - Added record support to ConsumeMQTT processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4738:
URL: https://github.com/apache/nifi/pull/4738#discussion_r551547883



##########
File path: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -124,6 +148,45 @@
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for received messages")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the data before writing it to a FlowFile")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
+            .name("Add attributes as fields")
+            .description("If using the Records reader/writer and if setting this property to true, default fields "
+                    + "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")

Review comment:
       The broker is not something we can retrieve from the MqttMessage object; it will be the same for each record and is already added as an attribute of the flow file. If it is needed for a use case, one could add it to each record using the UpdateRecord processor. I think it's better not to include it by default, to keep the data we write to disk as minimal as possible. Thoughts?
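   For completeness, a sketch of what the rejected option would have looked like inside the processor (hypothetical `BROKER_FIELD_KEY` constant; shown only to illustrate why it adds little over the existing flow file attribute):

```java
// Hypothetical: the broker is constant for a given processor instance, so writing it
// into every record duplicates what the broker flow file attribute already carries.
fields.add(new RecordField(BROKER_FIELD_KEY, RecordFieldType.STRING.getDataType()));
// ...and later, per record:
record.setValue(BROKER_FIELD_KEY, broker);
```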




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org