You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/10/17 05:59:04 UTC

[nifi] branch main updated: NIFI-10644 Add Message Demarcator-style processing in PublishMQTT

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 298dd2024e NIFI-10644 Add Message Demarcator-style processing in PublishMQTT
298dd2024e is described below

commit 298dd2024e8aa43a744c937e19c32de127fb3a94
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Fri Oct 14 00:40:02 2022 +0200

    NIFI-10644 Add Message Demarcator-style processing in PublishMQTT
    
    This closes #6530.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../apache/nifi/processors/mqtt/ConsumeMQTT.java   |  31 +---
 .../apache/nifi/processors/mqtt/PublishMQTT.java   | 171 ++++++++++++++-----
 .../mqtt/common/AbstractMQTTProcessor.java         |  14 ++
 .../nifi/processors/mqtt/TestConsumeMQTT.java      |  12 ++
 .../nifi/processors/mqtt/TestPublishMQTT.java      | 188 +++++++++++++++++++--
 5 files changed, 338 insertions(+), 78 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index c3c299905f..50fa2916e0 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -33,7 +33,6 @@ import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -172,6 +171,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
             .description("The Record Writer to use for serializing Records before writing them to a FlowFile.")
             .build();
 
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() // ConsumeMQTT's demarcator property
+            .fromPropertyDescriptor(BASE_MESSAGE_DEMARCATOR) // presumably copies name, validator and EL scope from the shared base - confirm
+            .description("With this property, you have an option to output FlowFiles which contains multiple messages. "
+                    + "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
+                    + "multiple messages. This is an optional property ; if not provided, and if not defining a "
+                    + "Record Reader/Writer, each message received will result in a single FlowFile. To enter special "
+                    + "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
+            .build();
+
     public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
             .name("add-attributes-as-fields")
             .displayName("Add attributes as fields")
@@ -184,19 +192,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
             .dependsOn(RECORD_READER)
             .build();
 
-    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
-            .name("message-demarcator")
-            .displayName("Message Demarcator")
-            .required(false)
-            .addValidator(Validator.VALID)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .description("With this property, you have an option to output FlowFiles which contains multiple messages. "
-                    + "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
-                    + "multiple messages. This is an optional property ; if not provided, and if not defining a "
-                    + "Reader/Writer, each message received will result in a single FlowFile which. To enter special "
-                    + "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
-            .build();
-
     private volatile int qos;
     private volatile String topicPrefix = "";
     private volatile String topicFilter;
@@ -296,13 +291,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
                     .build());
         }
 
-        final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
-        final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet();
-        if (readerIsSet && demarcatorIsSet) {
-            results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
-                    .explanation("message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
-        }
-
         return results;
     }
 
@@ -345,7 +333,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
         }
     }
 
-
     @OnStopped
     public void onStopped(final ProcessContext context) {
         if (mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
index ac0df25d95..ffb9633549 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -54,10 +54,14 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Scanner;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -109,6 +113,15 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
             .description("The Record Writer to use for serializing Records before publishing them as an MQTT Message.")
             .build();
 
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() // PublishMQTT's demarcator property
+            .fromPropertyDescriptor(BASE_MESSAGE_DEMARCATOR) // presumably copies name, validator and EL scope from the shared base - confirm
+            .description("With this property, you have an option to publish multiple messages from a single FlowFile. "
+                    + "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
+                    + "the FlowFile content. This is an optional property ; if not provided, and if not defining a "
+                    + "Record Reader/Writer, each FlowFile will be published as a single message. To enter special "
+                    + "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("FlowFiles that are sent successfully to the destination are transferred to this relationship.")
@@ -132,6 +145,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
             PROP_QOS,
             RECORD_READER,
             RECORD_WRITER,
+            MESSAGE_DEMARCATOR,
             PROP_CONN_TIMEOUT,
             PROP_KEEP_ALIVE_INTERVAL,
             PROP_LAST_WILL_MESSAGE,
@@ -145,10 +159,6 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
             REL_FAILURE
     )));
 
-    static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
-    static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
-    static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
-
     static final String ATTR_PUBLISH_FAILED_INDEX_SUFFIX = ".mqtt.publish.failed.index";
     private String publishFailedIndexAttributeName;
 
