Posted to issues@nifi.apache.org by "pvillard31 (via GitHub)" <gi...@apache.org> on 2023/11/29 12:55:11 UTC

[PR] NIFI-12371 - Support tombstone messages in non-record Kafka processors [nifi]

pvillard31 opened a new pull request, #8076:
URL: https://github.com/apache/nifi/pull/8076

   # Summary
   
   [NIFI-12371](https://issues.apache.org/jira/browse/NIFI-12371) - Support tombstone messages in non-record Kafka processors
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 21
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] NIFI-12371 - Support tombstone messages in non-record Kafka processors [nifi]

Posted by "pvillard31 (via GitHub)" <gi...@apache.org>.
pvillard31 commented on PR #8076:
URL: https://github.com/apache/nifi/pull/8076#issuecomment-1856496547

   Thanks for the review @exceptionfactory - I believe I addressed your comments




Re: [PR] NIFI-12371 - Support tombstone messages in non-record Kafka processors [nifi]

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory closed pull request #8076: NIFI-12371 - Support tombstone messages in non-record Kafka processors
URL: https://github.com/apache/nifi/pull/8076




Re: [PR] NIFI-12371 - Support tombstone messages in non-record Kafka processors [nifi]

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #8076:
URL: https://github.com/apache/nifi/pull/8076#discussion_r1415789879


##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##########
@@ -159,9 +160,15 @@ void publish(final FlowFile flowFile, final InputStream flowFileContent, final b
                     tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
                     return;
                 }
-                // Send FlowFile content as it is, to support sending 0 byte message.
-                messageContent = new byte[(int) flowFile.getSize()];
-                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                if(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE) != null
+                        && flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE).equals("true")

Review Comment:
   Can this `getAttribute()` call return `null`? It looks like it would be safer to reverse the comparison as follows:
   ```suggestion
                           && Boolean.TRUE.toString().equals(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE))
   ```
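
For illustration, the null-safe comparison suggested above can be sketched in isolation. This is a minimal sketch, not the PR's code: the `isTombstone` helper and the plain `Map` standing in for FlowFile attributes are hypothetical, and the attribute name is the one shown in the diff under review.

```java
import java.util.HashMap;
import java.util.Map;

public class TombstoneAttributeCheck {
    // Attribute name as it appears in the diff under review; it may differ after renaming.
    static final String KAFKA_IS_TOMBSTONE = "kafka.isTombstone";

    // Hypothetical helper: the constant is on the left-hand side, so a missing
    // attribute (null) produces false instead of a NullPointerException.
    static boolean isTombstone(final Map<String, String> attributes) {
        return Boolean.TRUE.toString().equals(attributes.get(KAFKA_IS_TOMBSTONE));
    }

    public static void main(final String[] args) {
        // A plain Map stands in for FlowFile attributes (assumption for this sketch).
        final Map<String, String> attributes = new HashMap<>();
        System.out.println(isTombstone(attributes)); // false: attribute absent

        attributes.put(KAFKA_IS_TOMBSTONE, "true");
        System.out.println(isTombstone(attributes)); // true

        attributes.put(KAFKA_IS_TOMBSTONE, "TRUE");
        System.out.println(isTombstone(attributes)); // false: comparison is case-sensitive
    }
}
```

With the constant on the left, the separate `!= null` check on the preceding line of the diff would no longer be needed for correctness.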



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##########
@@ -159,9 +160,15 @@ void publish(final FlowFile flowFile, final InputStream flowFileContent, final b
                     tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
                     return;
                 }
-                // Send FlowFile content as it is, to support sending 0 byte message.
-                messageContent = new byte[(int) flowFile.getSize()];
-                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                if(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE) != null

Review Comment:
   Spacing:
   ```suggestion
                   if (flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE) != null
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java:
##########
@@ -87,6 +88,8 @@
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
         expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
+@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE, description = "If this attribute is set to 'true', if the processor is not configured "
+        + "with a demarcator and if the FlowFile's content is null, then a tombtsone message will be sent to Kafka.")

Review Comment:
   Recommend noting that tombstone means zero bytes:
   ```suggestion
           + "with a demarcator and if the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.")
   ```
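
The three conditions in the attribute description can be sketched as a single decision. This is only a sketch under stated assumptions: `selectValue` and its parameters are hypothetical names, not the PR's implementation, and it assumes the usual Kafka convention that a tombstone is a record whose value is `null`, which is distinct from an empty, zero-length value.

```java
import java.nio.charset.StandardCharsets;

public class TombstonePayloadSketch {
    // Hypothetical helper: returns null (a tombstone value) only when the attribute
    // is 'true', no demarcator is configured, and the content is empty.
    // In every other case the content is passed through unchanged.
    static byte[] selectValue(final boolean tombstoneAttribute,
                              final boolean demarcatorConfigured,
                              final byte[] content) {
        if (tombstoneAttribute && !demarcatorConfigured && content.length == 0) {
            return null;
        }
        return content;
    }

    public static void main(final String[] args) {
        final byte[] empty = new byte[0];
        final byte[] data = "data".getBytes(StandardCharsets.UTF_8);

        System.out.println(selectValue(true, false, empty) == null);  // true: tombstone
        System.out.println(selectValue(false, false, empty).length);  // 0: ordinary empty message
        System.out.println(selectValue(true, false, data).length);    // 4: content is kept
        System.out.println(selectValue(true, true, empty) == null);   // false: demarcator configured
    }
}
```

Keeping the empty-content case separate from the tombstone case preserves the existing ability to send a zero-byte message, which the removed comment in `PublisherLease.java` mentions.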



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java:
##########
@@ -39,4 +39,6 @@ public interface KafkaFlowFileAttribute {
     String KAFKA_CONSUMER_GROUP_ID = "kafka.consumer.id";
 
     String KAFKA_CONSUMER_OFFSETS_COMMITTED = "kafka.consumer.offsets.committed";
+
+    String KAFKA_IS_TOMBSTONE = "kafka.isTombstone";

Review Comment:
   What do you think about naming this `kafka.tombstone`, dropping the `is` prefix?


