Posted to users@nifi.apache.org by "Perko, Ralph J" <Ra...@pnnl.gov> on 2016/04/29 20:45:05 UTC

Nifi 0.6.1 PutKafka error

Hi

We are using Kafka as our messaging backbone and have been using Nifi since 0.3.x – It has been a real game changer – thanks for the work!

In 0.6.1, when we set concurrent tasks to anything greater than 1 and the partition strategy to "round robin", we get an invalid partition error:

2016-04-29 17:44:31,536 ERROR [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.PutKafka PutKafka[id=67a1e471-1548-407c-bedc-a2a6212c2458] PutKafka[id=67a1e471-1548-407c-bedc-a2a6212c2458] failed to process session due to java.lang.IllegalArgumentException: Invalid partition given with record: 165 is not in the range [0...10].: java.lang.IllegalArgumentException: Invalid partition given with record: 165 is not in the range [0...10].

Setting the partition strategy to “random” seems to clear up this issue.
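
For illustration only: one way a round-robin selector can return an index outside the valid range is a shared counter that keeps growing and is never reduced by the partition count. Whether that is what happens in PutKafka 0.6.1 is not established in this message. The sketch below uses a hypothetical `RoundRobinSketch` class and an assumed partition count of 10 to show a selector that stays in range when called from several threads.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration; this is not the NiFi PutKafka implementation. */
final class RoundRobinSketch {
    // Shared counter; AtomicInteger makes increments safe across threads.
    private final AtomicInteger counter = new AtomicInteger();
    private final int partitionCount;

    RoundRobinSketch(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    /** Returns the next partition index, always in [0, partitionCount). */
    int nextPartition() {
        // floorMod keeps the result non-negative even after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), partitionCount);
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch(10); // assumed partition count
        for (int i = 0; i < 25; i++) {
            System.out.println("message " + i + " -> partition " + selector.nextPartition());
        }
    }
}
```

Reducing the counter with `Math.floorMod` on every call, rather than resetting it when it reaches the limit, avoids a check-then-reset race between threads.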

It also seems batching is not working as it did in previous versions.  Poking around in the code and comparing the PutKafka in 0.4.1 to 0.6.1, it appears the incoming message delimiter is now tied to outgoing batching; from what I can tell, this was not the case in 0.4.1.

PutKafka.java:438

if (context.getProperty(MESSAGE_DELIMITER).isSet()) {
    properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue());
} else {
    properties.setProperty("batch.size", "1");
}

We have many single small messages coming in and do not need the message delimiter but still want to batch.
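
To make the coupling concrete, here is a minimal sketch using only `java.util.Properties`. The class and method names (`BatchingSketch`, `coupled`, `decoupled`) are hypothetical and not part of NiFi. `coupled` mirrors the condition quoted above; `decoupled` shows what setting the batching properties regardless of the delimiter might look like. Note that in the Kafka Java producer, `batch.size` is a size in bytes rather than a message count, and `linger.ms` controls how long the producer waits for more records before sending a batch.

```java
import java.util.Properties;

/** Hypothetical illustration; this is not the NiFi PutKafka implementation. */
final class BatchingSketch {

    /** Mirrors the quoted condition: batching is enabled only when a delimiter is set. */
    static Properties coupled(boolean delimiterSet, String batchValue) {
        Properties props = new Properties();
        props.setProperty("batch.size", delimiterSet ? batchValue : "1");
        return props;
    }

    /** Assumed alternative: batching configured independently of any delimiter. */
    static Properties decoupled(String batchSizeBytes, String lingerMs) {
        Properties props = new Properties();
        props.setProperty("batch.size", batchSizeBytes); // bytes per partition batch
        props.setProperty("linger.ms", lingerMs);        // wait time to fill a batch
        return props;
    }

    public static void main(String[] args) {
        // No delimiter: the coupled variant falls back to a batch size of 1.
        System.out.println(coupled(false, "200"));
        // The decoupled variant batches whether or not a delimiter is present.
        System.out.println(decoupled("16384", "50"));
    }
}
```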

It could very well be I am misunderstanding how this works.  Should I be using it differently?  Should I drop in the 0.4.1 kafka nar to get the batching behavior we are looking for?

Thanks,
Ralph


Re: Nifi 0.6.1 PutKafka error

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Ralph,

While we are indeed working on it at the moment, please do file an enhancement request with all the details. It’s always good for tracking purposes.

Cheers
Oleg

> On May 2, 2016, at 10:25 AM, Perko, Ralph J <Ra...@pnnl.gov> wrote:
> 
> Thanks for the update.  Is there any discussion around decoupling incoming
> message delimiters from outgoing batching?  I believe this is how it
> worked up to 0.4.x.  Should I make an enhancement request?
> 
> Thanks,
> Ralph
> 
> 
> 
> On 4/29/16, 1:02 PM, "Joe Witt" <jo...@gmail.com> wrote:
> 
>> Ralph,
>> 
>> Possibly related to https://issues.apache.org/jira/browse/NIFI-1827.
>> Clearly something to get sorted out promptly.
>> 
>> Thanks
>> Joe


Re: Nifi 0.6.1 PutKafka error

Posted by "Perko, Ralph J" <Ra...@pnnl.gov>.
Thanks for the update.  Is there any discussion around decoupling incoming
message delimiters from outgoing batching?  I believe this is how it
worked up to 0.4.x.  Should I make an enhancement request?

Thanks,
Ralph



On 4/29/16, 1:02 PM, "Joe Witt" <jo...@gmail.com> wrote:

>Ralph,
>
>Possibly related to https://issues.apache.org/jira/browse/NIFI-1827.
>Clearly something to get sorted out promptly.
>
>Thanks
>Joe


Re: Nifi 0.6.1 PutKafka error

Posted by Joe Witt <jo...@gmail.com>.
Ralph,

Possibly related to https://issues.apache.org/jira/browse/NIFI-1827.
Clearly something to get sorted out promptly.

Thanks
Joe