@@ -205,62 +215,31 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
         }
 
         if (context.getProperty(RECORD_READER).isSet()) {
-            processRecordSet(context, session, flowfile, topic);
+            processMultiMessageFlowFile(new ProcessRecordSetStrategy(), context, session, flowfile, topic);
+        } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
+            processMultiMessageFlowFile(new ProcessDemarcatedContentStrategy(), context, session, flowfile, topic);
         } else {
             processStandardFlowFile(context, session, flowfile, topic);
         }
     }
 
-    private void processRecordSet(ProcessContext context, ProcessSession session, final FlowFile flowfile, String topic) {
+    private void processMultiMessageFlowFile(ProcessStrategy processStrategy, ProcessContext context, ProcessSession session, final FlowFile flowfile, String topic) {
         final StopWatch stopWatch = new StopWatch(true);
         final AtomicInteger processedRecords = new AtomicInteger();
 
         try {
-            final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
             final Long previousProcessFailedAt = ofNullable(flowfile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
 
-            session.read(flowfile, in -> {
-                try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
-                    final RecordSet recordSet = reader.createRecordSet();
-
-                    final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
-
-                    final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
-
-                    Record record;
-                    while ((record = recordSet.next()) != null) {
-                        if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) {
-                            processedRecords.getAndIncrement();
-                            continue;
-                        }
-
-                        baos.reset();
-
-                        try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
-                            writer.write(record);
-                            writer.flush();
-                        }
-
-                        final byte[] messageContent = baos.toByteArray();
-
-                        publishMessage(context, flowfile, topic, messageContent);
-                        processedRecords.getAndIncrement();
-                    }
-                } catch (SchemaNotFoundException | MalformedRecordException e) {
-                    throw new ProcessException("An error happened during creating components for serialization.", e);
-                }
-            });
+            session.read(flowfile, in -> processStrategy.process(context, flowfile, in, topic, processedRecords, previousProcessFailedAt));
 
             FlowFile successFlowFile = flowfile;
 
             String provenanceEventDetails;
             if (previousProcessFailedAt != null) {
                 successFlowFile = session.removeAttribute(flowfile, publishFailedIndexAttributeName);
-                provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, processedRecords.get());
+                provenanceEventDetails = String.format(processStrategy.getRecoverTemplateMessage(), processedRecords.get());
             } else {
-                provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, processedRecords.get());
+                provenanceEventDetails = String.format(processStrategy.getSuccessTemplateMessage(), processedRecords.get());
             }
 
             session.getProvenanceReporter().send(flowfile, clientProperties.getRawBrokerUris(), provenanceEventDetails, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
@@ -274,7 +253,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
                 session.getProvenanceReporter().send(
                         failedFlowFile,
                         clientProperties.getRawBrokerUris(),
-                        String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords.get()),
+                        String.format(processStrategy.getFailureTemplateMessage(), processedRecords.get()),
                         stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             }
 
@@ -336,4 +315,110 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
         logger.trace("Received 'delivery complete' message from broker. Token: [{}]", token);
     }
 
