Posted to issues@nifi.apache.org by markap14 <gi...@git.apache.org> on 2018/01/02 20:33:48 UTC

[GitHub] nifi pull request #2362: NIFI-4724: Support 0 byte message with PublishKafka

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2362#discussion_r159308383
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java ---
    @@ -71,9 +72,18 @@ void publish(final FlowFile flowFile, final InputStream flowFileContent, final b
                 tracker = new InFlightMessageTracker();
             }
     
    -        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
    +        try {
                 byte[] messageContent;
    -            try {
    +            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
    --- End diff --
    
    @ijokarumawak in this case, we end up blindly buffering the entire content of the FlowFile into memory. If a 1 GB FlowFile is sent in, for example, we will buffer the full 1 GB on the heap. With the current code, the StreamDemarcator would have buffered only 1 MB (by default) and then thrown a TokenTooLargeException, which avoids exhausting the JVM heap. I think we need to do the same here: check flowFile.getSize() and, if it is larger than maxMessageSize, throw an Exception instead of copying the content to a ByteArrayOutputStream.
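
A minimal sketch of the size guard suggested above, written against plain java.io rather than the NiFi API. The class name BoundedBuffering, the method readWholeMessage, and the exception MessageTooLargeException are hypothetical names introduced for illustration; declaredSize stands in for the value flowFile.getSize() would return. The stream is also counted while it is read, on the assumption that the declared size might not match the actual content.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class BoundedBuffering {

    // Hypothetical exception type; the real code might reuse an existing one.
    static class MessageTooLargeException extends IOException {
        MessageTooLargeException(final String message) {
            super(message);
        }
    }

    // Buffers the whole stream as one message, but refuses to hold more than
    // maxMessageSize bytes in memory.
    static byte[] readWholeMessage(final InputStream content, final long declaredSize,
                                   final int maxMessageSize) throws IOException {
        // Cheap check first: reject before reading a single byte.
        if (declaredSize > maxMessageSize) {
            throw new MessageTooLargeException("Content of " + declaredSize
                + " bytes exceeds maximum message size of " + maxMessageSize + " bytes");
        }

        // declaredSize is at most maxMessageSize here, so the cast is safe.
        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) Math.max(0L, declaredSize));
        final byte[] buffer = new byte[8192];
        long total = 0;
        int len;
        while ((len = content.read(buffer)) != -1) {
            total += len;
            // Defensive check in case the stream holds more than declared.
            if (total > maxMessageSize) {
                throw new MessageTooLargeException("Content exceeds maximum message size of "
                    + maxMessageSize + " bytes");
            }
            baos.write(buffer, 0, len);
        }
        return baos.toByteArray();
    }
}
```

With this shape, a 0-byte input still yields an empty byte array (so an empty message can be published), while an oversized input fails before any significant heap is allocated.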


---