Posted to issues@nifi.apache.org by "nandorsoma (via GitHub)" <gi...@apache.org> on 2023/06/07 11:34:22 UTC

[GitHub] [nifi] nandorsoma opened a new pull request, #7352: NIFI-11609 Support Request-Response pattern in MQTT5 Processors

nandorsoma opened a new pull request, #7352:
URL: https://github.com/apache/nifi/pull/7352

   
   # Summary
   
   [NIFI-11609](https://issues.apache.org/jira/browse/NIFI-11609)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [x] Build completed using `mvn clean install -P contrib-check`
     - [x] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on a diff in pull request #7352: NIFI-11609 Support Request-Response pattern in MQTT5 Processors

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7352:
URL: https://github.com/apache/nifi/pull/7352#discussion_r1277712005


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java:
##########
@@ -139,11 +202,15 @@ public class PublishMQTT extends AbstractMQTTProcessor {
             PROP_SESSION_EXPIRY_INTERVAL,
             PROP_CLIENTID,
             PROP_TOPIC,
+            PROP_TOPIC_IS_RECORD_PATH,
             PROP_RETAIN,
             PROP_QOS,
             RECORD_READER,
             RECORD_WRITER,
             MESSAGE_DEMARCATOR,
+            PROP_RESPONSE_TOPIC,
+            PROP_CORRELATION_DATA,
+            PROP_CORRELATION_DATA_IS_RECORD_PATH,

Review Comment:
   @nandorsoma Thanks for adding this new feature in the MQTT processors!
   
   The functionality looks good; however, the configuration of the new properties is not really straightforward. For example, when sending the response to a request, the response topic (contained in the original request) needs to be set in `Topic`, not `Response Topic` (`Response Topic` is relevant for request messages, not responses). Also, the new properties may be confusing in the "normal" (non-request/response) mode, which is the mode used in most cases.
   
   That's why I would suggest documenting the request/response scenarios on the additional details page.
   Also, introducing a `Messaging Type` (or `Messaging Pattern`) property in PublishMQTT seems useful to me, with the following allowable values:
   - **Standalone**: default/current logic (no `Correlation Data` or `Response Topic` displayed)
   - **Request**: sending a request message and expecting a response to it (`Correlation Data` and `Response Topic` displayed)
   - **Response**: sending a response message to a received request (`Correlation Data` is displayed, but `Response Topic` is not)
   
   The role/origin of `Correlation Data` is a bit different in the last two cases. For requests, it is the message sender's responsibility to generate unique correlation data. For responses, the message sender has to echo the received correlation data back. This could be mentioned in the property description or on the additional details page.
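
The three suggested modes can be sketched in plain Java to make the property visibility and the origin of the correlation data explicit. This is only an illustration of the proposal above, not code from the PR: the `MessagingPattern` enum, its two flags, and the `correlationDataFor` helper are hypothetical names, and generating a random UUID for requests is an assumption (in the PR the value comes from a processor property).

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class MessagingPatternSketch {

    /** Hypothetical allowable values for the suggested "Messaging Type" property. */
    public enum MessagingPattern {
        STANDALONE(false, false), // current logic: neither property displayed
        REQUEST(true, true),      // both Correlation Data and Response Topic displayed
        RESPONSE(true, false);    // only Correlation Data displayed

        public final boolean showsCorrelationData;
        public final boolean showsResponseTopic;

        MessagingPattern(boolean showsCorrelationData, boolean showsResponseTopic) {
            this.showsCorrelationData = showsCorrelationData;
            this.showsResponseTopic = showsResponseTopic;
        }
    }

    /**
     * Picks the correlation data to publish.
     * REQUEST: the sender generates a unique value (a random UUID here, as an assumption).
     * RESPONSE: the sender echoes back what it received with the request.
     * STANDALONE: no correlation data is sent.
     */
    public static byte[] correlationDataFor(MessagingPattern pattern, byte[] received) {
        switch (pattern) {
            case REQUEST:
                return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
            case RESPONSE:
                return received;
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        for (MessagingPattern p : MessagingPattern.values()) {
            System.out.println(p + ": correlationData=" + p.showsCorrelationData
                    + ", responseTopic=" + p.showsResponseTopic);
        }
    }
}
```

In a real processor the two flags would presumably map onto property dependencies, so that the UI hides `Response Topic` unless the request mode is selected.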





Re: [PR] NIFI-11609 Support Request-Response pattern in MQTT5 Processors [nifi]

Posted by "nandorsoma (via GitHub)" <gi...@apache.org>.
nandorsoma closed pull request #7352: NIFI-11609 Support Request-Response pattern in MQTT5 Processors
URL: https://github.com/apache/nifi/pull/7352




Re: [PR] NIFI-11609 Support Request-Response pattern in MQTT5 Processors [nifi]

Posted by "nandorsoma (via GitHub)" <gi...@apache.org>.
nandorsoma commented on PR #7352:
URL: https://github.com/apache/nifi/pull/7352#issuecomment-1747763293

   Closing, as the proposed solution with the extra fields is inconvenient. We need to find a generic way to specify either a raw value or a record path in the same field.




[GitHub] [nifi] nandorsoma commented on a diff in pull request #7352: NIFI-11609 Support Request-Response pattern in MQTT5 Processors

Posted by "nandorsoma (via GitHub)" <gi...@apache.org>.
nandorsoma commented on code in PR #7352:
URL: https://github.com/apache/nifi/pull/7352#discussion_r1221436090


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java:
##########
@@ -328,21 +439,57 @@ public void process(ProcessContext context, FlowFile flowfile, InputStream in, S
 
                     baos.reset();
 
+                    String topic;
+                    if (topicRecordPath != null) {
+                        topic = extractRecordValue(record, topicRecordPath);
+                    } else {
+                        topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue();
+                    }
+
+                    String correlationData;
+                    if (correlationDataRecordPath != null) {
+                        correlationData = extractRecordValue(record, correlationDataRecordPath);
+                    } else {
+                        correlationData = context.getProperty(PROP_CORRELATION_DATA).evaluateAttributeExpressions(flowfile).getValue();
+                    }
+
                     try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
                         writer.write(record);
                         writer.flush();
                     }
 
                     final byte[] messageContent = baos.toByteArray();
 
-                    publishMessage(context, flowfile, topic, messageContent);
+                    publishMessage(context, flowfile, topic, correlationData, messageContent);
                     processedRecords.getAndIncrement();
                 }
             } catch (SchemaNotFoundException | MalformedRecordException e) {
                 throw new ProcessException("An error happened during creating components for serialization.", e);
             }
         }
 
+        private String extractRecordValue(Record record, RecordPath recordPath) {
+            final Optional<FieldValue> fv = recordPath.evaluate(record).getSelectedFields().findFirst();
+            if (fv.isPresent()) {

Review Comment:
   This check is a bit misleading, because a FieldValue is present even when the given record path yields no result. That makes me wonder whether an empty `fv` is a valid scenario at all.
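
A minimal sketch of the concern, using a stand-in type rather than NiFi's real classes: `FieldValueStandIn` and `extractValue` are hypothetical names, and the only behavior assumed is what the comment above describes, namely that a field value object can exist while wrapping a null value. Chaining `Optional.map` covers both the "no field selected" and the "field selected but value is null" cases, since `map` returns an empty Optional when the mapping function returns null.

```java
import java.util.Optional;
import java.util.stream.Stream;

public class RecordValueSketch {

    /** Hypothetical stand-in for a field value that exists even when its value is null. */
    public static final class FieldValueStandIn {
        private final Object value;

        public FieldValueStandIn(Object value) {
            this.value = value;
        }

        public Object getValue() {
            return value;
        }
    }

    /** Returns the first selected value as a String, or null if nothing usable was selected. */
    public static String extractValue(Stream<FieldValueStandIn> selectedFields) {
        Optional<FieldValueStandIn> first = selectedFields.findFirst();
        return first
                .map(FieldValueStandIn::getValue) // becomes empty when the wrapped value is null
                .map(Object::toString)
                .orElse(null);
    }

    public static void main(String[] args) {
        // Field selected and value present
        System.out.println(extractValue(Stream.of(new FieldValueStandIn("sensors/temp"))));
        // Field selected, but the wrapped value is null
        System.out.println(extractValue(Stream.of(new FieldValueStandIn(null))));
        // No field selected at all
        System.out.println(extractValue(Stream.empty()));
    }
}
```

Whether a null result should fall back to the property value or fail the FlowFile is a separate design decision that this sketch does not settle.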


