Posted to dev@nifi.apache.org by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> on 2016/08/20 19:25:52 UTC

Max Kafka message size

Hi folks,

From experimentation and looking at the code it seems that the max message size that can be sent via the PublishKafka and PutKafka processors in 0.7.0 is 1MB.  Can someone please confirm my read on this?

Thanks,

Chris McDermott

Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315


Re: Max Kafka message size

Posted by Joe Witt <jo...@gmail.com>.
Roger that. Thanks, Chris.

On Aug 20, 2016 7:04 PM, "McDermott, Chris Kevin (MSDU -
STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:

> Jira is https://issues.apache.org/jira/browse/NIFI-2614.
>
> Thanks,
>
> Chris McDermott
>
> On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU -
> STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:
>
>     I’ll raise a JIRA, Joe.
>
>     Thanks,
>
>     Chris McDermott
>
>     On 8/20/16, 6:52 PM, "Joe Witt" <jo...@gmail.com> wrote:
>
>         If no jira is raised sooner I'll raise one and get it sorted.
>
>         On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <psaltis.andrew@gmail.com> wrote:
>
>         > Hi Chris,
>         > Sorry for not catching that code path. I am not sure if it is actually a
>         > regression as I took a look at the 1.0.0-BETA code and it matches the
>         > 0.7.0, specifically this comment block:
>         >
>         > /*
>         >  * We're using the default value from Kafka. We are using it to control the
>         >  * message size before it goes to to Kafka thus limiting possibility of a
>         >  * late failures in Kafka client.
>         >  */
>         >
>         > found at [1] leads me to believe it was intentional and not a regression.
>         > Looking at the 0.6.1 release code it appears that PutKafka used a default
>         > of 5 MB [2].
>         >
>         > I can speculate on the reasoning behind it, however, I will refrain from
>         > opining on it as I was not involved in any of the conversations related to
>         > the change and enforcement of the 1 MB max.
>         >
>         > [1] https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
>         > [2] https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
>         >
>         > Thanks,
>         > Andrew
>         >
>         > On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
>         > STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>         >
>         > > Thanks, Andrew.
>         > >
>         > > I’ve set all of the right broker configs to allow larger messages.
>         > > Believe me, I spent a lot of time banging my head against the wall
>         > > thinking that the broker and topic configs were wrong.
>         > >
>         > > PublishKafka uses PublishingContext.  That class has a bean property
>         > > called maxRequestSize, which defaults to 1048576.  As far as I can tell,
>         > > the setMaxRequestSize() method is never called (except by some test code).
>         > > KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and
>         > > passes the result to the constructor for StreamDemarcator.  The publish
>         > > method then calls StreamDemarcator.nextToken(), which in turn calls
>         > > StreamDemarcator.fill(), which compares the stream position against
>         > > the maxRequestSize and throws the exception with this line:
>         > >
>         > > throw new IllegalStateException("Maximum allowed data size of " +
>         > > this.maxDataSize + " exceeded.");
>         > >
>         > > Which matches what I see in the nifi-app.log file…
>         > >
>         > > 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
>         > > java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
>         > >         at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
>         > >         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
>         > >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>         > >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>         > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>         > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>         > >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>         > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>         > >         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>         > >
>         > > This occurs using both PublishKafka and PutKafka.  Setting the Max Record
>         > > Size property in the PutKafka processor has no effect on this.  Note that
>         > > the stack trace above is from the PutKafka processor with Max Record Size
>         > > set to 10MB.
>         > >
>         > > I believe that this is a regression from 0.6.0.
>         > >
>         > > Chris McDermott
>         > >
>         > > On 8/20/16, 3:48 PM, "Andrew Psaltis" <psaltis.andrew@gmail.com> wrote:
>         > >
>         > >     Hi Chris,
>         > >     Regarding the PutKafka processor: looking at this block [1] of the
>         > >     PutKafka code, it has a default size of 1 MB, but it does not restrict
>         > >     the size. The DATA_SIZE_VALIDATOR does a sanity check and also enforces
>         > >     that the value entered is in the correct format, <value> [B|KB|MB|GB|TB].
>         > >     Later on in the code, at this block [2], the value is set on the Kafka
>         > >     config; again, this does not enforce a maximum value.
>         > >
>         > >     Regarding the PublishKafka processor, I do not see where it accepts a
>         > >     size or restricts the size at all.
>         > >
>         > >     Have you adjusted the 'message.max.bytes' config value for your broker(s)?
>         > >     The default value for that is 1 MB [3]. (The URL references Kafka 0.8;
>         > >     however, I believe this default has been stable since the early days of
>         > >     the project.)
>         > >
>         > >     If you really do want to send messages that are larger than 1 MB in
>         > >     size, I would highly recommend reading this post [4] from Gwen Shapira.
>         > >     It does a great job of outlining the things you need to take into
>         > >     consideration. It will also point you to the relevant configs in Kafka
>         > >     that will need to be adjusted if you decide to go this route.
>         > >
>         > >
>         > >     Thanks,
>         > >     Andrew
>         > >
>         > >     [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
>         > >     [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
>         > >     [3] https://kafka.apache.org/08/configuration.html
>         > >     [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
>         > >
>         > >     --
>         > >     Thanks,
>         > >     Andrew
>         > >
>         > >     Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>         > >     <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>         > >     twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>         > >
>         > >
>         > >
>         >
>         >
>
>
>
>
>
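
The code path Chris describes in the quoted 6:09 PM message (a fixed limit of 1048576 bytes that is never overridden, checked while the stream is being read) can be sketched roughly as follows. This is a minimal illustration only: BoundedReader and readAll are hypothetical names, not NiFi's StreamDemarcator or PublishingContext, and the real classes do more than this. Only the default value and the exception message are taken from the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Hypothetical stand-in for the size check described in the thread.
 * This is NOT NiFi code; the names and structure are illustrative.
 */
public class BoundedReader {

    // Same value as the default quoted in the thread (1 MB).
    public static final int DEFAULT_MAX_DATA_SIZE = 1048576;

    /** Reads the whole stream, failing as soon as more than maxDataSize bytes are seen. */
    public static byte[] readAll(InputStream in, int maxDataSize) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int read;
        while ((read = in.read(buffer)) != -1) {
            if (out.size() + read > maxDataSize) {
                // Same message format as the exception quoted in the thread.
                throw new IllegalStateException(
                        "Maximum allowed data size of " + maxDataSize + " exceeded.");
            }
            out.write(buffer, 0, read);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] large = new byte[DEFAULT_MAX_DATA_SIZE + 1];

        // With the hard-coded default, a payload one byte over 1 MB is rejected.
        try {
            readAll(new ByteArrayInputStream(large), DEFAULT_MAX_DATA_SIZE);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // If the limit were configurable (10 MB here), the same payload would pass.
        System.out.println(readAll(new ByteArrayInputStream(large), 10 * 1024 * 1024).length);
    }
}
```

The point of the sketch is that a processor-level property has no effect unless its value actually reaches the limit used by the reader, which is the behaviour reported for Max Record Size above.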

Re: Max Kafka message size

Posted by Joe Witt <jo...@gmail.com>.
Exactly what you describe, based on topic metadata, is what we should do.
I am looking into this along with another issue at the moment.

On Aug 21, 2016 6:15 PM, "McDermott, Chris Kevin (MSDU -
STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:

> Thanks for getting back, Oleg. I’d be happy to send demarcated messages;
> the problem is reassembling them at the other end of the pipe.  I’ve done a
> lot of searching for techniques to do this.  They all seem to have major
> drawbacks in terms of reliable message delivery or in terms of garbage
> collection (not in the sense of Java GC, but cleanup of files bounced off
> of a shared file system).  The nice thing about Kafka is that it’s atomic,
> it has replicated delivery, and it has guaranteed GC semantics.  My use case
> has fairly low throughput requirements (thousands, not millions of TPM)
> where most messages are fairly small but a few are larger.
>
> It would be nice if the Kafka client could learn the max message size from
> Kafka itself by querying the max.message.bytes on the topic, rather than
> have the flow designer be required to set it on the producer Processors.
> For now, though, I’d be happy going back to the old behavior where it’s set
> on the producer Processors.  On the flip side, I am also concerned that the
> clients (GetKafka and ConsumeKafka) do not expose a max message parameter.
> That will be equally problematic.
>
> Anyway, enough of my blathering.
>
> Your and the community’s help is greatly appreciated.
>
> Thanks,
>
> Chris McDermott
>
> On 8/21/16, 3:43 PM, "Oleg Zhurakousky" <oz...@hortonworks.com>
> wrote:
>
>     Chris
>
>     Sorry for getting late to this
>
>     This indeed was intentional, primarily to follow Kafka’s best practices:
>     Kafka was not designed to be a general data transfer system for messages
>     of arbitrary size, but rather of “manageable” size. Also, as you know, we
>     have the ability to demarcate messages, essentially allowing you to send
>     a very large FlowFile that will be parsed into chunks, where each chunk
>     will end up as a separate Kafka message.
>     That said, we did consider that at some point we may expose it as a
>     configuration property, and it seems like the time has come.
>
>     Cheers
>     Oleg
>
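
The demarcation approach Oleg mentions in the quoted message (one large FlowFile parsed into chunks, each sent as a separate Kafka message) can be pictured with the small sketch below. It is an assumption-laden illustration: DemarcationSketch and split are hypothetical names, it works on an in-memory String rather than a stream, and it is not how NiFi implements demarcation. It also shows why Chris's concern holds: once the content is split, nothing in the individual chunks says how to put them back together.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration of splitting content on a demarcator so that
 * each piece could be published as its own message. Not NiFi code.
 */
public class DemarcationSketch {

    /** Splits content on the demarcator, skipping empty pieces. */
    public static List<String> split(String content, String demarcator) {
        if (demarcator.isEmpty()) {
            throw new IllegalArgumentException("demarcator must not be empty");
        }
        List<String> messages = new ArrayList<>();
        int start = 0;
        int idx;
        while ((idx = content.indexOf(demarcator, start)) != -1) {
            if (idx > start) {
                messages.add(content.substring(start, idx));
            }
            start = idx + demarcator.length();
        }
        // Trailing content after the last demarcator, if any.
        if (start < content.length()) {
            messages.add(content.substring(start));
        }
        return messages;
    }

    public static void main(String[] args) {
        // Each element would become one Kafka message in the scheme described above.
        for (String message : split("first\nsecond\nthird\n", "\n")) {
            System.out.println(message);
        }
    }
}
```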

Re: Max Kafka message size

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
Roger/Wilco


Chris McDermott
 

On 8/25/16, 4:15 PM, "Joe Witt" <jo...@gmail.com> wrote:

    Chris,
    
    I just commented on the JIRA, but please consider closing this JIRA
    as-is.  It addresses the request for PublishKafka to support larger
    messages and works now against Kafka 0.9 and 0.10.  It is in the build and
    ready.  We're trying very hard to get 1.0 closed down, and this was
    just something we could tackle with the other effort for Kafka 0.9 and
    0.10 support.  I agree it would be good to add support for 0.8
    users as well.  For that, please create another JIRA, and it can get
    into the next release assuming there is a patch and review.
    
    Thanks
    Joe
    
    On Thu, Aug 25, 2016 at 4:05 PM, McDermott, Chris Kevin (MSDU -
    STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    > Hey Guys,
    >
    > I apologize if I jumped the gun, but I reopened the JIRA as it does not address the issue with PutKafka.
    >
    > Chris McDermott
    >
    > On 8/24/16, 11:55 AM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:
    >
    >     I wanted to follow up on this as I think it could help others.
    >
    >     Oleg, I created a patch for PutKafka/PublishKafka that removes the limit on the size of the message which can be sent.  I don’t know that it’s what you want to solve the JIRA; I’m thinking not.  It was just an expediency for me.  However, if you are interested, let me know.
    >
    >     I discovered that GetKafka/ConsumeKafka can read messages > 1MiB if you add a dynamic property to the processor.  fetch.message.max.bytes needs to be set for GetKafka, and max.partition.fetch.bytes needs to be set for ConsumeKafka.
    >
    >     Cheers,
    >
    >     Chris McDermott
    >
    >     On 8/21/16, 6:45 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:
    >
    >         Chris
    >
    >         The “. . . querying the max.message.bytes. . .” is exactly what I had in mind. As you mentioned earlier, that is where the default value came from in the first place. So, yes that is what we’re going to link together.
    >
    >         Cheers
    >         Oleg
    >
    >         > On Aug 21, 2016, at 6:15 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    >         >
    >         > Thanks for getting back, Oleg. I’d be happy to send demarcated messages; the problem is reassembling them at the other end of the pipe.  I’ve done a lot of searching for techniques to do this.  They all seem to have major draw backs in terms of reliable message delivery or in terms of garbage collection (not in the sense of Java GC, but cleanup of files bounced off of a shared file system.) The nice thing about Kafka is its atomic, it has replicated delivery, and guaranteed GC semantics.   My use case has fairly low throughput requirements (thousands, not millions of TPM) where most messages are fairly small but a few are larger.
    >         >
    >         > It would be nice if the Kafka client could learn the max message size from Kafka itself by querying the max.message.bytes on the topic, rather than have the flow designer be required to set it on the producer Processors.  For now, though I’d be happy going back to the old behavior where its set on the producer Processors.  On the flip side I am also concerned that the clients (GetKafka and ConsumeKafka) do not expose a max message parameter.  That will be equally problematic.
    >         >
    >         > Anyway, enough of my blathering.
    >         >
    >         > Yours and the communities help is greatly appreciated.
    >         >
    >         > Thanks,
    >         >
    >         > Chris McDermott
    >         >
    >         > Remote Business Analytics
    >         > STaTS/StoreFront Remote
    >         > HPE Storage
    >         > Hewlett Packard Enterprise
    >         > Mobile: +1 978-697-5315
    >         >
    >         >
    >         >
    >         > On 8/21/16, 3:43 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:
    >         >
    >         >    Chris
    >         >
    >         >    Sorry for getting to this late.
    >         >
    >         >    This indeed was intentional, primarily to follow Kafka’s best practices: Kafka was not designed to be a general data transfer system for data of arbitrary size, but rather for messages of “manageable” size. Also, as you know, we have the ability to demarcate messages, essentially allowing you to send a very large FlowFile that will be parsed into chunks, where each chunk ends up as a separate Kafka message.
    >         >    That said, we did consider that at some point we might expose it as a configuration property, and it seems like the time has come.
    >         >
    >         >    Cheers
    >         >    Oleg
    >         >
    >         >> On Aug 20, 2016, at 7:03 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    >         >>
    >         >> Jira is https://issues.apache.org/jira/browse/NIFI-2614.
    >         >>
    >         >> Thanks,
    >         >>
    >         >> Chris McDermott
    >         >>
    >         >> Remote Business Analytics
    >         >> STaTS/StoreFront Remote
    >         >> HPE Storage
    >         >> Hewlett Packard Enterprise
    >         >> Mobile: +1 978-697-5315
    >         >>
    >         >>
    >         >>
    >         >> On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:
    >         >>
    >         >>   I’ll raise a JIRA, Joe.
    >         >>
    >         >>   Thanks,
    >         >>
    >         >>   Chris McDermott
    >         >>
    >         >>   Remote Business Analytics
    >         >>   STaTS/StoreFront Remote
    >         >>   HPE Storage
    >         >>   Hewlett Packard Enterprise
    >         >>   Mobile: +1 978-697-5315
    >         >>
    >         >>
    >         >>
    >         >>   On 8/20/16, 6:52 PM, "Joe Witt" <jo...@gmail.com> wrote:
    >         >>
    >         >>       If no jira is raised sooner I'll raise one and get it sorted.
    >         >>
    >         >>       On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
    >         >>
    >         >>> Hi Chris,
    >         >>> Sorry for not catching that code path. I am not sure if it is actually a
    >         >>> regression as I took a look at the 1.0.0-BETA code and it matches the
    >         >>> 0.7.0, specifically this comment block:
    >         >>>
    >         >>> /*
    >         >>> * We're using the default value from Kafka. We are using it to control the
    >         >>> * message size before it goes to to Kafka thus limiting possibility of a
    >         >>> * late failures in Kafka client.
    >         >>> */
    >         >>>
    >         >>> found at[1] leads me to believe it was intentional and not a regression.
    >         >>> Looking at the 0.6.1 release code it appears that PutKafka used a default
    >         >>> of 5 MB [2].
    >         >>>
    >         >>> I can speculate on the reasoning behind it, however, I will refrain from
    >         >>> opining on it as I was not involved in any of the conversations related to
    >         >>> the change and enforcement of the 1 MB max.
    >         >>>
    >         >>> [1] https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
    >         >>> [2] https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
    >         >>>
    >         >>> Thanks,
    >         >>> Andrew
    >         >>>
    >         >>> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
    >         >>> STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    >         >>>
    >         >>>> Thanks, Andrew.
    >         >>>>
    >         >>>> I’ve set all of the right broker configs to allow larger messages.
    >         >>>> Believe me, I spent a lot of time banging my head against the wall
    >         >>>> thinking that the broker and topic configs were wrong.
    >         >>>>
    >         >>>> PublishKafka uses PublishingContext.  That class has a bean property
    >         >>>> called maxRequestSize, which defaults to 1048576.  As far as I can tell,
    >         >>>> the setMaxRequestSize() method is never called (except by some test code).
    >         >>>> KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and
    >         >>>> passes the result to the constructor for StreamDemarcator.  The publish
    >         >>>> method then calls StreamDemarcator.nextToken(), which in turn
    >         >>>> calls StreamDemarcator.fill(), which compares the stream position against
    >         >>>> the maxRequestSize and throws the exception with this line:
    >         >>>>
    >         >>>> throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
    >         >>>>
    >         >>>> Which matches what I see in the nifi-app.log file…
    >         >>>>
    >         >>>> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
    >         >>>> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
    >         >>>>       at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
    >         >>>>       at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
    >         >>>>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
    >         >>>>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
    >         >>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
    >         >>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
    >         >>>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
    >         >>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
    >         >>>>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
    >         >>>>
    >         >>>> This occurs using both PublishKafka and PutKafka.  Setting the Max Record
    >         >>>> Size property in the PutKafka processor has no effect on this.  Note that the
    >         >>>> stack trace above is from the PutKafka processor with Max Record Size set to
    >         >>>> 10MB.
    >         >>>>
    >         >>>> I believe that this is a regression from 0.6.0.
    >         >>>>
    >         >>>> Chris McDermott
    >         >>>>
    >         >>>> Remote Business Analytics
    >         >>>> STaTS/StoreFront Remote
    >         >>>> HPE Storage
    >         >>>> Hewlett Packard Enterprise
    >         >>>> Mobile: +1 978-697-5315
    >         >>>>
    >         >>>>
    >         >>>>
    >         >>>> On 8/20/16, 3:48 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
    >         >>>>
    >         >>>>   Hi Chris,
    >         >>>>   Regarding the PutKafka processor: looking at this block[1] of the
    >         >>>>   PutKafka code, it has a default size of 1 MB, but it does not restrict
    >         >>>>   the size. The DATA_SIZE_VALIDATOR does a sanity check and also enforces
    >         >>>>   that the value entered is in the correct format, <value> [B|KB|MB|GB|TB].
    >         >>>>   Later on in the code, at this block[2], the value is set on the Kafka
    >         >>>>   config; again, this does not enforce a maximum value.
    >         >>>>
    >         >>>>   With regard to the PublishKafka processor, I do not see where it accepts
    >         >>>>   a size or restricts the size at all.
    >         >>>>
    >         >>>>   Have you adjusted the 'message.max.bytes' config value for your broker(s)?
    >         >>>>   The default value for that is 1 MB [3]. (The URL references Kafka 0.8;
    >         >>>>   however, I believe this default has been stable since the early days of
    >         >>>>   the project.)
    >         >>>>
    >         >>>>   If you really do want to send messages that are larger than 1 MB in size,
    >         >>>>   I would highly recommend reading this post[4] from Gwen Shapira.  It does
    >         >>>>   a great job of outlining the things you need to take into consideration.
    >         >>>>   It will also point you to the relevant configs in Kafka that will need to
    >         >>>>   be adjusted if you decide to go this route.
    >         >>>>
    >         >>>>
    >         >>>>   Thanks,
    >         >>>>   Andrew
    >         >>>>
    >         >>>>   [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
    >         >>>>   [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
    >         >>>>   [3] https://kafka.apache.org/08/configuration.html
    >         >>>>   [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
    >         >>>>
    >         >>>>   On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
    >         >>>>   STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    >         >>>>
    >         >>>>> Hi folks,
    >         >>>>>
    >         >>>>>
    >         >>>>>
    >         >>>>> From experimentation and looking at the code it seems that the max
    >         >>>>> message size that can be sent via the PublishKafka and PutKafka
    >         >>>>> processors in 0.7.0 is 1MB.  Can someone please confirm my read on this?
    >         >>>>>
    >         >>>>>
    >         >>>>>
    >         >>>>> Thanks,
    >         >>>>>
    >         >>>>>
    >         >>>>>
    >         >>>>> Chris McDermott
    >         >>>>>
    >         >>>>>
    >         >>>>>
    >         >>>>> Remote Business Analytics
    >         >>>>>
    >         >>>>> STaTS/StoreFront Remote
    >         >>>>>
    >         >>>>> HPE Storage
    >         >>>>>
    >         >>>>> Hewlett Packard Enterprise
    >         >>>>>
    >         >>>>> Mobile: +1 978-697-5315
    >         >>>>>
    >         >>>>>
    >         >>>>>
    >         >>>>>
    >         >>>>
    >         >>>>
    >         >>>>   --
    >         >>>>   Thanks,
    >         >>>>   Andrew
    >         >>>>
    >         >>>>   Subscribe to my book: Streaming Data <http://manning.com/psaltis>
    >         >>>>   <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
    >         >>>>   twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
    >         >>>>
    >         >>>>
    >         >>>>
    >         >>>
    >         >>>
    >         >>> --
    >         >>> Thanks,
    >         >>> Andrew
    >         >>>
    >         >>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
    >         >>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
    >         >>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
    >         >>>
    >         >>
    >         >>
    >         >>
    >         >>
    >         >
    >         >
    >         >
    >
    >
    >
    >
    >
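
Editor's sketch: the size check traced through StreamDemarcator in the quoted messages above can be illustrated roughly as follows. This is not NiFi's code. The class name SizeLimitedSplitter and its split() method are hypothetical, and only the exception message text is taken from the thread. It assumes a single-byte delimiter and an in-memory input, which is simpler than what the real processors handle.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is NOT NiFi's StreamDemarcator.
final class SizeLimitedSplitter {

    /**
     * Splits data on a single-byte delimiter and returns the non-empty tokens.
     * Throws IllegalStateException as soon as one token would grow past
     * maxDataSize bytes, mirroring the failure described in the thread.
     */
    static List<byte[]> split(byte[] data, byte delimiter, int maxDataSize) {
        List<byte[]> tokens = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        for (byte b : data) {
            if (b == delimiter) {
                // End of a token: emit it (if non-empty) and start a new one.
                if (current.size() > 0) {
                    tokens.add(current.toByteArray());
                    current.reset();
                }
                continue;
            }
            // Reject before buffering a byte that would exceed the limit.
            if (current.size() >= maxDataSize) {
                throw new IllegalStateException(
                        "Maximum allowed data size of " + maxDataSize + " exceeded.");
            }
            current.write(b);
        }
        if (current.size() > 0) {
            tokens.add(current.toByteArray());
        }
        return tokens;
    }

    public static void main(String[] args) {
        byte[] data = "a\nbb\nccc".getBytes(StandardCharsets.UTF_8);
        // With a 3-byte limit every token fits.
        System.out.println(split(data, (byte) '\n', 3).size());
        try {
            // With a 2-byte limit the last token is rejected.
            split(data, (byte) '\n', 2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that the limit applies per token, so a hard-coded limit rejects any single undemarcated message larger than that limit regardless of how the broker is configured.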
    


Re: Max Kafka message size

Posted by Joe Witt <jo...@gmail.com>.
Chris,

I just commented on the JIRA, but please consider closing this JIRA
as-is.  It addresses the request for PublishKafka to support larger
messages and works now against 0.9 and 0.10.  It is in the build and
ready.  We're trying very hard to get 1.0 closed down and this was
just something we could tackle with the other effort for Kafka 0.9 and
0.10 support.  I agree it would be good to add support for 0.8
users as well.  For that, please create another JIRA and it can get
into the next release, assuming there is a patch and review.

Thanks
Joe

On Thu, Aug 25, 2016 at 4:05 PM, McDermott, Chris Kevin (MSDU -
STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
> Hey Guys,
>
> I apologize if I jumped the gun, but I reopened the JIRA as it does not address the issue with PutKafka.
>
> Chris McDermott
>
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
>
>
>
> On 8/24/16, 11:55 AM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:
>
>     I wanted to follow up on this as I think it could help others.
>
>     Oleg, I created a patch for PutKafka/PublishKafka that removes the limit on the size of the message which can be sent.  I don’t know that it’s what you want to solve the JIRA; I’m thinking not.  It was just an expedient for me.  However, if you are interested, let me know.
>
>     I discovered that GetKafka/ConsumeKafka can read messages > 1MiB if you add a dynamic property to the processor.  fetch.message.max.bytes needs to be set for GetKafka, and max.partition.fetch.bytes needs to be set for ConsumeKafka.
>
>     Cheers,
>
>     Chris McDermott
>
>     Remote Business Analytics
>     STaTS/StoreFront Remote
>     HPE Storage
>     Hewlett Packard Enterprise
>     Mobile: +1 978-697-5315
>
>
>
>     On 8/21/16, 6:45 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:
>
>         Chris
>
>         The “. . . querying the max.message.bytes. . .” is exactly what I had in mind. As you mentioned earlier, that is where the default value came from in the first place. So, yes that is what we’re going to link together.
>
>         Cheers
>         Oleg
>
>         > On Aug 21, 2016, at 6:15 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>         >
>         > Thanks for getting back, Oleg. I’d be happy to send demarcated messages; the problem is reassembling them at the other end of the pipe.  I’ve done a lot of searching for techniques to do this.  They all seem to have major draw backs in terms of reliable message delivery or in terms of garbage collection (not in the sense of Java GC, but cleanup of files bounced off of a shared file system.) The nice thing about Kafka is its atomic, it has replicated delivery, and guaranteed GC semantics.   My use case has fairly low throughput requirements (thousands, not millions of TPM) where most messages are fairly small but a few are larger.
>         >
>         > It would be nice if the Kafka client could learn the max message size from Kafka itself by querying the max.message.bytes on the topic, rather than have the flow designer be required to set it on the producer Processors.  For now, though I’d be happy going back to the old behavior where its set on the producer Processors.  On the flip side I am also concerned that the clients (GetKafka and ConsumeKafka) do not expose a max message parameter.  That will be equally problematic.
>         >
>         > Anyway, enough of my blathering.
>         >
>         > Yours and the communities help is greatly appreciated.
>         >
>         > Thanks,
>         >
>         > Chris McDermott
>         >
>         > Remote Business Analytics
>         > STaTS/StoreFront Remote
>         > HPE Storage
>         > Hewlett Packard Enterprise
>         > Mobile: +1 978-697-5315
>         >
>         >
>         >
>         > On 8/21/16, 3:43 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:
>         >
>         >    Chris
>         >
>         >    Sorry for getting late to this
>         >
>         >    This indeed was intentional, primarily to follow Kafka’s best practices where Kafka was not designed to be a general data transfer system of arbitrary size but rather “manageable” size. Also, as you know we have ability to demarcate messages essentially allowing you to send a very large FlowFile that will be parsed into chunks where each chunk will end up to be a separate Kafka message.
>         >    That said, we did consider that at some point we may expose it as configuration property and it seems like the time has come.
>         >
>         >    Cheers
>         >    Oleg
>         >
>         >> On Aug 20, 2016, at 7:03 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>         >>
>         >> Jira is https://issues.apache.org/jira/browse/NIFI-2614.
>         >>
>         >> Thanks,
>         >>
>         >> Chris McDermott
>         >>
>         >> Remote Business Analytics
>         >> STaTS/StoreFront Remote
>         >> HPE Storage
>         >> Hewlett Packard Enterprise
>         >> Mobile: +1 978-697-5315
>         >>
>         >>
>         >>
>         >> On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:
>         >>
>         >>   I’ll raise a JIRA, Joe.
>         >>
>         >>   Thanks,
>         >>
>         >>   Chris McDermott
>         >>
>         >>   Remote Business Analytics
>         >>   STaTS/StoreFront Remote
>         >>   HPE Storage
>         >>   Hewlett Packard Enterprise
>         >>   Mobile: +1 978-697-5315
>         >>
>         >>
>         >>
>         >>   On 8/20/16, 6:52 PM, "Joe Witt" <jo...@gmail.com> wrote:
>         >>
>         >>       If no jira is raised sooner I'll raise one and get it sorted.
>         >>
>         >>       On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
>         >>
>         >>> Hi Chris,
>         >>> Sorry for not catching that code path. I am not sure if it is actually a
>         >>> regression as I took a look at the 1.0.0-BETA code and it matches the
>         >>> 0.7.0, specifically this comment block:
>         >>>
>         >>> /*
>         >>> * We're using the default value from Kafka. We are using it to control the
>         >>> * message size before it goes to to Kafka thus limiting possibility of a
>         >>> * late failures in Kafka client.
>         >>> */
>         >>>
>         >>> found at[1] leads me to believe it was intentional and not a regression.
>         >>> Looking at the 0.6.1 release code it appears that PutKafka used a default
>         >>> of 5 MB [2].
>         >>>
>         >>> I can speculate on the reasoning behind it, however, I will refrain from
>         >>> opining on it as I was not involved in any of the conversations related to
>         >>> the change and enforcement of the 1 MB max.
>         >>>
>         >>> [1]
>         >>> https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-
>         >>> official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-
>         >>> processors/src/main/java/org/apache/nifi/processors/kafka/
>         >>> PublishingContext.java#L36-L41
>         >>> [2]
>         >>> https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-
>         >>> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
>         >>> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
>         >>>
>         >>> Thanks,
>         >>> Andrew
>         >>>
>         >>> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
>         >>> STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>         >>>
>         >>>> Thanks, Andrew.
>         >>>>
>         >>>> I’ve set all of the right broker configs to allow larger messages.
>         >>>> Believe me I spent a lot of time banging my head against the wall
>         >>> thinking
>         >>>> that the broker and topic configs were wrong.
>         >>>>
>         >>>> PublisingKafka uses PublishingContext.  That class has bean property
>         >>>> called maxRequestSize, which defaults to 1048576.  As far as I can tell
>         >>> the
>         >>>> setMaxRequestSize() method is never called (except by some test code.)
>         >>>> KafkaPublisher.publish() calls Max Record Size.getMaxRequestSize() and
>         >>>> passes the result to the constructor for StreamDemarcator.   The publish
>         >>>> method then calls the StreamDemarcator. getNextToken(), which in turns
>         >>>> calls StreamDemarcator.fill() which compares the stream position against
>         >>>> the maxRequestSize and throws the exception with this line.
>         >>>>
>         >>>> throw new IllegalStateException("Maximum allowed data size of " +
>         >>>> this.maxDataSize + " exceeded.");
>         >>>>
>         >>>> Which matches what I see in the nifi-app.log file…
>         >>>>
>         >>>> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8]
>         >>>> o.apache.nifi.processors.kafka.PutKafka
>         >>>> java.lang.IllegalStateException: Maximum allowed data size of 1048576
>         >>>> exceeded.
>         >>>>       at org.apache.nifi.stream.io.util.StreamDemarcator.fill(
>         >>> StreamDemarcator.java:153)
>         >>>> ~[nifi-utils-0.7.0.jar:0.7.0]
>         >>>>       at org.apache.nifi.stream.io.util.StreamDemarcator.
>         >>>> nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
>         >>>>       at org.apache.nifi.processors.kafka.KafkaPublisher.publish(
>         >>> KafkaPublisher.java:129)
>         >>>> ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         >>>>       at org.apache.nifi.processors.kafka.PutKafka$1.process(
>         >>> PutKafka.java:315)
>         >>>> ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         >>>>       at org.apache.nifi.controller.repository.
>         >>>> StandardProcessSession.read(StandardProcessSession.java:1851)
>         >>>> ~[nifi-framework-core-0.7.0.jar:0.7.0]
>         >>>>       at org.apache.nifi.controller.repository.
>         >>>> StandardProcessSession.read(StandardProcessSession.java:1822)
>         >>>> ~[nifi-framework-core-0.7.0.jar:0.7.0]
>         >>>>       at org.apache.nifi.processors.kafka.PutKafka.
>         >>>> doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.
>         >>>> jar:0.7.0]
>         >>>>       at org.apache.nifi.processors.kafka.PutKafka.
>         >>>> rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.
>         >>>> jar:0.7.0]
>         >>>>       at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.
>         >>>> onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.
>         >>>> jar:0.7.0]
>         >>>>       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(
>         >>>> StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
>         >>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
>         >>>> call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.
>         >>>> jar:0.7.0]
>         >>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
>         >>>> call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.
>         >>>> jar:0.7.0]
>         >>>>       at org.apache.nifi.controller.scheduling.
>         >>>> TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127)
>         >>>> [nifi-framework-core-0.7.0.jar:0.7.0]
>         >>>>       at java.util.concurrent.Executors$RunnableAdapter.
>         >>> call(Executors.java:511)
>         >>>> [na:1.8.0_45]
>         >>>>       at java.util.concurrent.FutureTask.runAndReset(
>         >>> FutureTask.java:308)
>         >>>> [na:1.8.0_45]
>         >>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$
>         >>>> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         >>>> [na:1.8.0_45]
>         >>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$
>         >>>> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         >>>> [na:1.8.0_45]
>         >>>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(
>         >>> ThreadPoolExecutor.java:1142)
>         >>>> [na:1.8.0_45]
>         >>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>         >>> ThreadPoolExecutor.java:617)
>         >>>> [na:1.8.0_45]
>         >>>>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>         >>>>
>         >>>> This occurs using PublishKafka, and PutKafka.  Setting the Max Record
>         >>> Size
>         >>>> property in the PutKafka processor has no affect on this.  Note the stack
>         >>>> trace above is from the PutKafka processor with Max Record Size set to
>         >>> 10MB.
>         >>>>
>         >>>> I believe that this a regression from 0.6.0.
>         >>>>
>         >>>> Chris McDermott
>         >>>>
>         >>>> Remote Business Analytics
>         >>>> STaTS/StoreFront Remote
>         >>>> HPE Storage
>         >>>> Hewlett Packard Enterprise
>         >>>> Mobile: +1 978-697-5315
>         >>>>
>         >>>>
>         >>>>
>         >>>> On 8/20/16, 3:48 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
>         >>>>
>         >>>>   Hi Chris,
>         >>>>   Regarding the PutKafka processor looking at this block[1] of the
>         >>>> PutKafka
>         >>>>   code, it has a default size of 1 MB, but it does not restrict the
>         >>>> size. The
>         >>>>   DATA_SIZE_VALIDATOR does a sanity check and also enforces that
>         >>>>   the supported value entered is the correct format <value> [B|
>         >>>> KB|MB|GB|TB].
>         >>>>   Later on in the code at this block[2], the value is set on the Kafka
>         >>>>   config, again this does not enforce a value maximum.
>         >>>>
>         >>>>   In regards to the PublishKafka processor I do not see where it
>         >>> accepts
>         >>>> a
>         >>>>   size nor restrict the size at all.
>         >>>>
>         >>>>   Have you adjusted the 'message.max.bytes' config value for your
>         >>>> broker(s)?
>         >>>>   The default value for that is 1 MB [3] (The url references the 0.8
>         >>>> Kafka,
>         >>>>   however I believe this default has been stable since the early days
>         >>> of
>         >>>> the
>         >>>>   project.)
>         >>>>
>         >>>>   If you really do want to send messages that are larger than 1 MB in
>         >>>> size, I
>         >>>>   would highly recommending reading this post[4] from Gwen Shapira.  It
>         >>>> does
>         >>>>   a great job of outlining the things you need to take into
>         >>>> consideration.
>         >>>>   This will also point you to the relevant configs in Kafka that will
>         >>>> need to
>         >>>>   be adjusted if you decide to go this route.
>         >>>>
>         >>>>
>         >>>>   Thanks,
>         >>>>   Andrew
>         >>>>
>         >>>>   [1]
>         >>>>   https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
>         >>>> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
>         >>>> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
>         >>>>   [2]
>         >>>>   https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
>         >>>> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
>         >>>> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
>         >>>>   [3] https://kafka.apache.org/08/configuration.html
>         >>>>   [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
>         >>>>
>         >>>>   On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
>         >>>>   STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>         >>>>
>         >>>>> Hi folks,
>         >>>>>
>         >>>>>
>         >>>>>
>         >>>>> From experimentation and looking at the code it seems that the max
>         >>>> message
>         >>>>> size that can be sent via the PublishKafka and PutKafka processors
>         >>>> in 0.7.0
>         >>>>> is 1MB.  Can someone please confirm my read on this?
>         >>>>>
>         >>>>>
>         >>>>>
>         >>>>> Thanks,
>         >>>>>
>         >>>>>
>         >>>>>
>         >>>>> Chris McDermott
>         >>>>>
>         >>>>>
>         >>>>>
>         >>>>> Remote Business Analytics
>         >>>>>
>         >>>>> STaTS/StoreFront Remote
>         >>>>>
>         >>>>> HPE Storage
>         >>>>>
>         >>>>> Hewlett Packard Enterprise
>         >>>>>
>         >>>>> Mobile: +1 978-697-5315
>         >>>>>
>         >>>>>
>         >>>>>
>         >>>>>
>         >>>>
>         >>>>
>         >>>>   --
>         >>>>   Thanks,
>         >>>>   Andrew
>         >>>>
>         >>>>   Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>         >>>>   <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>         >>>>   twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>         >>>>
>         >>>>
>         >>>>
>         >>>
>         >>>
>         >>> --
>         >>> Thanks,
>         >>> Andrew
>         >>>
>         >>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>         >>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>         >>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>         >>>
>         >>
>         >>
>         >>
>         >>
>         >
>         >
>         >
>
>
>
>
>

Re: Max Kafka message size

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
Hey Guys,  

I apologize if I jumped the gun, but I reopened the JIRA because the change does not address the issue with PutKafka.

Chris McDermott
 
Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315
 




Re: Max Kafka message size

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
I wanted to follow up on this as I think it could help others.

Oleg, I created a patch for PutKafka/PublishKafka that removes the limit on the size of the message that can be sent.  I don’t know whether it’s what you want in order to resolve the JIRA; I suspect not.  It was just an expedient for me.  However, if you are interested, let me know.

I discovered that GetKafka and ConsumeKafka can read messages larger than 1 MiB if you add a dynamic property to the processor: fetch.message.max.bytes needs to be set for GetKafka, and max.partition.fetch.bytes needs to be set for ConsumeKafka.
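
As a rough illustration of the two consumer settings named above, the sketch below builds the extra properties in plain Java. The class and method names are hypothetical and are not part of NiFi; only the two Kafka property names come from the message. In NiFi itself these would be entered as dynamic properties on the processor rather than written in code.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only; not NiFi or Kafka code.
final class LargeMessageConsumerProps {

    static final int TEN_MIB = 10 * 1024 * 1024;

    // Property name used by the older consumer API that GetKafka wraps.
    static Properties forGetKafka(int maxBytes) {
        Properties props = new Properties();
        props.setProperty("fetch.message.max.bytes", Integer.toString(maxBytes));
        return props;
    }

    // Property name used by the newer consumer API that ConsumeKafka wraps.
    static Properties forConsumeKafka(int maxBytes) {
        Properties props = new Properties();
        props.setProperty("max.partition.fetch.bytes", Integer.toString(maxBytes));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(forGetKafka(TEN_MIB));
        System.out.println(forConsumeKafka(TEN_MIB));
    }
}
```

Whatever value is chosen presumably needs to be at least as large as the biggest message the broker will accept; otherwise the consumer may be unable to fetch it.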

Cheers,

Chris McDermott
 
Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315
 


On 8/21/16, 6:45 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:

    Chris
    
    The “. . . querying the max.message.bytes. . .” idea is exactly what I had in mind. As you mentioned earlier, that is where the default value came from in the first place. So, yes, that is what we’re going to link together.
    
    Cheers
    Oleg
    
    > On Aug 21, 2016, at 6:15 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    > 
    > Thanks for getting back, Oleg. I’d be happy to send demarcated messages; the problem is reassembling them at the other end of the pipe.  I’ve done a lot of searching for techniques to do this.  They all seem to have major drawbacks in terms of reliable message delivery or in terms of garbage collection (not in the sense of Java GC, but cleanup of files bounced off of a shared file system).  The nice thing about Kafka is that it’s atomic, it has replicated delivery, and it has guaranteed GC semantics.  My use case has fairly low throughput requirements (thousands, not millions of TPM), where most messages are fairly small but a few are larger.
    > 
    > It would be nice if the Kafka client could learn the max message size from Kafka itself by querying the max.message.bytes on the topic, rather than requiring the flow designer to set it on the producer Processors.  For now, though, I’d be happy going back to the old behavior where it’s set on the producer Processors.  On the flip side, I am also concerned that the clients (GetKafka and ConsumeKafka) do not expose a max message parameter.  That will be equally problematic.
    > 
    > Anyway, enough of my blathering.  
    > 
    > Your help and the community’s help are greatly appreciated.
    > 
    > Thanks,
    > 
    > Chris McDermott
    > 
    > Remote Business Analytics
    > STaTS/StoreFront Remote
    > HPE Storage
    > Hewlett Packard Enterprise
    > Mobile: +1 978-697-5315
    > 
    > 
    > 
    > On 8/21/16, 3:43 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:
    > 
    >    Chris
    > 
    >    Sorry for getting to this late.
    > 
    >    This was indeed intentional, primarily to follow Kafka’s best practices: Kafka was not designed to be a general data transfer system for messages of arbitrary size, but rather for messages of “manageable” size. Also, as you know, we have the ability to demarcate messages, which allows you to send a very large FlowFile that will be parsed into chunks, where each chunk ends up as a separate Kafka message.
    >    That said, we did consider that at some point we might expose it as a configuration property, and it seems like the time has come.
    > 
    >    Cheers
    >    Oleg
    > 
    >> On Aug 20, 2016, at 7:03 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    >> 
    >> Jira is https://issues.apache.org/jira/browse/NIFI-2614.
    >> 
    >> Thanks,
    >> 
    >> Chris McDermott
    >> 
    >> Remote Business Analytics
    >> STaTS/StoreFront Remote
    >> HPE Storage
    >> Hewlett Packard Enterprise
    >> Mobile: +1 978-697-5315
    >> 
    >> 
    >> 
    >> On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:
    >> 
    >>   I’ll raise a JIRA, Joe.
    >> 
    >>   Thanks,
    >> 
    >>   Chris McDermott
    >> 
    >>   Remote Business Analytics
    >>   STaTS/StoreFront Remote
    >>   HPE Storage
    >>   Hewlett Packard Enterprise
    >>   Mobile: +1 978-697-5315
    >> 
    >> 
    >> 
    >>   On 8/20/16, 6:52 PM, "Joe Witt" <jo...@gmail.com> wrote:
    >> 
    >>       If no jira is raised sooner I'll raise one and get it sorted.
    >> 
    >>       On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
    >> 
    >>> Hi Chris,
    >>> Sorry for not catching that code path. I am not sure if it is actually a
    >>> regression as I took a look at the 1.0.0-BETA code and it matches the
    >>> 0.7.0, specifically this comment block:
    >>> 
    >>> /*
    >>> * We're using the default value from Kafka. We are using it to control the
    >>> * message size before it goes to to Kafka thus limiting possibility of a
    >>> * late failures in Kafka client.
    >>> */
    >>> 
    >>> found at[1] leads me to believe it was intentional and not a regression.
    >>> Looking at the 0.6.1 release code it appears that PutKafka used a default
    >>> of 5 MB [2].
    >>> 
    >>> I can speculate on the reasoning behind it, however, I will refrain from
    >>> opining on it as I was not involved in any of the conversations related to
    >>> the change and enforcement of the 1 MB max.
    >>> 
    >>> [1]
    >>> https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
    >>> [2]
    >>> https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
    >>> 
    >>> Thanks,
    >>> Andrew
    >>> 
    >>> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
    >>> STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    >>> 
    >>>> Thanks, Andrew.
    >>>> 
    >>>> I’ve set all of the right broker configs to allow larger messages.
    >>>> Believe me I spent a lot of time banging my head against the wall
    >>> thinking
    >>>> that the broker and topic configs were wrong.
    >>>> 
    >>>> PublishKafka uses PublishingContext.  That class has a bean property
    >>>> called maxRequestSize, which defaults to 1048576.  As far as I can tell,
    >>>> the setMaxRequestSize() method is never called (except by some test code).
    >>>> KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and
    >>>> passes the result to the constructor for StreamDemarcator.  The publish
    >>>> method then calls StreamDemarcator.nextToken(), which in turn calls
    >>>> StreamDemarcator.fill(), which compares the stream position against
    >>>> the maxRequestSize and throws the exception with this line.
    >>>> 
    >>>> throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
    >>>> 
    >>>> Which matches what I see in the nifi-app.log file…
    >>>> 
    >>>> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
    >>>> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
    >>>>       at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
    >>>>       at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
    >>>>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
    >>>>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
    >>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
    >>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
    >>>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
    >>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
    >>>>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
    >>>> 
    >>>> This occurs using both PublishKafka and PutKafka.  Setting the Max Record
    >>>> Size property in the PutKafka processor has no effect on this.  Note that
    >>>> the stack trace above is from the PutKafka processor with Max Record Size
    >>>> set to 10MB.
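
The failure mode traced above, where a hard-coded limit is checked while a token is being accumulated, can be sketched in a few lines of Java. This is a simplified illustration and not NiFi's StreamDemarcator: the class name, the method, and the byte-array input are assumptions made for the example, and only the exception message mirrors the one in the log.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a size-limited demarcator.
final class BoundedDemarcatorSketch {

    // Splits data on a one-byte delimiter and rejects any token
    // that would grow beyond maxDataSize bytes.
    static List<byte[]> split(byte[] data, byte delimiter, int maxDataSize) {
        List<byte[]> tokens = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= data.length; i++) {
            boolean atEnd = (i == data.length);
            if (atEnd || data[i] == delimiter) {
                if (i > start) {
                    byte[] token = new byte[i - start];
                    System.arraycopy(data, start, token, 0, token.length);
                    tokens.add(token);
                }
                start = i + 1;
            } else if (i - start + 1 > maxDataSize) {
                // The limit is enforced before anything reaches the Kafka client.
                throw new IllegalStateException(
                        "Maximum allowed data size of " + maxDataSize + " exceeded.");
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        byte[] data = "a\nbb\nccc".getBytes(StandardCharsets.UTF_8);
        System.out.println(split(data, (byte) '\n', 4).size());
        try {
            split(data, (byte) '\n', 2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the limit passed in here never comes from the processor's configured property, then raising that property has no effect, which is consistent with the behavior described in the message.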
    >>>> 
    >>>> I believe that this is a regression from 0.6.0.
    >>>> 
    >>>> Chris McDermott
    >>>> 
    >>>> Remote Business Analytics
    >>>> STaTS/StoreFront Remote
    >>>> HPE Storage
    >>>> Hewlett Packard Enterprise
    >>>> Mobile: +1 978-697-5315
    >>>> 
    >>>> 
    >>>> 
    >>>> On 8/20/16, 3:48 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
    >>>> 
    >>>>   Hi Chris,
    >>>>   Regarding the PutKafka processor: looking at this block [1] of the
    >>>>   PutKafka code, it has a default size of 1 MB, but it does not restrict
    >>>>   the size. The DATA_SIZE_VALIDATOR does a sanity check and also enforces
    >>>>   that the value entered is in the correct format, <value> [B|KB|MB|GB|TB].
    >>>>   Later on in the code, at this block [2], the value is set on the Kafka
    >>>>   config; again, this does not enforce a maximum value.
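
To make the `<value> [B|KB|MB|GB|TB]` format concrete, here is a small Java sketch that parses such a string into a byte count. It is not NiFi's DATA_SIZE_VALIDATOR; the class name and regular expression are hypothetical, and the use of binary multiples (1 KB = 1024 bytes) is an assumption, chosen because it makes 1 MB equal to the 1048576 that appears elsewhere in this thread.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for illustration; not NiFi's validator.
final class DataSizeSketch {

    private static final Pattern FORMAT = Pattern.compile(
            "^\\s*(\\d+)\\s*(B|KB|MB|GB|TB)\\s*$", Pattern.CASE_INSENSITIVE);

    // Converts strings such as "10 MB" to bytes, assuming binary multiples.
    static long toBytes(String text) {
        Matcher m = FORMAT.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Expected <value> [B|KB|MB|GB|TB]: " + text);
        }
        long value = Long.parseLong(m.group(1));
        long multiplier;
        switch (m.group(2).toUpperCase(Locale.ROOT)) {
            case "B":  multiplier = 1L; break;
            case "KB": multiplier = 1L << 10; break;
            case "MB": multiplier = 1L << 20; break;
            case "GB": multiplier = 1L << 30; break;
            case "TB": multiplier = 1L << 40; break;
            default: throw new IllegalArgumentException("Unknown unit in: " + text);
        }
        // multiplyExact throws ArithmeticException instead of overflowing silently.
        return Math.multiplyExact(value, multiplier);
    }

    public static void main(String[] args) {
        System.out.println(toBytes("1 MB"));
        System.out.println(toBytes("10 MB"));
    }
}
```

A check like this only validates the format of the value. It places no upper bound on the size, which matches the point made in the message.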
    >>>> 
    >>>>   As for the PublishKafka processor, I do not see where it accepts a
    >>>>   size or restricts the size at all.
    >>>> 
    >>>>   Have you adjusted the 'message.max.bytes' config value for your
    >>>>   broker(s)? The default value for that is 1 MB [3]. (The URL references
    >>>>   Kafka 0.8; however, I believe this default has been stable since the
    >>>>   early days of the project.)
    >>>> 
    >>>>   If you really do want to send messages that are larger than 1 MB in
    >>>>   size, I would highly recommend reading this post [4] from Gwen Shapira.
    >>>>   It does a great job of outlining the things you need to take into
    >>>>   consideration. It will also point you to the relevant configs in Kafka
    >>>>   that will need to be adjusted if you decide to go this route.
    >>>> 
    >>>> 
    >>>>   Thanks,
    >>>>   Andrew
    >>>> 
    >>>>   [1]
    >>>>   https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
    >>>>   [2]
    >>>>   https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
    >>>>   [3] https://kafka.apache.org/08/configuration.html
    >>>>   [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
    >>>> 
    >>>>   On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
    >>>>   STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    >>>> 
    >>>>> Hi folks,
    >>>>> 
    >>>>> 
    >>>>> 
    >>>>> From experimentation and looking at the code it seems that the max
    >>>> message
    >>>>> size that can be sent via the PublishKafka and PutKafka processors
    >>>> in 0.7.0
    >>>>> is 1MB.  Can someone please confirm my read on this?
    >>>>> 
    >>>>> 
    >>>>> 
    >>>>> Thanks,
    >>>>> 
    >>>>> 
    >>>>> 
    >>>>> Chris McDermott
    >>>>> 
    >>>>> 
    >>>>> 
    >>>>> Remote Business Analytics
    >>>>> 
    >>>>> STaTS/StoreFront Remote
    >>>>> 
    >>>>> HPE Storage
    >>>>> 
    >>>>> Hewlett Packard Enterprise
    >>>>> 
    >>>>> Mobile: +1 978-697-5315
    >>>>> 
    >>>>> 
    >>>>> 
    >>>>> 
    >>>> 
    >>>> 
    >>>>   --
    >>>>   Thanks,
    >>>>   Andrew
    >>>> 
    >>>>   Subscribe to my book: Streaming Data <http://manning.com/psaltis>
    >>>>   <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
    >>>>   twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
    >>>> 
    >>>> 
    >>>> 
    >>> 
    >>> 
    >>> --
    >>> Thanks,
    >>> Andrew
    >>> 
    >>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
    >>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
    >>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
    >>> 
    >> 
    >> 
    >> 
    >> 
    > 
    > 
    > 
    
    


Re: Max Kafka message size

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Chris

The “. . . querying the max.message.bytes. . .” is exactly what I had in mind. As you mentioned earlier, that is where the default value came from in the first place. So, yes, that is what we’re going to link together.

Cheers
Oleg



Re: Max Kafka message size

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
Thanks for getting back, Oleg. I’d be happy to send demarcated messages; the problem is reassembling them at the other end of the pipe.  I’ve done a lot of searching for techniques to do this.  They all seem to have major drawbacks in terms of reliable message delivery or in terms of garbage collection (not in the sense of Java GC, but cleanup of files bounced off of a shared file system).  The nice thing about Kafka is that it’s atomic, has replicated delivery, and has guaranteed GC semantics.  My use case has fairly low throughput requirements (thousands, not millions, of TPM), where most messages are fairly small but a few are larger.

It would be nice if the Kafka client could learn the max message size from Kafka itself by querying the max.message.bytes on the topic, rather than requiring the flow designer to set it on the producer Processors.  For now, though, I’d be happy going back to the old behavior where it’s set on the producer Processors.  On the flip side, I am also concerned that the clients (GetKafka and ConsumeKafka) do not expose a max message parameter.  That will be equally problematic.
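
As a rough sketch of that idea: the limit could be resolved from an explicit processor property when one is set, otherwise from the topic's max.message.bytes, otherwise from the 1048576-byte value discussed in this thread. Everything below is hypothetical and for illustration only: the class and method names are not NiFi or Kafka API, the topic config is passed in as a plain Map rather than fetched from a broker, and the precedence order is an assumption.

```java
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper; none of these names exist in NiFi or the Kafka client. */
final class MaxMessageSizeSketch {

    /** Fallback: the 1048576-byte value mentioned in this thread. */
    static final long FALLBACK_MAX_BYTES = 1048576L;

    // Accepts values such as "10 MB" or "512KB", i.e. <value> [B|KB|MB|GB|TB].
    private static final Pattern DATA_SIZE =
            Pattern.compile("(\\d+)\\s*(B|KB|MB|GB|TB)", Pattern.CASE_INSENSITIVE);

    /** Converts a data-size string into a byte count. */
    static long parseDataSize(String text) {
        Matcher m = DATA_SIZE.matcher(text.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a data size: " + text);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2).toUpperCase(Locale.ROOT)) {
            case "KB": return value << 10;
            case "MB": return value << 20;
            case "GB": return value << 30;
            case "TB": return value << 40;
            default:   return value; // plain bytes
        }
    }

    /**
     * Assumed precedence: explicit processor property, then the topic-level
     * max.message.bytes, then the fallback.
     */
    static long resolve(String processorProperty, Map<String, String> topicConfig) {
        if (processorProperty != null && !processorProperty.trim().isEmpty()) {
            return parseDataSize(processorProperty);
        }
        String topicValue = (topicConfig == null) ? null : topicConfig.get("max.message.bytes");
        if (topicValue != null) {
            return Long.parseLong(topicValue.trim());
        }
        return FALLBACK_MAX_BYTES;
    }
}
```

With this sketch, resolve("10 MB", topicConfig) returns 10485760 regardless of the topic setting, while resolve(null, topicConfig) falls through to whatever the topic reports.
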

Anyway, enough of my blathering.  

Your help, and the community’s, is greatly appreciated.

Thanks,

Chris McDermott
 
Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315
 




Re: Max Kafka message size

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Chris

Sorry for getting late to this

This was indeed intentional, primarily to follow Kafka’s best practices: Kafka was not designed to be a general data transfer system for messages of arbitrary size, but rather for messages of “manageable” size. Also, as you know, we have the ability to demarcate messages, essentially allowing you to send a very large FlowFile that will be parsed into chunks, where each chunk ends up as a separate Kafka message.
That said, we did consider that at some point we might expose it as a configuration property, and it seems like the time has come.
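
To illustrate the demarcation idea and where a size cap bites, here is a minimal sketch. It is a simplified stand-in, not NiFi's StreamDemarcator: it works on an in-memory byte array with a single-byte delimiter, and the class and method names are hypothetical. Only the exception message is modeled on the one quoted in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for a demarcator; not NiFi code. */
final class DemarcatorSketch {

    /**
     * Splits content on a single-byte delimiter. Each returned token would
     * become one Kafka message. Fails as soon as a token grows past
     * maxDataSize bytes; a token of exactly maxDataSize bytes is allowed.
     */
    static List<byte[]> split(byte[] content, byte delimiter, int maxDataSize) {
        List<byte[]> tokens = new ArrayList<>();
        int start = 0; // index where the current token begins
        for (int i = 0; i <= content.length; i++) {
            boolean atEnd = (i == content.length);
            if (atEnd || content[i] == delimiter) {
                if (i > start) { // skip empty tokens
                    tokens.add(Arrays.copyOfRange(content, start, i));
                }
                start = i + 1;
            } else if (i - start + 1 > maxDataSize) {
                throw new IllegalStateException(
                        "Maximum allowed data size of " + maxDataSize + " exceeded.");
            }
        }
        return tokens;
    }
}
```

In this sketch a large payload made of many small delimited records passes, while a single record larger than the cap fails no matter what the broker or topic allows, which is the behavior reported in this thread.
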

Cheers
Oleg

> On Aug 20, 2016, at 7:03 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
> 
> Jira is https://issues.apache.org/jira/browse/NIFI-2614.
> 
> Thanks,
> 
> Chris McDermott
> 
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
> 
> 
> 
> On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:
> 
>    I’ll raise a JIRA, Joe.
> 
>    Thanks,
> 
>    Chris McDermott
> 
>    Remote Business Analytics
>    STaTS/StoreFront Remote
>    HPE Storage
>    Hewlett Packard Enterprise
>    Mobile: +1 978-697-5315
> 
> 
> 
>    On 8/20/16, 6:52 PM, "Joe Witt" <jo...@gmail.com> wrote:
> 
>        If no jira is raised sooner I'll raise one and get it sorted.
> 
>        On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
> 
>> Hi Chris,
>> Sorry for not catching that code path. I am not sure if it is actually a
>> regression as I took a look at the 1.0.0-BETA code and it matches the
>> 0.7.0, specifically this comment block:
>> 
>> /*
>> * We're using the default value from Kafka. We are using it to control the
>> * message size before it goes to to Kafka thus limiting possibility of a
>> * late failures in Kafka client.
>> */
>> 
>> found at[1] leads me to believe it was intentional and not a regression.
>> Looking at the 0.6.1 release code it appears that PutKafka used a default
>> of 5 MB [2].
>> 
>> I can speculate on the reasoning behind it, however, I will refrain from
>> opining on it as I was not involved in any of the conversations related to
>> the change and enforcement of the 1 MB max.
>> 
>> [1]
>> https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
>> [2]
>> https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
>> 
>> Thanks,
>> Andrew
>> 
>> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
>> STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>> 
>>> Thanks, Andrew.
>>> 
>>> I’ve set all of the right broker configs to allow larger messages.
>>> Believe me, I spent a lot of time banging my head against the wall thinking
>>> that the broker and topic configs were wrong.
>>> 
>>> PublishKafka uses PublishingContext.  That class has a bean property
>>> called maxRequestSize, which defaults to 1048576.  As far as I can tell, the
>>> setMaxRequestSize() method is never called (except by some test code).
>>> KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and
>>> passes the result to the constructor for StreamDemarcator.  The publish
>>> method then calls StreamDemarcator.nextToken(), which in turn
>>> calls StreamDemarcator.fill(), which compares the stream position against
>>> the maxRequestSize and throws the exception with this line:
>>> 
>>> throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
>>> 
>>> Which matches what I see in the nifi-app.log file…
>>> 
>>> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
>>> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
>>>        at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>>        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>>        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>> 
>>> This occurs using both PublishKafka and PutKafka.  Setting the Max Record Size
>>> property in the PutKafka processor has no effect on this.  Note the stack
>>> trace above is from the PutKafka processor with Max Record Size set to 10MB.
>>> 
>>> I believe that this is a regression from 0.6.0.
>>> 
>>> Chris McDermott
>>> 
>>> Remote Business Analytics
>>> STaTS/StoreFront Remote
>>> HPE Storage
>>> Hewlett Packard Enterprise
>>> Mobile: +1 978-697-5315
>>> 
>>> 
>>> 
>>> On 8/20/16, 3:48 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
>>> 
>>>    Hi Chris,
>>>    Regarding the PutKafka processor: looking at this block[1] of the PutKafka
>>>    code, it has a default size of 1 MB, but it does not restrict the size. The
>>>    DATA_SIZE_VALIDATOR does a sanity check and also enforces that the value
>>>    entered is in the correct format, <value> [B|KB|MB|GB|TB].
>>>    Later on in the code, at this block[2], the value is set on the Kafka
>>>    config; again, this does not enforce a maximum value.
>>> 
>>>    As for the PublishKafka processor, I do not see where it accepts a
>>>    size or restricts the size at all.
>>> 
>>>    Have you adjusted the 'message.max.bytes' config value for your broker(s)?
>>>    The default value for that is 1 MB [3]. (The URL references Kafka 0.8;
>>>    however, I believe this default has been stable since the early days of
>>>    the project.)
>>> 
>>>    If you really do want to send messages that are larger than 1 MB in size, I
>>>    would highly recommend reading this post[4] from Gwen Shapira.  It does
>>>    a great job of outlining the things you need to take into consideration.
>>>    It will also point you to the relevant configs in Kafka that will need to
>>>    be adjusted if you decide to go this route.
>>> 
>>> 
>>>    Thanks,
>>>    Andrew
>>> 
>>>    [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
>>>    [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
>>>    [3] https://kafka.apache.org/08/configuration.html
>>>    [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
>>> 
>>>    On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
>>>    STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>>> 
>>>> Hi folks,
>>>> 
>>>> 
>>>> 
>>>> From experimentation and looking at the code it seems that the max message
>>>> size that can be sent via the PublishKafka and PutKafka processors in 0.7.0
>>>> is 1MB.  Can someone please confirm my read on this?
>>>> 
>>>> 
>>>> 
>>>> Thanks,
>>>> 
>>>> 
>>>> 
>>>> Chris McDermott
>>>> 
>>>> 
>>>> 
>>>> Remote Business Analytics
>>>> 
>>>> STaTS/StoreFront Remote
>>>> 
>>>> HPE Storage
>>>> 
>>>> Hewlett Packard Enterprise
>>>> 
>>>> Mobile: +1 978-697-5315
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>>    --
>>>    Thanks,
>>>    Andrew
>>> 
>>>    Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>>>    <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>>>    twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>>> 
>>> 
>>> 
>> 
>> 
>> --
>> Thanks,
>> Andrew
>> 
>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>> 
> 
> 
> 
> 
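
For readers following the thread: the "<value> [B|KB|MB|GB|TB]" format mentioned in the quoted reply above can be illustrated with a small parser. This is only a sketch under stated assumptions, not NiFi's DATA_SIZE_VALIDATOR. The class name DataSizeParser and the method toBytes are hypothetical, and binary multiples (1 KB = 1024 B) are assumed because the thread equates the 1 MB default with 1048576 bytes.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of NiFi. */
final class DataSizeParser {
    // A non-negative integer, optional whitespace, then one of B, KB, MB, GB, TB.
    private static final Pattern FORMAT =
            Pattern.compile("^\\s*(\\d+)\\s*(B|KB|MB|GB|TB)\\s*$", Pattern.CASE_INSENSITIVE);

    private DataSizeParser() {}

    /** Returns the size in bytes, assuming binary multiples (1 KB = 1024 B). */
    static long toBytes(String text) {
        Matcher m = FORMAT.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Expected <value> [B|KB|MB|GB|TB], got: " + text);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2).toUpperCase(Locale.ROOT)) {
            case "B":  return value;
            case "KB": return Math.multiplyExact(value, 1L << 10);
            case "MB": return Math.multiplyExact(value, 1L << 20);
            case "GB": return Math.multiplyExact(value, 1L << 30);
            default:   return Math.multiplyExact(value, 1L << 40); // TB
        }
    }

    public static void main(String[] args) {
        System.out.println(toBytes("1 MB")); // 1048576
        System.out.println(toBytes("10MB")); // 10485760
    }
}
```

Note that a check like this only validates the format of the value; as the thread points out, it does not by itself cap how large the configured size may be.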


Re: Max Kafka message size

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
Jira is https://issues.apache.org/jira/browse/NIFI-2614.

Thanks,

Chris McDermott
 
Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315
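
For context on the behaviour reported in this thread, here is a minimal sketch of a delimiter-based reader that refuses tokens larger than a fixed limit. It is not NiFi's StreamDemarcator; the class BoundedTokenReader and its single-byte delimiter are simplifying assumptions made for illustration. Only the exception message is modelled on the one seen in nifi-app.log.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified stand-in for a size-limited stream demarcator. */
final class BoundedTokenReader {
    private final InputStream in;
    private final int delimiter;
    private final int maxDataSize;

    BoundedTokenReader(InputStream in, byte delimiter, int maxDataSize) {
        this.in = in;
        this.delimiter = delimiter & 0xFF;
        this.maxDataSize = maxDataSize;
    }

    /** Returns the next token without its delimiter, or null at end of stream. */
    byte[] nextToken() {
        ByteArrayOutputStream token = new ByteArrayOutputStream();
        boolean sawAny = false;
        try {
            int b;
            while ((b = in.read()) != -1) {
                sawAny = true;
                if (b == delimiter) {
                    return token.toByteArray();
                }
                // The limit is a hard cap: if it is never raised from its
                // default, larger tokens always fail here.
                if (token.size() >= maxDataSize) {
                    throw new IllegalStateException(
                            "Maximum allowed data size of " + maxDataSize + " exceeded.");
                }
                token.write(b);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sawAny ? token.toByteArray() : null;
    }

    public static void main(String[] args) {
        byte[] data = "ab\ncdefgh".getBytes(StandardCharsets.UTF_8);
        BoundedTokenReader reader =
                new BoundedTokenReader(new ByteArrayInputStream(data), (byte) '\n', 4);
        System.out.println(new String(reader.nextToken(), StandardCharsets.UTF_8)); // ab
        try {
            reader.nextToken();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Maximum allowed data size of 4 exceeded.
        }
    }
}
```

The point of the sketch is that the limit passed to the constructor decides the outcome; a user-facing size property has no effect unless its value actually reaches that constructor.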
 




Re: Max Kafka message size

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
I’ll raise a JIRA, Joe.

Thanks,

Chris McDermott
 
Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315
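
Since much of this thread turns on which size limit is actually rejecting the message, a quick way to compare an intended message size against the Kafka settings involved may help. The sketch below is illustrative only. The class KafkaSizeLimitCheck is hypothetical, the settings are collected into a single Properties object purely for convenience (in practice they live in separate producer, broker and consumer configs), and protocol overhead on top of the payload is ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper that flags size settings smaller than a target message. */
final class KafkaSizeLimitCheck {
    // Kafka setting names, with the role each one belongs to.
    private static final List<String> SIZE_KEYS = Arrays.asList(
            "max.request.size",          // producer
            "message.max.bytes",         // broker
            "replica.fetch.max.bytes",   // broker (replication)
            "max.partition.fetch.bytes"  // consumer
    );

    private KafkaSizeLimitCheck() {}

    /** Returns the names of configured settings whose value is below messageBytes. */
    static List<String> settingsTooSmallFor(long messageBytes, Properties settings) {
        List<String> offenders = new ArrayList<>();
        for (String key : SIZE_KEYS) {
            String raw = settings.getProperty(key);
            // Settings that are absent are skipped rather than guessed at.
            if (raw != null && Long.parseLong(raw.trim()) < messageBytes) {
                offenders.add(key);
            }
        }
        return offenders;
    }

    public static void main(String[] args) {
        Properties settings = new Properties();
        settings.setProperty("max.request.size", "1048576");
        settings.setProperty("message.max.bytes", "10485760");
        // A 5 MB message fits the broker limit but not the producer limit.
        System.out.println(settingsTooSmallFor(5L * 1024 * 1024, settings)); // [max.request.size]
    }
}
```

A check like this covers only the Kafka side. The failure discussed in this thread happens earlier, inside the NiFi processor, so it would not show up here.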
 




Re: Max Kafka message size

Posted by Joe Witt <jo...@gmail.com>.
If no jira is raised sooner I'll raise one and get it sorted.

On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:

> Hi Chris,
> Sorry for not catching that code path. I am not sure if it is actually a
> regression as I took a look at the 1.0.0-BETA code and it matches the
> 0.7.0, specifically this comment block:
>
> /*
>  * We're using the default value from Kafka. We are using it to control the
>  * message size before it goes to to Kafka thus limiting possibility of a
>  * late failures in Kafka client.
>  */
>
> found at[1] leads me to believe it was intentional and not a regression.
> Looking at the 0.6.1 release code it appears that PutKafka used a default
> of 5 MB [2].
>
> I can speculate on the reasoning behind it, however, I will refrain from
> opining on it as I was not involved in any of the conversations related to
> the change and enforcement of the 1 MB max.
>
> [1] https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
> [2] https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
>
> Thanks,
> Andrew
>
> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
> STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>
> > Thanks, Andrew.
> >
> > I’ve set all of the right broker configs to allow larger messages.
> > Believe me, I spent a lot of time banging my head against the wall thinking
> > that the broker and topic configs were wrong.
> >
> > PublishKafka uses PublishingContext.  That class has a bean property
> > called maxRequestSize, which defaults to 1048576.  As far as I can tell, the
> > setMaxRequestSize() method is never called (except by some test code).
> > KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and
> > passes the result to the constructor for StreamDemarcator.  The publish
> > method then calls StreamDemarcator.nextToken(), which in turn
> > calls StreamDemarcator.fill(), which compares the stream position against
> > the maxRequestSize and throws the exception with this line:
> >
> > throw new IllegalStateException("Maximum allowed data size of " +
> > this.maxDataSize + " exceeded.");
> >
> > Which matches what I see in the nifi-app.log file…
> >
> > 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
> > java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
> >         at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
> >         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
> >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
> >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
> >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
> >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
> >         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> >
> > This occurs using PublishKafka, and PutKafka.  Setting the Max Record
> Size
> > property in the PutKafka processor has no affect on this.  Note the stack
> > trace above is from the PutKafka processor with Max Record Size set to
> 10MB.
> >
> > I believe that this a regression from 0.6.0.
> >
> > Chris McDermott
> >
> > Remote Business Analytics
> > STaTS/StoreFront Remote
> > HPE Storage
> > Hewlett Packard Enterprise
> > Mobile: +1 978-697-5315
> >
> >
> >
> > On 8/20/16, 3:48 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
> >
> >     Hi Chris,
> >     Regarding the PutKafka processor looking at this block[1] of the
> > PutKafka
> >     code, it has a default size of 1 MB, but it does not restrict the
> > size. The
> >     DATA_SIZE_VALIDATOR does a sanity check and also enforces that
> >     the supported value entered is the correct format <value> [B|
> > KB|MB|GB|TB].
> >     Later on in the code at this block[2], the value is set on the Kafka
> >     config, again this does not enforce a value maximum.
> >
> >     In regards to the PublishKafka processor I do not see where it
> accepts
> > a
> >     size nor restrict the size at all.
> >
> >     Have you adjusted the 'message.max.bytes' config value for your
> > broker(s)?
> >     The default value for that is 1 MB [3] (The url references the 0.8
> > Kafka,
> >     however I believe this default has been stable since the early days
> of
> > the
> >     project.)
> >
> >     If you really do want to send messages that are larger than 1 MB in
> > size, I
> >     would highly recommending reading this post[4] from Gwen Shapira.  It
> > does
> >     a great job of outlining the things you need to take into
> > consideration.
> >     This will also point you to the relevant configs in Kafka that will
> > need to
> >     be adjusted if you decide to go this route.
> >
> >
> >     Thanks,
> >     Andrew
> >
> >     [1]
> >     https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
> > nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
> > main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
> >     [2]
> >     https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
> > nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
> > main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
> >     [3] https://kafka.apache.org/08/configuration.html
> >     [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> >
> >     On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
> >     STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
> >
> >     > Hi folks,
> >     >
> >     >
> >     >
> >     > From experimentation and looking at the code it seems that the max
> > message
> >     > size that can be sent via the PublishKafka and PutKafka processors
> > in 0.7.0
> >     > is 1MB.  Can someone please confirm my read on this?
> >     >
> >     >
> >     >
> >     > Thanks,
> >     >
> >     >
> >     >
> >     > Chris McDermott
> >     >
> >     >
> >     >
> >     > Remote Business Analytics
> >     >
> >     > STaTS/StoreFront Remote
> >     >
> >     > HPE Storage
> >     >
> >     > Hewlett Packard Enterprise
> >     >
> >     > Mobile: +1 978-697-5315
> >     >
> >     >
> >     >
> >     >
> >
> >
> >     --
> >     Thanks,
> >     Andrew
> >
> >     Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> >     <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> >     twiiter: @itmdata <http://twitter.com/intent/
> user?screen_name=itmdata>
> >
> >
> >
>
>
> --
> Thanks,
> Andrew
>
> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> twiiter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>

Re: Max Kafka message size

Posted by Andrew Psaltis <ps...@gmail.com>.
Hi Chris,
Sorry for not catching that code path. I am not sure it is actually a
regression: I took a look at the 1.0.0-BETA code and it matches 0.7.0.
Specifically, this comment block:

/*
 * We're using the default value from Kafka. We are using it to control the
 * message size before it goes to to Kafka thus limiting possibility of a
 * late failures in Kafka client.
 */

found at [1], leads me to believe it was intentional and not a regression.
Looking at the 0.6.1 release code, it appears that PutKafka used a default
of 5 MB [2].

I could speculate on the reasoning behind it, but I will refrain from
opining, as I was not involved in any of the conversations related to
the change and the enforcement of the 1 MB max.

[1]
https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
[2]
https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176

Thanks,
Andrew

On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
STaTS/StorefrontRemote) <ch...@hpe.com> wrote:

> Thanks, Andrew.
>
> I’ve set all of the right broker configs to allow larger messages.
> Believe me I spent a lot of time banging my head against the wall thinking
> that the broker and topic configs were wrong.
>
> PublisingKafka uses PublishingContext.  That class has bean property
> called maxRequestSize, which defaults to 1048576.  As far as I can tell the
> setMaxRequestSize() method is never called (except by some test code.)
> KafkaPublisher.publish() calls Max Record Size.getMaxRequestSize() and
> passes the result to the constructor for StreamDemarcator.   The publish
> method then calls the StreamDemarcator. getNextToken(), which in turns
> calls StreamDemarcator.fill() which compares the stream position against
> the maxRequestSize and throws the exception with this line.
>
> throw new IllegalStateException("Maximum allowed data size of " +
> this.maxDataSize + " exceeded.");
>
> Which matches what I see in the nifi-app.log file…
>
> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8]
> o.apache.nifi.processors.kafka.PutKafka
> java.lang.IllegalStateException: Maximum allowed data size of 1048576
> exceeded.
>         at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153)
> ~[nifi-utils-0.7.0.jar:0.7.0]
>         at org.apache.nifi.stream.io.util.StreamDemarcator.
> nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
>         at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129)
> ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315)
> ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         at org.apache.nifi.controller.repository.
> StandardProcessSession.read(StandardProcessSession.java:1851)
> ~[nifi-framework-core-0.7.0.jar:0.7.0]
>         at org.apache.nifi.controller.repository.
> StandardProcessSession.read(StandardProcessSession.java:1822)
> ~[nifi-framework-core-0.7.0.jar:0.7.0]
>         at org.apache.nifi.processors.kafka.PutKafka.
> doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.
> jar:0.7.0]
>         at org.apache.nifi.processors.kafka.PutKafka.
> rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.
> jar:0.7.0]
>         at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.
> onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.
> jar:0.7.0]
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.
> jar:0.7.0]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.
> jar:0.7.0]
>         at org.apache.nifi.controller.scheduling.
> TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127)
> [nifi-framework-core-0.7.0.jar:0.7.0]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_45]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> [na:1.8.0_45]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> [na:1.8.0_45]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> [na:1.8.0_45]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_45]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_45]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>
> This occurs using PublishKafka, and PutKafka.  Setting the Max Record Size
> property in the PutKafka processor has no affect on this.  Note the stack
> trace above is from the PutKafka processor with Max Record Size set to 10MB.
>
> I believe that this a regression from 0.6.0.
>
> Chris McDermott
>
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
>
>
>
> On 8/20/16, 3:48 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:
>
>     Hi Chris,
>     Regarding the PutKafka processor looking at this block[1] of the
> PutKafka
>     code, it has a default size of 1 MB, but it does not restrict the
> size. The
>     DATA_SIZE_VALIDATOR does a sanity check and also enforces that
>     the supported value entered is the correct format <value> [B|
> KB|MB|GB|TB].
>     Later on in the code at this block[2], the value is set on the Kafka
>     config, again this does not enforce a value maximum.
>
>     In regards to the PublishKafka processor I do not see where it accepts
> a
>     size nor restrict the size at all.
>
>     Have you adjusted the 'message.max.bytes' config value for your
> broker(s)?
>     The default value for that is 1 MB [3] (The url references the 0.8
> Kafka,
>     however I believe this default has been stable since the early days of
> the
>     project.)
>
>     If you really do want to send messages that are larger than 1 MB in
> size, I
>     would highly recommending reading this post[4] from Gwen Shapira.  It
> does
>     a great job of outlining the things you need to take into
> consideration.
>     This will also point you to the relevant configs in Kafka that will
> need to
>     be adjusted if you decide to go this route.
>
>
>     Thanks,
>     Andrew
>
>     [1]
>     https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
>     [2]
>     https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
>     [3] https://kafka.apache.org/08/configuration.html
>     [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
>
>     On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
>     STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>
>     > Hi folks,
>     >
>     >
>     >
>     > From experimentation and looking at the code it seems that the max
> message
>     > size that can be sent via the PublishKafka and PutKafka processors
> in 0.7.0
>     > is 1MB.  Can someone please confirm my read on this?
>     >
>     >
>     >
>     > Thanks,
>     >
>     >
>     >
>     > Chris McDermott
>     >
>     >
>     >
>     > Remote Business Analytics
>     >
>     > STaTS/StoreFront Remote
>     >
>     > HPE Storage
>     >
>     > Hewlett Packard Enterprise
>     >
>     > Mobile: +1 978-697-5315
>     >
>     >
>     >
>     >
>
>
>     --
>     Thanks,
>     Andrew
>
>     Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>     <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>     twiiter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>
>
>


-- 
Thanks,
Andrew

Subscribe to my book: Streaming Data <http://manning.com/psaltis>
<https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>

Re: Max Kafka message size

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
Thanks, Andrew.  

I’ve set all of the right broker configs to allow larger messages.  Believe me, I spent a lot of time banging my head against the wall thinking that the broker and topic configs were wrong.

PublishKafka uses PublishingContext.  That class has a bean property called maxRequestSize, which defaults to 1048576.  As far as I can tell, the setMaxRequestSize() method is never called (except by some test code).  KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and passes the result to the constructor for StreamDemarcator.  The publish method then calls StreamDemarcator.nextToken(), which in turn calls StreamDemarcator.fill(), which compares the stream position against the maxRequestSize and throws the exception with this line.

throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");

Which matches what I see in the nifi-app.log file…

2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
       	at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
       	at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
       	at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
       	at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
       	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
       	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
       	at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
       	at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
       	at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
       	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
       	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
       	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
       	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
       	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
       	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
       	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
       	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
       	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
       	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
       	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

This occurs using both PublishKafka and PutKafka.  Setting the Max Record Size property in the PutKafka processor has no effect on this.  Note the stack trace above is from the PutKafka processor with Max Record Size set to 10MB.

I believe that this is a regression from 0.6.0.
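
For anyone following along, here is a minimal sketch of the kind of check described above. It is not the NiFi code: the class name SizeLimitedReaderSketch and its simplified nextToken() are hypothetical stand-ins, and it treats the whole stream as a single token. It only illustrates how a reader built with a fixed 1048576 limit fails with this message no matter what the processor property says.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the size check discussed in this thread; NOT the NiFi class.
class SizeLimitedReaderSketch {
    private final InputStream in;
    private final int maxDataSize;

    SizeLimitedReaderSketch(InputStream in, int maxDataSize) {
        this.in = in;
        this.maxDataSize = maxDataSize;
    }

    // Reads the whole stream as one token and fails as soon as the
    // buffered data would grow past maxDataSize.
    byte[] nextToken() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        try {
            int read;
            while ((read = in.read(chunk)) != -1) {
                if (buffer.size() + read > maxDataSize) {
                    throw new IllegalStateException(
                            "Maximum allowed data size of " + maxDataSize + " exceeded.");
                }
                buffer.write(chunk, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size() == 0 ? null : buffer.toByteArray();
    }

    public static void main(String[] args) {
        int limit = 1048576; // the default value cited in this thread
        byte[] tooBig = new byte[limit + 1];
        try {
            new SizeLimitedReaderSketch(new ByteArrayInputStream(tooBig), limit).nextToken();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the limit passed to the constructor never changes from its default, raising the processor property cannot help, which is consistent with what I am seeing.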

Chris McDermott
 
Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315
 


On 8/20/16, 3:48 PM, "Andrew Psaltis" <ps...@gmail.com> wrote:

    Hi Chris,
    Regarding the PutKafka processor looking at this block[1] of the PutKafka
    code, it has a default size of 1 MB, but it does not restrict the size. The
    DATA_SIZE_VALIDATOR does a sanity check and also enforces that
    the supported value entered is the correct format <value> [B| KB|MB|GB|TB].
    Later on in the code at this block[2], the value is set on the Kafka
    config, again this does not enforce a value maximum.
    
    In regards to the PublishKafka processor I do not see where it accepts a
    size nor restrict the size at all.
    
    Have you adjusted the 'message.max.bytes' config value for your broker(s)?
    The default value for that is 1 MB [3] (The url references the 0.8 Kafka,
    however I believe this default has been stable since the early days of the
    project.)
    
    If you really do want to send messages that are larger than 1 MB in size, I
    would highly recommending reading this post[4] from Gwen Shapira.  It does
    a great job of outlining the things you need to take into consideration.
    This will also point you to the relevant configs in Kafka that will need to
    be adjusted if you decide to go this route.
    
    
    Thanks,
    Andrew
    
    [1]
    https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
    [2]
    https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
    [3] https://kafka.apache.org/08/configuration.html
    [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
    
    On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
    STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
    
    > Hi folks,
    >
    >
    >
    > From experimentation and looking at the code it seems that the max message
    > size that can be sent via the PublishKafka and PutKafka processors in 0.7.0
    > is 1MB.  Can someone please confirm my read on this?
    >
    >
    >
    > Thanks,
    >
    >
    >
    > Chris McDermott
    >
    >
    >
    > Remote Business Analytics
    >
    > STaTS/StoreFront Remote
    >
    > HPE Storage
    >
    > Hewlett Packard Enterprise
    >
    > Mobile: +1 978-697-5315
    >
    >
    >
    >
    
    
    -- 
    Thanks,
    Andrew
    
    Subscribe to my book: Streaming Data <http://manning.com/psaltis>
    <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
    twiiter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
    


Re: Max Kafka message size

Posted by Andrew Psaltis <ps...@gmail.com>.
Hi Chris,
Regarding the PutKafka processor: looking at this block [1] of the PutKafka
code, it has a default size of 1 MB, but it does not restrict the size. The
DATA_SIZE_VALIDATOR does a sanity check and also enforces that the value
entered is in the correct format, <value> [B|KB|MB|GB|TB]. Later on in the
code, at this block [2], the value is set on the Kafka config; again, this
does not enforce a maximum value.
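
To illustrate what a format-only check means here, the following is a rough sketch, not NiFi's validator. The class DataSizeSketch and the method toBytes are hypothetical, and the binary multipliers (1 MB = 1048576 bytes) are an assumption chosen to line up with the 1 MB default mentioned above. The point is that the value is checked for shape but has no upper bound.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of a "<value> [B|KB|MB|GB|TB]" check; NOT NiFi's validator.
class DataSizeSketch {
    private static final Pattern DATA_SIZE =
            Pattern.compile("(\\d+)\\s*(B|KB|MB|GB|TB)", Pattern.CASE_INSENSITIVE);

    // Converts a data size string to bytes, rejecting anything that is not
    // in the expected format. No maximum is enforced.
    static long toBytes(String value) {
        Matcher m = DATA_SIZE.matcher(value.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Expected <value> [B|KB|MB|GB|TB] but got: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2).toUpperCase(Locale.ROOT)) {
            case "KB": return amount << 10;
            case "MB": return amount << 20;
            case "GB": return amount << 30;
            case "TB": return amount << 40;
            default:   return amount; // plain bytes
        }
    }

    public static void main(String[] args) {
        System.out.println(toBytes("1 MB"));  // 1048576
        System.out.println(toBytes("10 MB")); // 10485760
    }
}
```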

With regard to the PublishKafka processor, I do not see where it accepts a
size or restricts the size at all.

Have you adjusted the 'message.max.bytes' config value for your broker(s)?
The default value for that is 1 MB [3]. (The URL references Kafka 0.8;
however, I believe this default has been stable since the early days of the
project.)
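
As a rough illustration only, these are the size-related settings I am aware of that usually have to agree with each other when raising the limit. The 10485760 (10 MB) values are arbitrary examples, not recommendations, and which client settings apply depends on the Kafka version and on whether the old or new producer/consumer is in use, so please check the docs for your release.

```properties
# Broker (server.properties)
message.max.bytes=10485760
# Should be at least message.max.bytes so followers can replicate large messages
replica.fetch.max.bytes=10485760

# Topic-level override of the broker setting (optional)
max.message.bytes=10485760

# Producer (new Java producer)
max.request.size=10485760

# Consumer: fetch.message.max.bytes for the old consumer,
# max.partition.fetch.bytes for the new (0.9+) consumer
max.partition.fetch.bytes=10485760
```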

If you really do want to send messages that are larger than 1 MB in size, I
would highly recommend reading this post [4] from Gwen Shapira.  It does
a great job of outlining the things you need to take into consideration.
It will also point you to the relevant configs in Kafka that will need to
be adjusted if you decide to go this route.


Thanks,
Andrew

[1]
https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
[2]
https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
[3] https://kafka.apache.org/08/configuration.html
[4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/

On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
STaTS/StorefrontRemote) <ch...@hpe.com> wrote:

> Hi folks,
>
>
>
> From experimentation and looking at the code it seems that the max message
> size that can be sent via the PublishKafka and PutKafka processors in 0.7.0
> is 1MB.  Can someone please confirm my read on this?
>
>
>
> Thanks,
>
>
>
> Chris McDermott
>
>
>
> Remote Business Analytics
>
> STaTS/StoreFront Remote
>
> HPE Storage
>
> Hewlett Packard Enterprise
>
> Mobile: +1 978-697-5315
>
>
>
>


-- 
Thanks,
Andrew

Subscribe to my book: Streaming Data <http://manning.com/psaltis>
<https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>