You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/10/17 05:59:04 UTC
[nifi] branch main updated: NIFI-10644 Add Message Demarcator-style processing in PublishMQTT
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 298dd2024e NIFI-10644 Add Message Demarcator-style processing in PublishMQTT
298dd2024e is described below
commit 298dd2024e8aa43a744c937e19c32de127fb3a94
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Fri Oct 14 00:40:02 2022 +0200
NIFI-10644 Add Message Demarcator-style processing in PublishMQTT
This closes #6530.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../apache/nifi/processors/mqtt/ConsumeMQTT.java | 31 +---
.../apache/nifi/processors/mqtt/PublishMQTT.java | 171 ++++++++++++++-----
.../mqtt/common/AbstractMQTTProcessor.java | 14 ++
.../nifi/processors/mqtt/TestConsumeMQTT.java | 12 ++
.../nifi/processors/mqtt/TestPublishMQTT.java | 188 +++++++++++++++++++--
5 files changed, 338 insertions(+), 78 deletions(-)
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index c3c299905f..50fa2916e0 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -33,7 +33,6 @@ import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -172,6 +171,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
.description("The Record Writer to use for serializing Records before writing them to a FlowFile.")
.build();
+ /**
+ * Message Demarcator property of ConsumeMQTT. It is built from BASE_MESSAGE_DEMARCATOR via
+ * fromPropertyDescriptor and then given a ConsumeMQTT-specific description: the demarcator
+ * separates multiple received messages written to a single FlowFile.
+ */
+ public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_MESSAGE_DEMARCATOR)
+ .description("With this property, you have an option to output FlowFiles which contains multiple messages. "
+ + "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
+ + "multiple messages. This is an optional property ; if not provided, and if not defining a "
+ + "Record Reader/Writer, each message received will result in a single FlowFile. To enter special "
+ + "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
+ .build();
+
public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
.name("add-attributes-as-fields")
.displayName("Add attributes as fields")
@@ -184,19 +192,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
.dependsOn(RECORD_READER)
.build();
- public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
- .name("message-demarcator")
- .displayName("Message Demarcator")
- .required(false)
- .addValidator(Validator.VALID)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .description("With this property, you have an option to output FlowFiles which contains multiple messages. "
- + "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
- + "multiple messages. This is an optional property ; if not provided, and if not defining a "
- + "Reader/Writer, each message received will result in a single FlowFile which. To enter special "
- + "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
- .build();
-
private volatile int qos;
private volatile String topicPrefix = "";
private volatile String topicFilter;
@@ -296,13 +291,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
.build());
}
- final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
- final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet();
- if (readerIsSet && demarcatorIsSet) {
- results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
- .explanation("message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
- }
-
return results;
}
@@ -345,7 +333,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
}
-
@OnStopped
public void onStopped(final ProcessContext context) {
if (mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
index ac0df25d95..ffb9633549 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -54,10 +54,14 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -109,6 +113,15 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
.description("The Record Writer to use for serializing Records before publishing them as an MQTT Message.")
.build();
+ /**
+ * Message Demarcator property of PublishMQTT. It is built from BASE_MESSAGE_DEMARCATOR via
+ * fromPropertyDescriptor and then given a PublishMQTT-specific description: the demarcator
+ * splits the content of one FlowFile into multiple published messages.
+ */
+ public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_MESSAGE_DEMARCATOR)
+ .description("With this property, you have an option to publish multiple messages from a single FlowFile. "
+ + "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
+ + "the FlowFile content. This is an optional property ; if not provided, and if not defining a "
+ + "Record Reader/Writer, each FlowFile will be published as a single message. To enter special "
+ + "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are transferred to this relationship.")
@@ -132,6 +145,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
PROP_QOS,
RECORD_READER,
RECORD_WRITER,
+ MESSAGE_DEMARCATOR,
PROP_CONN_TIMEOUT,
PROP_KEEP_ALIVE_INTERVAL,
PROP_LAST_WILL_MESSAGE,
@@ -145,10 +159,6 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
REL_FAILURE
)));
- static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
- static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
- static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
-
static final String ATTR_PUBLISH_FAILED_INDEX_SUFFIX = ".mqtt.publish.failed.index";
private String publishFailedIndexAttributeName;
@@ -205,62 +215,31 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
if (context.getProperty(RECORD_READER).isSet()) {
- processRecordSet(context, session, flowfile, topic);
+ processMultiMessageFlowFile(new ProcessRecordSetStrategy(), context, session, flowfile, topic);
+ } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
+ processMultiMessageFlowFile(new ProcessDemarcatedContentStrategy(), context, session, flowfile, topic);
} else {
processStandardFlowFile(context, session, flowfile, topic);
}
}
- private void processRecordSet(ProcessContext context, ProcessSession session, final FlowFile flowfile, String topic) {
+ private void processMultiMessageFlowFile(ProcessStrategy processStrategy, ProcessContext context, ProcessSession session, final FlowFile flowfile, String topic) {
final StopWatch stopWatch = new StopWatch(true);
final AtomicInteger processedRecords = new AtomicInteger();
try {
- final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
- final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
final Long previousProcessFailedAt = ofNullable(flowfile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
- session.read(flowfile, in -> {
- try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
- final RecordSet recordSet = reader.createRecordSet();
-
- final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
-
- Record record;
- while ((record = recordSet.next()) != null) {
- if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) {
- processedRecords.getAndIncrement();
- continue;
- }
-
- baos.reset();
-
- try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
- writer.write(record);
- writer.flush();
- }
-
- final byte[] messageContent = baos.toByteArray();
-
- publishMessage(context, flowfile, topic, messageContent);
- processedRecords.getAndIncrement();
- }
- } catch (SchemaNotFoundException | MalformedRecordException e) {
- throw new ProcessException("An error happened during creating components for serialization.", e);
- }
- });
+ session.read(flowfile, in -> processStrategy.process(context, flowfile, in, topic, processedRecords, previousProcessFailedAt));
FlowFile successFlowFile = flowfile;
String provenanceEventDetails;
if (previousProcessFailedAt != null) {
successFlowFile = session.removeAttribute(flowfile, publishFailedIndexAttributeName);
- provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, processedRecords.get());
+ provenanceEventDetails = String.format(processStrategy.getRecoverTemplateMessage(), processedRecords.get());
} else {
- provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, processedRecords.get());
+ provenanceEventDetails = String.format(processStrategy.getSuccessTemplateMessage(), processedRecords.get());
}
session.getProvenanceReporter().send(flowfile, clientProperties.getRawBrokerUris(), provenanceEventDetails, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
@@ -274,7 +253,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
session.getProvenanceReporter().send(
failedFlowFile,
clientProperties.getRawBrokerUris(),
- String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords.get()),
+ String.format(processStrategy.getFailureTemplateMessage(), processedRecords.get()),
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
@@ -336,4 +315,110 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
logger.trace("Received 'delivery complete' message from broker. Token: [{}]", token);
}
+ /**
+ * Strategy for turning the content of one FlowFile into several published MQTT messages.
+ * Implementations also supply the provenance detail templates; each template is passed to
+ * String.format together with the number of processed records/messages.
+ */
+ interface ProcessStrategy {
+ // Reads the FlowFile content from 'in' and publishes the individual messages to 'topic'.
+ // 'processedRecords' is the running counter of handled entries; 'previousProcessFailedAt' is the
+ // index stored by an earlier failed attempt, or null when there was none.
+ void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) throws IOException;
+ // Template for the provenance event reported when publishing fails part-way.
+ String getFailureTemplateMessage();
+ // Template for the provenance event reported when a previously failed FlowFile is completed.
+ String getRecoverTemplateMessage();
+ // Template for the provenance event reported when everything is published on the first attempt.
+ String getSuccessTemplateMessage();
+ }
+
+    /**
+     * Strategy that parses the FlowFile content with the configured Record Reader and publishes
+     * every record, serialized by the configured Record Writer, as its own MQTT message.
+     */
+    class ProcessRecordSetStrategy implements ProcessStrategy {
+
+        static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
+        static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
+        static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
+
+        /**
+         * Publishes the records read from {@code in}, one message per record.
+         * Records whose index is below {@code previousProcessFailedAt} are only counted, not published again.
+         *
+         * @param processedRecords        running counter, incremented for every skipped or published record
+         * @param previousProcessFailedAt index stored by an earlier failed attempt, or {@code null}
+         * @throws IOException      if reading or serializing a record fails
+         * @throws ProcessException if the schema cannot be found or a record is malformed
+         */
+        @Override
+        public void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) throws IOException {
+            final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+            try (final RecordReader recordReader = recordReaderFactory.createRecordReader(flowfile, in, logger)) {
+                final RecordSet records = recordReader.createRecordSet();
+                final RecordSchema writeSchema = recordWriterFactory.getSchema(flowfile.getAttributes(), records.getSchema());
+
+                // One buffer is reused for every record; it is reset before each serialization.
+                final ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
+
+                for (Record current = records.next(); current != null; current = records.next()) {
+                    final boolean publishedInEarlierAttempt = previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt;
+
+                    if (!publishedInEarlierAttempt) {
+                        buffer.reset();
+
+                        try (final RecordSetWriter recordWriter = recordWriterFactory.createWriter(logger, writeSchema, buffer, flowfile)) {
+                            recordWriter.write(current);
+                            recordWriter.flush();
+                        }
+
+                        publishMessage(context, flowfile, topic, buffer.toByteArray());
+                    }
+
+                    // Reached only when the record was skipped or published without an exception.
+                    processedRecords.getAndIncrement();
+                }
+            } catch (SchemaNotFoundException | MalformedRecordException e) {
+                throw new ProcessException("An error happened during creating components for serialization.", e);
+            }
+        }
+
+        @Override
+        public String getFailureTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+        }
+
+        @Override
+        public String getRecoverTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+        }
+
+        @Override
+        public String getSuccessTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+        }
+    }
+
+    /**
+     * Strategy that splits the FlowFile content on the configured Message Demarcator and publishes
+     * each resulting token as its own MQTT message.
+     * <p>
+     * The content is decoded to text and re-encoded as UTF-8 before publishing.
+     */
+    class ProcessDemarcatedContentStrategy implements ProcessStrategy {
+
+        static final String PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE = "Publish failed after %d successfully published messages.";
+        static final String PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER = "Successfully finished publishing previously failed messages. Total message count: %d";
+        static final String PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS = "Successfully published all messages. Total message count: %d";
+
+        /**
+         * Publishes the demarcated parts of {@code in}, one message per part.
+         * Parts whose index is below {@code previousProcessFailedAt} are only counted, not published again.
+         *
+         * @param processedRecords        running counter, incremented for every skipped or published part
+         * @param previousProcessFailedAt index stored by an earlier failed attempt, or {@code null}
+         * @throws IOException if reading the FlowFile content failed while scanning
+         */
+        @Override
+        public void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) throws IOException {
+            final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue();
+
+            // Decode explicitly as UTF-8: the property is documented as UTF-8 and the tokens are
+            // re-encoded as UTF-8 below, so the platform default charset must not be used here.
+            try (final Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name())) {
+                // useDelimiter(String) compiles its argument as a regular expression. Quote the
+                // demarcator so characters such as '|', '.' or '*' are matched literally.
+                scanner.useDelimiter(java.util.regex.Pattern.quote(demarcator));
+                while (scanner.hasNext()) {
+                    final String messageContent = scanner.next();
+
+                    if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) {
+                        processedRecords.getAndIncrement();
+                        continue;
+                    }
+
+                    publishMessage(context, flowfile, topic, messageContent.getBytes(StandardCharsets.UTF_8));
+                    processedRecords.getAndIncrement();
+                }
+
+                // Scanner swallows read errors and simply reports "no more tokens". Surface them so a
+                // failed read is not mistaken for the end of the content.
+                final IOException readFailure = scanner.ioException();
+                if (readFailure != null) {
+                    throw readFailure;
+                }
+            }
+        }
+
+        @Override
+        public String getFailureTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE;
+        }
+
+        @Override
+        public String getRecoverTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER;
+        }
+
+        @Override
+        public String getSuccessTemplateMessage() {
+            return PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS;
+        }
+    }
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 1605499552..2cd2f6587d 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -240,6 +240,14 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.required(false)
.build();
+ /**
+ * Common definition of the optional "message-demarcator" property. It carries no description
+ * of its own. Any value is accepted (Validator.VALID) and Expression Language is supported at
+ * VARIABLE_REGISTRY scope.
+ */
+ public static final PropertyDescriptor BASE_MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+ .name("message-demarcator")
+ .displayName("Message Demarcator")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
@Override
public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(1);
@@ -291,6 +299,12 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.explanation("both properties must be set when used.").build());
}
+ final boolean demarcatorIsSet = validationContext.getProperty(BASE_MESSAGE_DEMARCATOR).isSet();
+ if (readerIsSet && demarcatorIsSet) {
+ results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
+ .explanation("Message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
+ }
+
return results;
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 9a004af7e2..f8ecfa6e8e 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -156,6 +156,18 @@ public class TestConsumeMQTT {
testRunner.assertValid();
}
+ // Setting a Record Reader/Writer together with a Message Demarcator must make ConsumeMQTT invalid.
+ @Test
+ public void testRecordAndDemarcatorConfigurationTogetherIsInvalid() throws InitializationException {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\n");
+
+ testRunner.assertNotValid();
+ }
+
@Test
public void testQoS2() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index b9815339da..93d526bd2e 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -40,9 +40,12 @@ import java.util.Map;
import static java.util.Arrays.asList;
import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
-import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
-import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
-import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessDemarcatedContentStrategy.PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessDemarcatedContentStrategy.PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessDemarcatedContentStrategy.PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessRecordSetStrategy.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessRecordSetStrategy.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessRecordSetStrategy.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
@@ -70,6 +73,18 @@ public class TestPublishMQTT {
mqttTestClient = null;
}
+ // Setting a Record Reader/Writer together with a Message Demarcator must make PublishMQTT invalid.
+ @Test
+ public void testRecordAndDemarcatorConfigurationTogetherIsInvalid() throws InitializationException {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, "\n");
+
+ testRunner.assertNotValid();
+ }
+
@Test
public void testQoS0() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
@@ -180,12 +195,12 @@ public class TestPublishMQTT {
}
@Test
- public void testPublishRecordSet() throws InitializationException {
+ public void testPublishRecords() throws InitializationException {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
- testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
- testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
@@ -213,15 +228,15 @@ public class TestPublishMQTT {
}
@Test
- public void testPublishRecordSetFailed() throws InitializationException {
+ public void testPublishRecordsFailed() throws InitializationException {
mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
Mockito.doCallRealMethod()
.doThrow(new RuntimeException("Second publish failed."))
.when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
- testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
- testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
@@ -254,8 +269,8 @@ public class TestPublishMQTT {
.when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
- testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
- testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
@@ -288,8 +303,8 @@ public class TestPublishMQTT {
Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
- testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
- testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
@@ -318,6 +333,149 @@ public class TestPublishMQTT {
publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
}
+ // Happy path: a FlowFile holding three newline-separated messages is published as three MQTT
+ // messages and routed to success without the failed-index attribute.
+ @Test
+ public void testPublishDemarcatedContent() {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ final String demarcator = "\n";
+
+ testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final List<String> testInput = createMultipleInput();
+
+ testRunner.enqueue(String.join(demarcator, testInput).getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS, 3));
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ // Messages are verified in the order they appear in the FlowFile content.
+ verifyPublishedMessage(testInput.get(0).getBytes(), 2, false);
+ verifyPublishedMessage(testInput.get(1).getBytes(), 2, false);
+ verifyPublishedMessage(testInput.get(2).getBytes(), 2, false);
+ verifyNoMorePublished();
+
+ final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile successfulFlowFile = flowFiles.get(0);
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+ assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
+ }
+
+ // Failure path: the second publish throws, so the FlowFile goes to failure and the
+ // failed-index attribute records that one message was already published.
+ @Test
+ public void testPublishDemarcatedContentFailed() {
+ mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+ // First publish call runs the real method, the next one throws.
+ Mockito.doCallRealMethod()
+ .doThrow(new RuntimeException("Second publish failed."))
+ .when(mqttTestClient).publish(any(), any());
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ final String demarcator = "\n";
+
+ testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final List<String> testInput = createMultipleInput();
+
+ testRunner.enqueue(String.join(demarcator, testInput).getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE, 1));
+
+ // Two publish attempts were made, but only the first message actually went out.
+ verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+ verifyPublishedMessage(testInput.get(0).getBytes(), 2, false);
+ verifyNoMorePublished();
+
+ List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile failedFlowFile = flowFiles.get(0);
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+ assertEquals("1", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+ }
+
+ // Retry that fails again: the FlowFile arrives with failed index 1, so the first message is
+ // skipped, the second is published, and the third publish throws; the index advances to 2.
+ @Test
+ public void testContinuePublishDemarcatedContentAndFailAgainWhenPreviousPublishFailed() {
+ mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+ // First publish call runs the real method, the next one throws.
+ Mockito.doCallRealMethod()
+ .doThrow(new RuntimeException("Second publish failed."))
+ .when(mqttTestClient).publish(any(), any());
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ final String demarcator = "\n";
+
+ testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+ final List<String> testInput = createMultipleInput();
+
+ // Simulate a FlowFile whose earlier attempt stopped after one published message.
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(publishFailedIndexAttributeName, "1");
+ testRunner.enqueue(String.join(demarcator, testInput).getBytes(), attributes);
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE, 2));
+
+ verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+ verifyPublishedMessage(testInput.get(1).getBytes(), 2, false);
+ verifyNoMorePublished();
+
+ final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile failedFlowFile = flowFiles.get(0);
+ assertEquals("2", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+ }
+
+ // Successful retry: the FlowFile arrives with failed index 1, so only the second and third
+ // messages are published; the FlowFile goes to success and the failed-index attribute is removed.
+ @Test
+ public void testContinuePublishDemarcatedContentSuccessfullyWhenPreviousPublishFailed() {
+ mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+ Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ final String demarcator = "\n";
+
+ testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+ final List<String> testInput = createMultipleInput();
+
+ // Simulate a FlowFile whose earlier attempt stopped after one published message.
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(publishFailedIndexAttributeName, "1");
+ testRunner.enqueue(String.join(demarcator, testInput).getBytes(), attributes);
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ // The recover message reports the total count, including the message skipped in this run.
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER, 3));
+
+ verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+ verifyPublishedMessage(testInput.get(1).getBytes(), 2, false);
+ verifyPublishedMessage(testInput.get(2).getBytes(), 2, false);
+ verifyNoMorePublished();
+
+ final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile successfulFlowFile = flowFiles.get(0);
+ assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
+ publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
+ }
+
private void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
final Pair<String, StandardMqttMessage> lastPublished = mqttTestClient.getLastPublished();
final String lastPublishedTopic = lastPublished.getLeft();
@@ -367,6 +525,10 @@ public class TestPublishMQTT {
));
}
+    // Supplies the three sample messages that the demarcated-content tests join into one FlowFile.
+    private static List<String> createMultipleInput() {
+        final List<String> sampleMessages = Arrays.asList("message1", "message2", "message3");
+        return sampleMessages;
+    }
+
private TestRunner initializeTestRunner(MqttTestClient mqttTestClient) {
final TestRunner testRunner = TestRunners.newTestRunner(new PublishMQTT() {
@Override