Posted to users@nifi.apache.org by Uwe Geercken <uw...@web.de> on 2017/06/22 19:30:59 UTC

Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Hello everyone,
 
I wanted to try the following:
- get messages from a Kafka topic; these are simple messages in CSV format
- use the PartitionRecord processor to get familiar with the RecordPath concept
 
I started ZooKeeper and Kafka on localhost and added some messages to a topic using the Kafka console producer. A message looks like this:
 
ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200
 
I can retrieve this message using the Kafka console consumer.
 
To my flow I added the ConsumeKafkaRecord_0_10 and the PartitionRecord processors. I configured the ConsumeKafkaRecord_0_10 with a CSVReader controller service. It uses the AvroSchemaRegistry 1.3.0 with the following schema:
 
{
"type": "record",
"name": "flight_schema",
"fields": [
   { "name": "flight_station", "type": "string" },
   { "name": "flight_orientation", "type": "string" },
   { "name": "flight_carrier", "type": "string" },
   { "name": "flight_number", "type": "string" },
   { "name": "flight_number_suffix", "type": "string" },
   { "name": "flight_scheduled_date", "type": "string" },
   { "name": "flight_scheduled_time", "type": "string" },
   { "name": "flight_actual_date", "type": "string" },
   { "name": "flight_actual_time", "type": "string" },
   { "name": "flight_passengers", "type": "int" }
  ]
}
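For reference, the sample message lines up with this schema; a minimal Python sketch (an illustration outside NiFi, assuming plain comma-separated values) checks the field count and the int cast:

```python
import csv
import io

# Field names in schema order, copied from the Avro schema above.
fields = [
    "flight_station", "flight_orientation", "flight_carrier",
    "flight_number", "flight_number_suffix", "flight_scheduled_date",
    "flight_scheduled_time", "flight_actual_date", "flight_actual_time",
    "flight_passengers",
]

# The sample message posted to the topic.
message = "ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200"

row = next(csv.reader(io.StringIO(message)))
assert len(row) == len(fields)  # 10 values for 10 schema fields

record = dict(zip(fields, row))
record["flight_passengers"] = int(record["flight_passengers"])  # "int" per the schema
```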
 
I then have a CSVRecordSetWriter that uses the same schema. I also added a line to logback.xml to debug the ConsumeKafkaRecord_0_10 processor.
 
Now, when I run the processors and add a message to the topic in Kafka, I get the following error:
 

2017-06-22 18:32:11,228 ERROR [Timer-Driven Process Thread-7] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10 ConsumeKafkaRecord_0_10[id=cb353b32-015c-1000-0ed2-0753cceaa542] Exception while processing data from kafka so will close the lease org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@4f137cc2 due to org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException: org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:514)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$8(ConsumerLease.java:320)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:307)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:168)
    at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:327)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: null
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)
    ... 18 common frames omitted
 
 
 
I have played around with the settings of the processors and controller services quite a lot, but I always get this NullPointerException.
 
I then added a ConsumeKafka_0_10 1.3.0 processor to verify that I can retrieve the messages, and it does work.
 
I hope that someone can point out what the problem is and help me.
 
Greetings,
 
Uwe

Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Posted by Mark Payne <ma...@hotmail.com>.
Uwe,

Great, I'm glad that you were able to get it resolved! I agree, throwing an NPE is never okay and is
absolutely a bug. If there is no Record present, we should provide a reasonable error message.

It may actually make sense to change the name of that property as well. Instead of "Skip Header Line",
perhaps change it to "Treat First Line as Header".

Thanks
-Mark



Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Posted by Uwe Geercken <uw...@web.de>.
Mark,
 
Yes, it was the "Skip Header Line" property. Damn it - so many hours.....
 
Actually - as a non-native English speaker - it was a misinterpretation of what that property means. I interpreted it as "there is no header line". But reading the help for this property again and again, I see that I just didn't read it carefully enough. Anyway, a better exception message might have helped me.
 
Thank you Mark - you made my day.
 
Greetings,
 
Uwe
 


Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Posted by Mark Payne <ma...@hotmail.com>.
Uwe,

We do have a JIRA to address an issue where ConsumeKafkaRecord expects that exactly one Record exists per
Kafka message. If you retrieve a Kafka message that has no records, you'll get an NPE. Is there any chance that
you're reading a 0-byte message from Kafka?

Also, on your CSV Reader, do you have the "Skip Header Line" property set to "true"? This is the default, and if
this is the case, it is reading your message as the header. As a result, it is finding 0 records in the Kafka message
because the stream ends after reading the "header." Since you don't have a header on your CSV data, you'll
want to make sure that Skip Header Line is set to false.

Thanks
-Mark
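The failure mode described above is easy to reproduce outside NiFi: when the reader consumes the first (and only) line as a header, the record stream is empty. A minimal Python sketch of that behavior (an illustration, not NiFi's actual reader code):

```python
import csv
import io

message = "ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200\n"

def read_records(text, skip_header_line):
    """Mimic the CSV reader: optionally consume the first line as a header."""
    lines = io.StringIO(text)
    if skip_header_line:
        next(lines, None)  # the lone data line is swallowed as the "header"
    return list(csv.reader(lines))

assert read_records(message, skip_header_line=True) == []   # 0 records: the NPE path
assert len(read_records(message, skip_header_line=False)) == 1
```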


> On Jun 22, 2017, at 3:54 PM, Matt Burgess <ma...@apache.org> wrote:
> 
> Uwe,
> 
> It looks like this error is directly related to your other question.
> This line from the stack trace:
> 
> Caused by: java.lang.NullPointerException: null
>    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)
> 
> That is where it calls record.getSchema(). Not sure if the "real" error
> message is getting lost or perhaps further down in your stack trace,
> but it looks like an error with finding or reading the schema.
> 
> Regards,
> Matt


Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Posted by Uwe Geercken <uw...@web.de>.
Matt,
 
Maybe it is the schema.
 
It's not an empty Kafka message, because the ConsumeKafka_0_10 processor on the same flow works correctly and without problems. Maybe that is another hint that it could be the schema itself.
 
I already spent half a day on it, but I will try to do some further tests.
 
Rgds,
 
Uwe
 

Sent: Thursday, 22 June 2017 at 22:01
From: "Matt Burgess" <ma...@apache.org>
To: users@nifi.apache.org
Subject: Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10
Uwe,

Actually, I spoke too soon; although it might end up being a schema
issue, the NPE looks like it is caused when the record itself is null,
which can happen in the code (there is no null check before trying to
get the schema from the record). However, this might still be a schema
issue if the Reader can be instantiated but then retrieves no
records. Or it might be a bug in the processor's handling of Kafka
input that is empty. That's all the guessing I'll do on
the Kafka stuff; I'll leave it to the folks that know much more about
it :)

Regards,
Matt
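The missing null check described above can be illustrated with a small Python sketch (hypothetical names, not the actual ConsumerLease code): without a guard, a missing record fails with the Python analogue of an NPE; with a guard, the caller gets an actionable message.

```python
class Record:
    """Stand-in for a parsed record carrying a schema."""
    def __init__(self, schema):
        self.schema = schema

    def get_schema(self):
        return self.schema

def write_record_data_unguarded(record):
    # No null check first, so a missing record blows up
    # with AttributeError, Python's analogue of an NPE.
    return record.get_schema()

def write_record_data_guarded(record):
    # With a guard, an empty message yields a clear error instead.
    if record is None:
        raise ValueError("Kafka message yielded no records; check the reader "
                         "configuration (e.g. Skip Header Line)")
    return record.get_schema()

try:
    write_record_data_unguarded(None)
except AttributeError as err:
    print("unguarded:", type(err).__name__)  # unguarded: AttributeError
```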


Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Posted by Uwe Geercken <uw...@web.de>.
Matt,

here are my CSVReader settings:

======
Schema Access Strategy = Use 'Schema Text' Property

Schema Registry = No value set
Schema Name = ${schema.name}

Schema Text = {"type": "record","name": "flight_schema","fields": [{ "name": "flight_station", "type": "string" },{ "name": "flight_orientation", "type": "string" },{ "name": "flight_carrier", "type": "string" },{ "name": "flight_number", "type": "string" },{ "name": "flight_number_suffix", "type": "string" },{ "name": "flight_scheduled_date", "type": "string" },{ "name": "flight_scheduled_time", "type": "string" },{ "name": "flight_actual_date", "type": "string" },{ "name": "flight_actual_time", "type": "string" },{ "name": "flight_passengers", "type": "int" }]}

Date Format = No value set

Time Format = No value set

Timestamp Format = No value set

CSV Format = Custom Format

Value Separator = ,

Skip Header Line = true

Quote Character = "

Escape Character = \

Comment Marker = No value set

Null String = No value set

Trim Fields = true
======
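Since the Schema Text above is plain JSON, its field list can be verified quickly with Python's json module (a quick check outside NiFi, using the schema as posted):

```python
import json

# The Schema Text from the CSVReader settings above, split for readability.
schema_text = (
    '{"type": "record","name": "flight_schema","fields": ['
    '{ "name": "flight_station", "type": "string" },'
    '{ "name": "flight_orientation", "type": "string" },'
    '{ "name": "flight_carrier", "type": "string" },'
    '{ "name": "flight_number", "type": "string" },'
    '{ "name": "flight_number_suffix", "type": "string" },'
    '{ "name": "flight_scheduled_date", "type": "string" },'
    '{ "name": "flight_scheduled_time", "type": "string" },'
    '{ "name": "flight_actual_date", "type": "string" },'
    '{ "name": "flight_actual_time", "type": "string" },'
    '{ "name": "flight_passengers", "type": "int" }]}'
)

schema = json.loads(schema_text)           # valid JSON, so not a syntax problem
names = [f["name"] for f in schema["fields"]]

# The sample message has exactly one value per schema field.
sample = "ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200"
assert len(sample.split(",")) == len(names)
```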

Maybe this helps.

Rgds,

Uwe
 


Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Posted by Matt Burgess <ma...@apache.org>.
Uwe,

Actually I spoke too soon; although it might end up being a schema
issue, the NPE looks like it is caused when the record itself is null,
which can happen in the code (there is no null check before trying to
get the schema from the record). However this might still be a schema
issue if the Reader can be instantiated but then it retrieves no
records. Or it might be a bug in the processor's handling of Kafka
input that is empty? That's all the guessing I'll do on the Kafka
stuff; I'll leave it to the folks who know much more about it :)

Regards,
Matt
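[Editor's note: Matt's reading — the reader yields no record, and writeRecordData() then dereferences it with no null check — can be sketched outside NiFi like this. The types below are hypothetical stand-ins, not the real NiFi API:]

```python
class Record:
    """Hypothetical stand-in for a NiFi Record; not the real API."""
    def __init__(self, schema):
        self._schema = schema

    def get_schema(self):
        return self._schema

def write_record_data(record):
    # Mirrors the failing path: no null check before getSchema().
    return record.get_schema()  # fails when record is None (the NPE)

def write_record_data_guarded(record):
    # The kind of guard whose absence Matt points at.
    if record is None:
        return None
    return record.get_schema()
```

When the reader produces nothing, `record` is None and the unguarded path fails exactly where the stack trace points (ConsumerLease.writeRecordData).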

On Thu, Jun 22, 2017 at 3:54 PM, Matt Burgess <ma...@apache.org> wrote:
> Uwe,
>
> It looks like this error is directly related to your other question.
> This line from the stack trace:
>
> Caused by: java.lang.NullPointerException: null
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)
>
> Is when it calls record.getSchema().  Not sure if the "real" error
> message is getting lost or perhaps further down in your stack trace,
> but it looks like an error with finding or reading the schema.
>
> Regards,
> Matt
>
>
> On Thu, Jun 22, 2017 at 3:30 PM, Uwe Geercken <uw...@web.de> wrote:
>> Hello everyone,
>>
>> I wanted to try the following
>> - get messages from a kafka topic. these are simple messages in CSV format
>> - use the PartitionRecord processor to get familiar with the RecordPath concept
>>
>> I started zookeeper and kafka on localhost and added some messages to a topic using the kafka concole producer. A message looks like this:
>>
>> ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200
>>
>> I can retrieve this message using the kafka concole consumer.
>>
>> To my flow I added the ConsumeKafkaRecord_0_10 and the PartitionRecord processor. I configured the ConsumeKafkaRecord_0_10 with a CSVReader controller. It uses the AvroSchemaRegistry 1.3.0 with following schema:
>>
>> {
>> "type": "record",
>> "name": "flight_schema",
>> "fields": [
>>    { "name": "flight_station", "type": "string" },
>>    { "name": "flight_orientation", "type": "string" },
>>    { "name": "flight_carrier", "type": "string" },
>>    { "name": "flight_number", "type": "string" },
>>    { "name": "flight_number_suffix", "type": "string" },
>>    { "name": "flight_scheduled_date", "type": "string" },
>>    { "name": "flight_scheduled_time", "type": "string" },
>>    { "name": "flight_actual_date", "type": "string" },
>>    { "name": "flight_actual_time", "type": "string" },
>>    { "name": "flight_passengers", "type": "int" }
>>   ]
>> }
>>
>> And then I have a CSVRecordWriter which uses the same schema. I also added a line to logback.xml to debug the ConsumeKafkaRecord_0_10 processor.
>>
>> Now when I run the processors and add a message to the topic in kafka I get following error:
>>
>>
>> 2017-06-22 18:32:11,228 ERROR [Timer-Driven Process Thread-7] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10 ConsumeKafkaRecord_0_10[id=cb353b32-015c-1000-0ed2-0753cceaa542] Exception while processing data from kafka so will close the lease org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@4f137cc2 due to org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException: org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
>> org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
>>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:514)
>>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$8(ConsumerLease.java:320)
>>     at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
>>     at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:307)
>>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:168)
>>     at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:327)
>>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.NullPointerException: null
>>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)
>>     ... 18 common frames omitted
>>
>>
>>
>> I have played around with the settings of the processors and controllers quite a lot, but always get this NullPointerException.
>>
>> I then added a ConsumeKafka_0_10 1.3.0 processor to verify that I can retrieve the messages and it does work.
>>
>> I hope that someone can point out what the problem is and help me.
>>
>> Greetings,
>>
>> Uwe

Aw: Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Posted by Uwe Geercken <uw...@web.de>.
Matt,
 
there is nothing more in the log about the error.

I also tried hardcoding the schema name, and providing the schema directly via the 'Schema Text' property without the AvroSchemaRegistry. But I always get the same error.

Maybe the problem is somewhere else: the schema itself, linebreaks in the schema, or something missing from the schema?
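[Editor's note: one stdlib way to rule out the linebreak theory outside NiFi. The schema text is JSON, and linebreaks are legal whitespace in JSON, so a clean json.loads() parse means the formatting is not the problem. Schema abridged here for brevity:]

```python
import json

# Abridged copy of the schema from the thread; linebreaks kept on purpose.
schema_text = """
{
  "type": "record",
  "name": "flight_schema",
  "fields": [
    { "name": "flight_station", "type": "string" },
    { "name": "flight_passengers", "type": "int" }
  ]
}
"""

schema = json.loads(schema_text)  # raises json.JSONDecodeError if malformed
print(schema["name"], len(schema["fields"]))
```

A successful parse of the full schema would point the investigation away from the schema text and toward the reader configuration or the processor itself.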
 
Thanks Matt,
 
Uwe
 
 


Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Posted by Matt Burgess <ma...@apache.org>.
Uwe,

It looks like this error is directly related to your other question.
This line from the stack trace:

Caused by: java.lang.NullPointerException: null
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)

is where it calls record.getSchema(). Not sure if the "real" error
message is getting lost or perhaps further down in your stack trace,
but it looks like an error with finding or reading the schema.

Regards,
Matt


On Thu, Jun 22, 2017 at 3:30 PM, Uwe Geercken <uw...@web.de> wrote:
> Hello everyone,
>
> I wanted to try the following
> - get messages from a kafka topic. these are simple messages in CSV format
> - use the PartitionRecord processor to get familiar with the RecordPath concept
>
> I started zookeeper and kafka on localhost and added some messages to a topic using the kafka console producer. A message looks like this:
>
> ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200
>
> I can retrieve this message using the kafka console consumer.
>
> To my flow I added the ConsumeKafkaRecord_0_10 and the PartitionRecord processor. I configured the ConsumeKafkaRecord_0_10 with a CSVReader controller. It uses the AvroSchemaRegistry 1.3.0 with the following schema:
>
> {
> "type": "record",
> "name": "flight_schema",
> "fields": [
>    { "name": "flight_station", "type": "string" },
>    { "name": "flight_orientation", "type": "string" },
>    { "name": "flight_carrier", "type": "string" },
>    { "name": "flight_number", "type": "string" },
>    { "name": "flight_number_suffix", "type": "string" },
>    { "name": "flight_scheduled_date", "type": "string" },
>    { "name": "flight_scheduled_time", "type": "string" },
>    { "name": "flight_actual_date", "type": "string" },
>    { "name": "flight_actual_time", "type": "string" },
>    { "name": "flight_passengers", "type": "int" }
>   ]
> }
>
> And then I have a CSVRecordWriter which uses the same schema. I also added a line to logback.xml to debug the ConsumeKafkaRecord_0_10 processor.
>
> Now when I run the processors and add a message to the topic in kafka I get the following error:
>
>
> 2017-06-22 18:32:11,228 ERROR [Timer-Driven Process Thread-7] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10 ConsumeKafkaRecord_0_10[id=cb353b32-015c-1000-0ed2-0753cceaa542] Exception while processing data from kafka so will close the lease org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@4f137cc2 due to org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException: org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
> org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:514)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$8(ConsumerLease.java:320)
>     at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
>     at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:307)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:168)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:327)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException: null
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)
>     ... 18 common frames omitted
>
>
>
> I have played around with the settings of the processors and controllers quite a lot, but always get this NullPointerException.
>
> I then added a ConsumeKafka_0_10 1.3.0 processor to verify that I can retrieve the messages and it does work.
>
> I hope that someone can point out what the problem is and help me.
>
> Greetings,
>
> Uwe