+    interface ProcessStrategy { // how processMultiMessageFlowFile turns one FlowFile's content into several published messages
+        void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) throws IOException; // invoked inside session.read with the FlowFile content stream; previousProcessFailedAt is null when no failure index attribute is present
+        String getFailureTemplateMessage(); // String.format template (one %d = processed count) for the provenance details when publishing fails
+        String getRecoverTemplateMessage(); // same, used when a failure index attribute was present and the run completed
+        String getSuccessTemplateMessage(); // same, used when the run completed with no failure index attribute present
+    }
+
+    class ProcessRecordSetStrategy implements ProcessStrategy {
+        // Publishes one message per record: records are read via RECORD_READER and each is serialized via RECORD_WRITER.
+        static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
+        static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
+        static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
+        // processedRecords is incremented for skipped and for published records alike, so it tracks the record position.
+        @Override
+        public void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) throws IOException {
+            final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+            try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
+                final RecordSet recordSet = reader.createRecordSet();
+
+                final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
+
+                final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); // one buffer reused for every record
+
+                Record record;
+                while ((record = recordSet.next()) != null) {
+                    if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) { // record lies before the stored failure index: count it, do not publish
+                        processedRecords.getAndIncrement();
+                        continue;
+                    }
+
+                    baos.reset(); // drop the previous record's bytes before writing the next one
+
+                    try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
+                        writer.write(record);
+                        writer.flush();
+                    }
+
+                    final byte[] messageContent = baos.toByteArray();
+
+                    publishMessage(context, flowfile, topic, messageContent);
+                    processedRecords.getAndIncrement(); // incremented only after publishMessage returned without throwing
+                }
+            } catch (SchemaNotFoundException | MalformedRecordException e) { // reader/schema problems are rethrown as ProcessException with the cause kept
+                throw new ProcessException("An error happened during creating components for serialization.", e);
+            }
+        }
+
+        @Override
+        public String getFailureTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+        }
+
+        @Override
+        public String getRecoverTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+        }
+
+        @Override
+        public String getSuccessTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+        }
+    }
+
+    class ProcessDemarcatedContentStrategy implements ProcessStrategy {
+        // Splits the FlowFile content on the Message Demarcator and publishes every resulting piece as its own message.
+        static final String PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE = "Publish failed after %d successfully published messages.";
+        static final String PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER = "Successfully finished publishing previously failed messages. Total message count: %d";
+        static final String PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS = "Successfully published all messages. Total message count: %d";
+        // processedRecords is incremented for skipped and for published messages alike, so it tracks the message position.
+        @Override
+        public void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) {
+            final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue();
+            // The property is documented as a UTF-8 string, so decode the content as UTF-8 rather than the platform default.
+            try (final Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name())) {
+                scanner.useDelimiter(java.util.regex.Pattern.quote(demarcator)); // useDelimiter(String) compiles a regex; quote so '|', '.', '*' etc. match literally
+                while (scanner.hasNext()) {
+                    final String messageContent = scanner.next();
+                    // Messages before the stored failure index are counted but not published again.
+                    if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) {
+                        processedRecords.getAndIncrement();
+                        continue;
+                    }
+
+                    publishMessage(context, flowfile, topic, messageContent.getBytes(StandardCharsets.UTF_8));
+                    processedRecords.getAndIncrement(); // incremented only after publishMessage returned without throwing
+                }
+            }
+        }
+
+        @Override
+        public String getFailureTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE;
+        }
+
+        @Override
+        public String getRecoverTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER;
+        }
+
+        @Override
+        public String getSuccessTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 1605499552..2cd2f6587d 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -240,6 +240,14 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
             .required(false)
             .build();
 
+    public static final PropertyDescriptor BASE_MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() // shared base; ConsumeMQTT and PublishMQTT each derive a MESSAGE_DEMARCATOR with their own description
+            .name("message-demarcator")
+            .displayName("Message Demarcator")
+            .required(false)
+            .addValidator(Validator.VALID) // presumably accepts any value at the property level - confirm
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
     @Override
     public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>(1);
@@ -291,6 +299,12 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
                     .explanation("both properties must be set when used.").build());
         }
 
+        final boolean demarcatorIsSet = validationContext.getProperty(BASE_MESSAGE_DEMARCATOR).isSet();
+        if (readerIsSet && demarcatorIsSet) {
+            results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
+                    .explanation("Message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
+        }
+
         return results;
     }
 
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 9a004af7e2..f8ecfa6e8e 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -156,6 +156,18 @@ public class TestConsumeMQTT {
         testRunner.assertValid();
     }
 
