Posted to users@nifi.apache.org by Colin Williams <co...@gmail.com> on 2018/03/22 01:16:19 UTC

GG kafka topic in avro format to NiFi

I have an avro avsc file for a table with a definition like:

{"type":"record","name":"INV_ADJ","namespace":"NSP_SCH","fields":[{"name":"table","type":"string"},{"name":"op_type","type":"string"},{"name":"op_ts","type":"string"},{"name":"current_ts","type":"string"},{"name":"pos","type":"string"},{"name":"primary_keys","type":{"type":"array","items":"string"}},{"name":"tokens","type":{"type":"map","values":"string"},"default":{}},{"name":"before","type":["null",{"type":"record","name":"columns","fields":[{"name":"ITEM","type":["null","string"],"default":null},{"name":"ITEM_isMissing","type":"boolean"},{"name":"INV_STATUS","type":["null","long"],"default":null},{"name":"INV_STATUS_isMissing","type":"boolean"},{"name":"LOC_TYPE","type":["null","string"],"default":null},{"name":"LOC_TYPE_isMissing","type":"boolean"},{"name":"LOCATION","type":["null","long"],"default":null},{"name":"LOCATION_isMissing","type":"boolean"},{"name":"ADJ_QTY","type":["null","double"],"default":null},{"name":"ADJ_QTY_isMissing","type":"boolean"},{"name":"REASON","type":["null","long"],"default":null},{"name":"REASON_isMissing","type":"boolean"},{"name":"ADJ_DATE","type":["null","string"],"default":null},{"name":"ADJ_DATE_isMissing","type":"boolean"},{"name":"PREV_QTY","type":["null","double"],"default":null},{"name":"PREV_QTY_isMissing","type":"boolean"},{"name":"USER_ID","type":["null","string"],"default":null},{"name":"USER_ID_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT","type":["null","double"],"default":null},{"name":"ADJ_WEIGHT_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT_UOM","type":["null","string"],"default":null},{"name":"ADJ_WEIGHT_UOM_isMissing","type":"boolean"},{"name":"CREATE_ID","type":["null","string"],"default":null},{"name":"CREATE_ID_isMissing","type":"boolean"},{"name":"CREATE_DATETIME","type":["null","string"],"default":null},{"name":"CREATE_DATETIME_isMissing","type":"boolean"}]}],"default":null},{"name":"after","type":["null","columns"],"default":null}]}

I have a kafka topic which should contain avro records using the above
definition.

I've configured the Avro registry, reader, and writer with the above
definition. When I run my NiFi flow I get exceptions like "Invalid int
encoding", and no data seems to be processed.

What am I doing wrong?

Re: GG kafka topic in avro format to NiFi

Posted by Joe Witt <jo...@gmail.com>.
Colin,

You would not have seen this error before from the Kafka processor you
were using, as it does not do any deserialization.  You would have
seen that error downstream, in the LookupRecord processor you had.
Just wanted to clarify that point.

That you're seeing the error now with ConsumeKafkaRecord tells us that
the schema and/or format you've configured it to read does not match
what is actually in Kafka.  Possible causes: it is simply the wrong or
an incompatible schema, or the messages actually contain the Avro
schema along with the payload while the reader is configured to read
only the bare record/payload.  If you can, pull the raw data from
Kafka as bytes and take a look to be sure it is what you expect.  Or
you can use NiFi's click-to-content/provenance capabilities to look at
the raw records that it sends to the 'parse.failure' relationship
called out in the error.  This should make troubleshooting much
easier.
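
As a rough illustration of "take a look at the raw bytes", here is a minimal Python sketch that guesses how a message value is framed from its first few bytes. The function name guess_framing and the sample payloads are hypothetical. The Confluent wire-format check is an assumption about how the producer might be configured; nothing in this thread confirms it. Treat the result as a hint, not a verdict: a bare record whose first string field is empty would also begin with a zero byte.

```python
AVRO_CONTAINER_MAGIC = b"Obj\x01"  # first four bytes of an Avro object container file


def guess_framing(payload: bytes) -> str:
    """Guess how a Kafka message value is framed from its leading bytes."""
    if payload.startswith(AVRO_CONTAINER_MAGIC):
        # The schema is embedded in the message itself; a reader configured
        # for bare records with an explicit schema would misread these bytes.
        return "avro-container"
    if len(payload) > 5 and payload[0] == 0:
        # Confluent wire format: magic byte 0, then a 4-byte big-endian schema id.
        schema_id = int.from_bytes(payload[1:5], "big")
        return f"confluent-wire-format(schema_id={schema_id})"
    # Anything else may be a bare Avro record, or something else entirely.
    return "bare-or-unknown"


if __name__ == "__main__":
    # Hypothetical payloads, for illustration only.
    print(guess_framing(b"Obj\x01\x04"))
    print(guess_framing(b"\x00\x00\x00\x00\x07rest"))
    print(guess_framing(b"\x1eNSP_SCH.INV_ADJ"))
```

If the result is anything other than a bare record, a reader configured with only the explicit schema would start decoding at the wrong offset, which is one way to end up with a decoding error on the very first field.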

Can you describe how the data gets into Kafka and what is writing it?
That might make this easier to diagnose as well.

Thanks
joe


Re: GG kafka topic in avro format to NiFi

Posted by Colin Williams <co...@gmail.com>.
Hi,

I configured ConsumeKafkaRecord and set schema.name to the value. I think
something had prevented me from editing its value previously.

Anyhow I have what looks like a similar error message:


2018-03-22 09:24:24,990 ERROR [Timer-Driven Process Thread-6] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10 ConsumeKafkaRecord_0_10[id=0162106b-f849-104c-4246-f70e5b8e320b] Failed to parse message from Kafka using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship: java.io.IOException: Invalid int encoding
java.io.IOException: Invalid int encoding
   at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:145)
   at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
   at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
   at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
   at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   at org.apache.nifi.avro.AvroReaderWithExplicitSchema.nextAvroRecord(AvroReaderWithExplicitSchema.java:61)
   at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:36)
   at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
   at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:474)
   at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$2(ConsumerLease.java:322)
   at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
   at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
   at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:309)
   at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:170)
   at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:327)
   at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
   at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
   at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
   at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
   at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
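
The top frames of this trace (readString calling readInt) suggest the decoder failed while reading the length prefix of a string field. In Avro's binary encoding a string is a zigzag varint length followed by UTF-8 bytes, and a 32-bit int takes at most five varint bytes. The sketch below is a plain-Python approximation of that decoding, not the Avro library's code; the function names and the sample table value are hypothetical.

```python
from typing import Tuple


def read_avro_int(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode one Avro int (zigzag varint); return (value, next_position)."""
    raw = 0
    for i in range(5):  # a 32-bit int occupies at most 5 varint bytes
        byte = buf[pos + i]
        raw |= (byte & 0x7F) << (7 * i)
        if (byte & 0x80) == 0:  # high bit clear: this was the last byte
            return (raw >> 1) ^ -(raw & 1), pos + i + 1
    # Five bytes in a row had the continuation bit set.
    raise ValueError("Invalid int encoding")


def read_avro_string(buf: bytes, pos: int = 0) -> Tuple[str, int]:
    """Decode one Avro string: varint length prefix, then UTF-8 bytes."""
    length, pos = read_avro_int(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


if __name__ == "__main__":
    # Hypothetical bare-record start: length 15 is zigzag-encoded as 0x1e.
    print(read_avro_string(b"\x1eNSP_SCH.INV_ADJ"))
    try:
        # Bytes that are not a length prefix at all.
        read_avro_int(b"\xff\xff\xff\xff\xff\x01")
    except ValueError as err:
        print(err)
```

The point of the sketch is that this error does not necessarily mean the data is corrupt; it can simply mean the reader began decoding at bytes that are not where a record of this schema starts.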


Re: GG kafka topic in avro format to NiFi

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

You don’t need to use a properties file. In your AvroRecordReader, just
change the Schema Name property from ${schema.name} to the actual name of
the schema you want to use from the schema registry.

It just means that the record reader can only be used with one schema now,
rather than trying to dynamically determine the schema from a flow file
attribute, which it can’t do in this case because there is no incoming flow
file to ConsumeKafkaRecord.

-Bryan
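
To illustrate why ${schema.name} cannot work on a source processor, here is a toy Python model of attribute substitution. It is not NiFi's Expression Language implementation; the resolve function is hypothetical, the empty-string result for a missing attribute is an assumption of this model, and INV_ADJ is assumed to be the name the schema was registered under.

```python
import re
from typing import Dict


def resolve(property_value: str, attributes: Dict[str, str]) -> str:
    """Toy stand-in for attribute substitution: replace ${name} with attributes[name]."""
    return re.sub(
        r"\$\{([^}]+)\}",
        lambda m: attributes.get(m.group(1), ""),  # missing attribute -> empty (assumption)
        property_value,
    )


if __name__ == "__main__":
    # A source processor such as ConsumeKafkaRecord has no incoming flow file,
    # so there are no attributes to resolve against.
    no_flow_file_attributes: Dict[str, str] = {}
    print(repr(resolve("${schema.name}", no_flow_file_attributes)))  # ''
    print(repr(resolve("INV_ADJ", no_flow_file_attributes)))  # 'INV_ADJ'
```

With a literal value there is nothing to resolve, which is why the reader then works without any flow file attributes, at the cost of being tied to a single schema.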



Re: GG kafka topic in avro format to NiFi

Posted by Colin Williams <co...@gmail.com>.
Hi Joe,

Thanks for the suggestion. I started out using ConsumeKafkaRecord_0_10,
but I had read that the only way to configure schema.name was via a
properties file, which I read also required a restart of NiFi:
http://apache-nifi-users-list.2361937.n4.nabble.com/Nifi-1-3-0-Problem-with-schema-name-and-ConsumeKafkaRecord-0-10-processor-td2256.html
That's why I moved away from ConsumeKafkaRecord to the regular
consumer. I didn't want to create a properties file and couldn't see how to
set schema.name otherwise.

Regarding the error information: I saw the error displayed on the
ConsumeKafka processor in the UI. Tomorrow I will configure
ConsumeKafkaRecord via a properties file, then look for log files and,
if necessary, configure logging.

Best,

Colin Williams


Re: GG kafka topic in avro format to NiFi

Posted by Joe Witt <jo...@gmail.com>.
Colin,

You're using the ConsumeKafka processors.  Given that this is Avro
data for which you have a schema, I strongly recommend you use
ConsumeKafkaRecord_0_10 instead.

With it you specify the record reader and writer you need.  You
will also see dramatically higher performance.

Let's get you reliably reading records from Kafka and then move on to
other details such as LookupRecord, etc.

I suspect we'll need to see the actual error output you're getting to
be of much help.

Thanks
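
For context on the error text: "invalid int encoding" reads like what an
Avro binary decoder reports when a variable-length int runs longer than
five bytes. That is an assumption about where the exception originates,
not something confirmed in this thread. A minimal Python sketch of the
zig-zag varint decoding described in the Avro specification (the function
name read_avro_int is hypothetical) shows how bytes that are not a bare
Avro datum can trip that check:

```python
from typing import Tuple


def read_avro_int(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode one Avro int (zig-zag varint) from buf.

    Returns (value, next_position). Illustrative sketch only; this is
    not the code NiFi or the Avro library actually runs.
    """
    result = 0
    shift = 0
    for i in range(5):  # a 32-bit int occupies at most 5 varint bytes
        if pos + i >= len(buf):
            raise EOFError("buffer ended inside a varint")
        byte = buf[pos + i]
        result |= (byte & 0x7F) << shift  # low 7 bits carry data
        if not byte & 0x80:  # high bit clear marks the last byte
            value = (result >> 1) ^ -(result & 1)  # undo zig-zag
            return value, pos + i + 1
        shift += 7
    # Five bytes in a row had the continuation bit set.
    raise ValueError("invalid int encoding")
```

If the reader is pointed at bytes framed differently from what it expects
(for example a header in front of the record), a length or union index
can decode to garbage or fail in this way.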



On Wed, Mar 21, 2018 at 9:33 PM, Colin Williams
<co...@gmail.com> wrote:
> Hi Joe,
>
> I don't believe the Avro schema is included; I expect the messages are
> just the data portion. I think that's why I need to use the avsc file
> mentioned above.
>
> On Wed, Mar 21, 2018 at 6:19 PM, Joe Witt <jo...@gmail.com> wrote:
>>
>> Can you share a template of your process group?
>>
>> Do the messages in Kafka have the Avro schema included in them or are
>> they just the data portion of the record?
>>
>> On Wed, Mar 21, 2018 at 9:16 PM, Colin Williams
>> <co...@gmail.com> wrote:
>> > I have an avro avsc file for a table with a definition like:
>> >
>> >
>> > {"type":"record","name":"INV_ADJ","namespace":"NSP_SCH","fields":[{"name":"table","type":"string"},{"name":"op_type","type":"string"},{"name":"op_ts","type":"string"},{"name":"current_ts","type":"string"},{"name":"pos","type":"string"},{"name":"primary_keys","type":{"type":"array","items":"string"}},{"name":"tokens","type":{"type":"map","values":"string"},"default":{}},{"name":"before","type":["null",{"type":"record","name":"columns","fields":[{"name":"ITEM","type":["null","string"],"default":null},{"name":"ITEM_isMissing","type":"boolean"},{"name":"INV_STATUS","type":["null","long"],"default":null},{"name":"INV_STATUS_isMissing","type":"boolean"},{"name":"LOC_TYPE","type":["null","string"],"default":null},{"name":"LOC_TYPE_isMissing","type":"boolean"},{"name":"LOCATION","type":["null","long"],"default":null},{"name":"LOCATION_isMissing","type":"boolean"},{"name":"ADJ_QTY","type":["null","double"],"default":null},{"name":"ADJ_QTY_isMissing","type":"boolean"},{"name":"REASON","type":["null","long"],"default":null},{"name":"REASON_isMissing","type":"boolean"},{"name":"ADJ_DATE","type":["null","string"],"default":null},{"name":"ADJ_DATE_isMissing","type":"boolean"},{"name":"PREV_QTY","type":["null","double"],"default":null},{"name":"PREV_QTY_isMissing","type":"boolean"},{"name":"USER_ID","type":["null","string"],"default":null},{"name":"USER_ID_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT","type":["null","double"],"default":null},{"name":"ADJ_WEIGHT_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT_UOM","type":["null","string"],"default":null},{"name":"ADJ_WEIGHT_UOM_isMissing","type":"boolean"},{"name":"CREATE_ID","type":["null","string"],"default":null},{"name":"CREATE_ID_isMissing","type":"boolean"},{"name":"CREATE_DATETIME","type":["null","string"],"default":null},{"name":"CREATE_DATETIME_isMissing","type":"boolean"}]}],"default":null},{"name":"after","type":["null","columns"],"default":null}]}
>> >
>> > I have a kafka topic which should contain avro records using the above
>> > definition.
>> >
>> > I've configured the avro registry, reader, and writer with the above
>> > definition. When I try using my nifi workflow I get exceptions like
>> > "invalid int encoding" and don't seem to process any data.
>> >
>> > What am I doing wrong?
>
>

Re: GG kafka topic in avro format to NiFi

Posted by Colin Williams <co...@gmail.com>.
Hi Joe,

I don't believe the Avro schema is included; I expect the messages are
just the data portion. I think that's why I need to use the avsc file
mentioned above.

On Wed, Mar 21, 2018 at 6:19 PM, Joe Witt <jo...@gmail.com> wrote:

> Can you share a template of your process group?
>
> Do the messages in Kafka have the Avro schema included in them or are
> they just the data portion of the record?
>
> On Wed, Mar 21, 2018 at 9:16 PM, Colin Williams
> <co...@gmail.com> wrote:
> > I have an avro avsc file for a table with a definition like:
> >
> > {"type":"record","name":"INV_ADJ","namespace":"NSP_SCH","
> fields":[{"name":"table","type":"string"},{"name":"op_
> type","type":"string"},{"name":"op_ts","type":"string"},{"
> name":"current_ts","type":"string"},{"name":"pos","type":
> "string"},{"name":"primary_keys","type":{"type":"array","
> items":"string"}},{"name":"tokens","type":{"type":"map","
> values":"string"},"default":{}},{"name":"before","type":["
> null",{"type":"record","name":"columns","fields":[{"name":"
> ITEM","type":["null","string"],"default":null},{"name":"
> ITEM_isMissing","type":"boolean"},{"name":"INV_STATUS"
> ,"type":["null","long"],"default":null},{"name":"INV_
> STATUS_isMissing","type":"boolean"},{"name":"LOC_TYPE","
> type":["null","string"],"default":null},{"name":"LOC_
> TYPE_isMissing","type":"boolean"},{"name":"LOCATION","
> type":["null","long"],"default":null},{"name":"
> LOCATION_isMissing","type":"boolean"},{"name":"ADJ_QTY","
> type":["null","double"],"default":null},{"name":"ADJ_
> QTY_isMissing","type":"boolean"},{"name":"REASON","type":["null","long"],"
> default":null},{"name":"REASON_isMissing","type":"
> boolean"},{"name":"ADJ_DATE","type":["null","string"],"
> default":null},{"name":"ADJ_DATE_isMissing","type":"
> boolean"},{"name":"PREV_QTY","type":["null","double"],"
> default":null},{"name":"PREV_QTY_isMissing","type":"
> boolean"},{"name":"USER_ID","type":["null","string"],"
> default":null},{"name":"USER_ID_isMissing","type":"boolean"
> },{"name":"ADJ_WEIGHT","type":["null","double"],"default":
> null},{"name":"ADJ_WEIGHT_isMissing","type":"boolean"},{
> "name":"ADJ_WEIGHT_UOM","type":["null","string"],"default":
> null},{"name":"ADJ_WEIGHT_UOM_isMissing","type":"boolean"},{
> "name":"CREATE_ID","type":["null","string"],"default":
> null},{"name":"CREATE_ID_isMissing","type":"boolean"},{
> "name":"CREATE_DATETIME","type":["null","string"],"
> default":null},{"name":"CREATE_DATETIME_isMissing","
> type":"boolean"}]}],"default":null},{"name":"after","type":[
> "null","columns"],"default":null}]}
> >
> > I have a kafka topic which should contain avro records using the above
> > definition.
> >
> > I've configured the avro registry, reader, and writer with the above
> > definition. When I try using my nifi workflow I get exceptions like
> > "invalid int encoding" and don't seem to process any data.
> >
> > What am I doing wrong?
>

Re: GG kafka topic in avro format to NiFi

Posted by Joe Witt <jo...@gmail.com>.
Can you share a template of your process group?

Do the messages in Kafka have the Avro schema included in them or are
they just the data portion of the record?
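
One rough way to answer that question is to look at the first bytes of a
message value pulled from the topic. The sketch below is a heuristic, not
a definitive test: the function name classify_payload is hypothetical,
the "Obj" + 0x01 magic comes from the Avro object container file format,
and the zero byte followed by a 4-byte schema id is the Confluent Schema
Registry framing, which may or may not be what the producer here uses.

```python
import struct

# First four bytes of an Avro object container file ("Obj" + version 1).
AVRO_CONTAINER_MAGIC = b"Obj\x01"


def classify_payload(payload: bytes) -> str:
    """Guess how an Avro Kafka message value is framed (heuristic only)."""
    if payload[:4] == AVRO_CONTAINER_MAGIC:
        return "container file: schema is embedded in the message"
    if len(payload) >= 5 and payload[0] == 0:
        # Assumed Confluent-style framing: magic 0x00, big-endian schema id.
        (schema_id,) = struct.unpack(">I", payload[1:5])
        return f"possible Confluent framing: schema id {schema_id}"
    return "probably a bare Avro datum: reader needs the .avsc"
```

A bare datum can also begin with a zero byte (an empty string in the
first field, for instance), so the second branch can give a false
positive; treat the result as a hint about which reader settings to try.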

On Wed, Mar 21, 2018 at 9:16 PM, Colin Williams
<co...@gmail.com> wrote:
> I have an avro avsc file for a table with a definition like:
>
> {"type":"record","name":"INV_ADJ","namespace":"NSP_SCH","fields":[{"name":"table","type":"string"},{"name":"op_type","type":"string"},{"name":"op_ts","type":"string"},{"name":"current_ts","type":"string"},{"name":"pos","type":"string"},{"name":"primary_keys","type":{"type":"array","items":"string"}},{"name":"tokens","type":{"type":"map","values":"string"},"default":{}},{"name":"before","type":["null",{"type":"record","name":"columns","fields":[{"name":"ITEM","type":["null","string"],"default":null},{"name":"ITEM_isMissing","type":"boolean"},{"name":"INV_STATUS","type":["null","long"],"default":null},{"name":"INV_STATUS_isMissing","type":"boolean"},{"name":"LOC_TYPE","type":["null","string"],"default":null},{"name":"LOC_TYPE_isMissing","type":"boolean"},{"name":"LOCATION","type":["null","long"],"default":null},{"name":"LOCATION_isMissing","type":"boolean"},{"name":"ADJ_QTY","type":["null","double"],"default":null},{"name":"ADJ_QTY_isMissing","type":"boolean"},{"name":"REASON","type":["null","long"],"default":null},{"name":"REASON_isMissing","type":"boolean"},{"name":"ADJ_DATE","type":["null","string"],"default":null},{"name":"ADJ_DATE_isMissing","type":"boolean"},{"name":"PREV_QTY","type":["null","double"],"default":null},{"name":"PREV_QTY_isMissing","type":"boolean"},{"name":"USER_ID","type":["null","string"],"default":null},{"name":"USER_ID_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT","type":["null","double"],"default":null},{"name":"ADJ_WEIGHT_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT_UOM","type":["null","string"],"default":null},{"name":"ADJ_WEIGHT_UOM_isMissing","type":"boolean"},{"name":"CREATE_ID","type":["null","string"],"default":null},{"name":"CREATE_ID_isMissing","type":"boolean"},{"name":"CREATE_DATETIME","type":["null","string"],"default":null},{"name":"CREATE_DATETIME_isMissing","type":"boolean"}]}],"default":null},{"name":"after","type":["null","columns"],"default":null}]}
>
> I have a kafka topic which should contain avro records using the above
> definition.
>
> I've configured the avro registry, reader, and writer with the above
> definition. When I try using my nifi workflow I get exceptions like
> "invalid int encoding" and don't seem to process any data.
>
> What am I doing wrong?