You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/09/07 16:51:00 UTC

[GitHub] [nifi] nandorsoma opened a new pull request, #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

nandorsoma opened a new pull request, #6373:
URL: https://github.com/apache/nifi/pull/6373

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   [NIFI-10411](https://issues.apache.org/jira/browse/NIFI-10411)
   
   This pr adds record processing feature to PublishMQTT processor. I also made a little bit of cleanup around the tests because the current structure is no longer useful (in the past multiple Test extended the Common one) and now it just makes the code difficult to understand. 
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, as such `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [x] Build completed using `mvn clean install -P contrib-check`
     - [x] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r966165839


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -232,6 +235,22 @@ private static String getSupportedSchemeList() {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for received messages")

Review Comment:
   Changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] asfgit closed pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor
URL: https://github.com/apache/nifi/pull/6373


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r965334984


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -232,6 +235,22 @@ private static String getSupportedSchemeList() {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for received messages")

Review Comment:
   The descriptions for the Record Reader/Writer were inherited from the Consume processor but they are not exactly applicable to the Publish processor because it does not receive messages and does not write FlowFiles but the opposite (reads FF and writes message).
   
   I would avoid changing it to a common but generic (and meaningless) message and would rather suggest the following:
   - do not specify description in the abstract class
   - redeclare the PropertyDescriptor in Consume and Publish using `new PropertyDescriptor.Builder().fromPropertyDescriptor()` and add the specific descriptions there



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml:
##########
@@ -93,5 +93,11 @@
             <artifactId>nifi-schema-registry-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.0.4</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   We more commonly use Jackson for Json processing in the NiFi code base. Would it be possible to use this library here too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r966731657


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,368 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
 
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
-        StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
-        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String TOPIC = "testTopic";
+    private static final String RETAIN = "false";
+
+    private MqttTestClient mqttTestClient;
+    private TestRunner testRunner;
+
+    @AfterEach
+    public void cleanup() {
+        testRunner = null;
+        mqttTestClient = null;
+    }
+
+    @Test
+    public void testQoS0() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertProvenanceEvent();
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        // Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+
+    @Test
+    public void testPublishRecordSet() throws InitializationException {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final ArrayNode testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
+    }
+
+    @Test
+    public void testPublishRecordSetFailed() throws InitializationException {
+        mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod()
+                .doThrow(new RuntimeException("Second publish failed."))
+                .when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final ArrayNode testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 1));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
+        verifyNoMorePublished();
+
+        List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        assertEquals("1", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+    }
+
+    @Test
+    public void testContinuePublishRecordsAndFailAgainWhenPreviousPublishFailed() throws InitializationException {
+        mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod()
+                .doThrow(new RuntimeException("Second publish failed."))
+                .when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        final ArrayNode testInput = createTestJsonInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 2));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        assertEquals("2", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+    }
+
+    @Test
+    public void testContinuePublishRecordsSuccessfullyWhenPreviousPublishFailed() throws InitializationException {
+        mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        final ArrayNode testInput = createTestJsonInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, 3));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
+                publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
+    }
+
+    private void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
+        final Pair<String, StandardMqttMessage> lastPublished = mqttTestClient.getLastPublished();
+        final String lastPublishedTopic = lastPublished.getLeft();
+        final StandardMqttMessage lastPublishedMessage = lastPublished.getRight();
         assertEquals(Arrays.toString(payload), Arrays.toString(lastPublishedMessage.getPayload()));
         assertEquals(qos, lastPublishedMessage.getQos());
         assertEquals(retain, lastPublishedMessage.isRetained());
-        assertEquals(topic, lastPublishedTopic);
+        assertEquals(TOPIC, lastPublishedTopic);
     }
 
-    private MqttTestClient mqttTestClient;
+    private void verifyNoMorePublished() {
+        assertNull(mqttTestClient.getLastPublished(), "TestClient's queue should be empty.");
+    }
+
+    private ProvenanceEventRecord assertProvenanceEvent() {
+        final List<ProvenanceEventRecord> provenanceEvents = testRunner.getProvenanceEvents();
+        assertNotNull(provenanceEvents);
+        assertEquals(1, provenanceEvents.size());
 
-    public class UnitTestablePublishMqtt extends PublishMQTT {
+        final ProvenanceEventRecord event = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, event.getEventType());
 
-        public UnitTestablePublishMqtt(){
-            super();
-        }
+        return event;
+    }
 
-        @Override
-        protected MqttClient createMqttClient() throws MqttException {
-            mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
-            return mqttTestClient;
-        }
+    private void assertProvenanceEvent(String expectedDetails) {
+        final ProvenanceEventRecord event = assertProvenanceEvent();
+        assertEquals(expectedDetails, event.getDetails());
     }
 
-    @BeforeEach
-    public void init() {
-        UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
-        testRunner = TestRunners.newTestRunner(proc);
-        testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
-        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
-        topic = "testTopic";
-        testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);
+    private static ArrayNode createTestJsonInput() {
+        final ObjectMapper mapper = new ObjectMapper();

Review Comment:
   I think it is a best practice to initialize `ObjectMapper` once and store it in a `static` field because it seems to be a heavy-weight object but thread-safe.
   
   It can be modified in a future commit. Will go ahead with merging this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r966166779


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,373 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import javax.json.Json;
+import javax.json.JsonArray;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
 
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
-        StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
-        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String TOPIC = "testTopic";
+    private static final String RETAIN = "false";
+
+    private TestRunner testRunner;
+    private MqttTestClient mqttTestClient;
+
+    @Test
+    public void testQoS0() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertProvenanceEvent();
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        // Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+
+    @Test
+    public void testPublishRecordSet() throws InitializationException {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final JsonArray testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
+        assertNull(mqttTestClient.getLastPublished(), "TestClient's queue should be empty.");

Review Comment:
   Changed, thanks for the idea!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r965767410


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml:
##########
@@ -93,5 +93,11 @@
             <artifactId>nifi-schema-registry-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.0.4</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   I picked this library because it is already used in NiFi. I wouldn't use it in production code, but it is more convenient for tests because it is easier to read the JSON I initialize. Nevertheless, it is possible to use Jackson instead of that one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r966172583


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,373 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import javax.json.Json;
+import javax.json.JsonArray;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
 
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
-        StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
-        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String TOPIC = "testTopic";
+    private static final String RETAIN = "false";
+
+    private TestRunner testRunner;
+    private MqttTestClient mqttTestClient;
+
+    @Test
+    public void testQoS0() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertProvenanceEvent();
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        // Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+
+    @Test
+    public void testPublishRecordSet() throws InitializationException {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final JsonArray testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
+        assertNull(mqttTestClient.getLastPublished(), "TestClient's queue should be empty.");
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
+
+        // clean runner by removing records reader/writer
+        testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
+        testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);

Review Comment:
   Thanks for the idea! These cleanups are not needed at all, because every test creates a new runner. Nevertheless, I've added @AfterEach to properly cleanup (just for the sake of safety) and there were 2 runners that used local variables that I now changed.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java:
##########
@@ -17,105 +17,583 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
-import org.apache.nifi.security.util.TlsConfiguration;
 import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import javax.net.ssl.SSLContext;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestConsumeMQTT extends TestConsumeMqttCommon {
-    private static TlsConfiguration tlsConfiguration;
+public class TestConsumeMQTT {
 
-    public MqttTestClient mqttTestClient;
+    private static final int PUBLISH_WAIT_MS = 0;
+    private static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String CLIENT_ID = "TestClient";
+    private static final String TOPIC_NAME = "testTopic";
+    private static final String INTERNAL_QUEUE_SIZE = "100";
 
-    public class UnitTestableConsumeMqtt extends ConsumeMQTT {
+    private static final String STRING_MESSAGE = "testMessage";
+    private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
 
-        public UnitTestableConsumeMqtt(){
-            super();
-        }
+    private static final int MOST_ONE = 0;
+    private static final int LEAST_ONE = 1;
+    private static final int EXACTLY_ONCE = 2;

Review Comment:
   Sure, changed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on PR #6373:
URL: https://github.com/apache/nifi/pull/6373#issuecomment-1240946332

   Thank you for your reviews @turcsanyip and @exceptionfactory! Please see my latest commit!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r965341740


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -232,6 +235,22 @@ private static String getSupportedSchemeList() {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for received messages")

Review Comment:
   Thats a good idea, will do it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r966166633


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml:
##########
@@ -93,5 +93,11 @@
             <artifactId>nifi-schema-registry-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.0.4</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   After all it doesn't look as bad with Jackson as I thought. I didn't remember that it is possible to chain `.put()`. Changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r965871887


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml:
##########
@@ -93,5 +93,11 @@
             <artifactId>nifi-schema-registry-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.0.4</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   Although the `javax.json` library is used in a couple places, I concur with @turcsanyip that we should use Jackson as much as possible. Some third-party libraries depend on alternative JSON implementations, but from a direct dependency management and code maintenance perspective, it is better to use Jackson.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on a diff in pull request #6373: NIFI-10411 Add record processing feature to PublishMQTT processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r965946668


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,373 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import javax.json.Json;
+import javax.json.JsonArray;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
 
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
-        StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
-        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String TOPIC = "testTopic";
+    private static final String RETAIN = "false";
+
+    private TestRunner testRunner;
+    private MqttTestClient mqttTestClient;
+
+    @Test
+    public void testQoS0() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertProvenanceEvent();
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        // Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+
+    @Test
+    public void testPublishRecordSet() throws InitializationException {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final JsonArray testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
+        assertNull(mqttTestClient.getLastPublished(), "TestClient's queue should be empty.");

Review Comment:
   Minor: might make sense to extract this assertion to`verifyNoMorePublished()` (more descriptive and used in all record tests)



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,373 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import javax.json.Json;
+import javax.json.JsonArray;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
 
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
-        StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
-        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String TOPIC = "testTopic";
+    private static final String RETAIN = "false";
+
+    private TestRunner testRunner;
+    private MqttTestClient mqttTestClient;
+
+    @Test
+    public void testQoS0() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertProvenanceEvent();
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        // Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+
+    @Test
+    public void testPublishRecordSet() throws InitializationException {
+        mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final JsonArray testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
+        assertNull(mqttTestClient.getLastPublished(), "TestClient's queue should be empty.");
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
+
+        // clean runner by removing records reader/writer
+        testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
+        testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);

Review Comment:
   Minor: the `TestRunner` instance is used only in this test method (which has its own `TestPublishMQTT` instance with the `TestRunner` field at runtime, if I'm not wrong), so this clean-up is not necessary and can be omitted.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java:
##########
@@ -17,105 +17,583 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
-import org.apache.nifi.security.util.TlsConfiguration;
 import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import javax.net.ssl.SSLContext;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestConsumeMQTT extends TestConsumeMqttCommon {
-    private static TlsConfiguration tlsConfiguration;
+public class TestConsumeMQTT {
 
-    public MqttTestClient mqttTestClient;
+    private static final int PUBLISH_WAIT_MS = 0;
+    private static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String CLIENT_ID = "TestClient";
+    private static final String TOPIC_NAME = "testTopic";
+    private static final String INTERNAL_QUEUE_SIZE = "100";
 
-    public class UnitTestableConsumeMqtt extends ConsumeMQTT {
+    private static final String STRING_MESSAGE = "testMessage";
+    private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
 
-        public UnitTestableConsumeMqtt(){
-            super();
-        }
+    private static final int MOST_ONE = 0;
+    private static final int LEAST_ONE = 1;
+    private static final int EXACTLY_ONCE = 2;

Review Comment:
   I know it comes from an old commit but could you please fix the names to match the proper terminology?
   ```suggestion
       private static final int AT_MOST_ONCE = 0;
       private static final int AT_LEAST_ONCE = 1;
       private static final int EXACTLY_ONCE = 2;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org