+    @Test
+    public void testRecordAndDemarcatorConfigurationTogetherIsInvalid() throws InitializationException {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+        // Set Record Reader/Writer AND a Message Demarcator on ConsumeMQTT: validation must reject the combination.
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\n");
+
+        testRunner.assertNotValid();
+    }
+
     @Test
     public void testQoS2() throws Exception {
         mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index b9815339da..93d526bd2e 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -40,9 +40,12 @@ import java.util.Map;
 
 import static java.util.Arrays.asList;
 import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
-import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
-import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
-import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessDemarcatedContentStrategy.PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessDemarcatedContentStrategy.PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessDemarcatedContentStrategy.PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessRecordSetStrategy.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessRecordSetStrategy.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessRecordSetStrategy.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
 import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
 import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
@@ -70,6 +73,18 @@ public class TestPublishMQTT {
         mqttTestClient = null;
     }
 
+    @Test
+    public void testRecordAndDemarcatorConfigurationTogetherIsInvalid() throws InitializationException {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+        // Set Record Reader/Writer AND a Message Demarcator on PublishMQTT: validation must reject the combination.
+        testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, "\n");
+
+        testRunner.assertNotValid();
+    }
+
     @Test
     public void testQoS0() {
         mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
@@ -180,12 +195,12 @@ public class TestPublishMQTT {
     }
 
     @Test
-    public void testPublishRecordSet() throws InitializationException {
+    public void testPublishRecords() throws InitializationException {
         mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
         testRunner = initializeTestRunner(mqttTestClient);
 
-        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
-        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
         testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
         testRunner.assertValid();
 
@@ -213,15 +228,15 @@ public class TestPublishMQTT {
     }
 
     @Test
-    public void testPublishRecordSetFailed() throws InitializationException {
+    public void testPublishRecordsFailed() throws InitializationException {
         mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
         Mockito.doCallRealMethod()
                 .doThrow(new RuntimeException("Second publish failed."))
                 .when(mqttTestClient).publish(any(), any());
         testRunner = initializeTestRunner(mqttTestClient);
 
-        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
-        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
         testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
         testRunner.assertValid();
 
@@ -254,8 +269,8 @@ public class TestPublishMQTT {
                 .when(mqttTestClient).publish(any(), any());
         testRunner = initializeTestRunner(mqttTestClient);
 
-        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
-        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
         testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
         testRunner.assertValid();
 
@@ -288,8 +303,8 @@ public class TestPublishMQTT {
         Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
         testRunner = initializeTestRunner(mqttTestClient);
 
-        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
-        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
         testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
         testRunner.assertValid();
 
@@ -318,6 +333,149 @@ public class TestPublishMQTT {
                 publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
     }
 
+    @Test
+    public void testPublishDemarcatedContent() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        final String demarcator = "\n";
+
+        testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final List<String> testInput = createMultipleInput();
+
+        testRunner.enqueue(String.join(demarcator, testInput).getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS, 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(1).getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(2).getBytes(), 2, false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
+    }
+
+    @Test
+    public void testPublishDemarcatedContentFailed() {
+        mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod()
+                .doThrow(new RuntimeException("Second publish failed."))
+                .when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        final String demarcator = "\n";
+
+        testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final List<String> testInput = createMultipleInput();
+
+        testRunner.enqueue(String.join(demarcator, testInput).getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE, 1));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(0).getBytes(), 2, false);
+        verifyNoMorePublished();
+
+        List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        assertEquals("1", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+    }
+
+    @Test
+    public void testContinuePublishDemarcatedContentAndFailAgainWhenPreviousPublishFailed() {
+        mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod()
+                .doThrow(new RuntimeException("Second publish failed."))
+                .when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        final String demarcator = "\n";
+
+        testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        final List<String> testInput = createMultipleInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(String.join(demarcator, testInput).getBytes(), attributes);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE, 2));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(1).getBytes(), 2, false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        assertEquals("2", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+    }
+
+    @Test
+    public void testContinuePublishDemarcatedContentSuccessfullyWhenPreviousPublishFailed() {
+        mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        final String demarcator = "\n";
+
+        testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        final List<String> testInput = createMultipleInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(String.join(demarcator, testInput).getBytes(), attributes);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER, 3));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(1).getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(2).getBytes(), 2, false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
+                publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
+    }
+
     private void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
         final Pair<String, StandardMqttMessage> lastPublished = mqttTestClient.getLastPublished();
         final String lastPublishedTopic = lastPublished.getLeft();
@@ -367,6 +525,10 @@ public class TestPublishMQTT {
         ));
     }
 
+    private static List<String> createMultipleInput() {
+        return Arrays.asList("message1", "message2", "message3");
+    }
+
     private TestRunner initializeTestRunner(MqttTestClient mqttTestClient) {
         final TestRunner testRunner = TestRunners.newTestRunner(new PublishMQTT() {
             @Override