You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by Uwe Geercken <uw...@web.de> on 2017/06/22 19:40:21 UTC

Nifi 1.3.0 - Problem with schema.name and ConsumeKafkaRecord_0_10 processor

Hello,
 
besides my other problem with the ConsumeKafkaRecord_0_10 processor, I have another question.
 
Using the AvroSchemaRegistry 1.3.0, I can define a schema and reference it e.g. in the CSVReader controller using the 'Schema Name' property and by setting this property to ${schema.name}.

But how can I set this attribute ${schema.name}? Usually I would do so using the UpdateAttribute Processor but the ConsumeKafkaRecord_0_10 processor is the first one in the flow and it does not allow incomming connections.

Thanks for any comments or help.

Rgds,

Uwe

Re: Re: Nifi 1.3.0 - Problem with schema.name and ConsumeKafkaRecord_0_10 processor

Posted by Matt Burgess <ma...@gmail.com>.
Certainly!  Here are some references:

[1] https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#custom_properties
[2] https://cwiki.apache.org/confluence/display/NIFI/Variable+Registry
[3] https://issues.apache.org/jira/browse/NIFI-1974

Basically you create a Java properties file or file(s), point to them
in nifi.properties via the "nifi.variable.registry.properties"
property, then (re)start NiFi. You can use your properties in
Expression Language constructs as if they were attributes (note: they
are currently static for the running NiFi instance, so if you change
their values in the file(s) you must restart NiFi).

Regards,
Matt


On Thu, Jun 22, 2017 at 4:45 PM, Uwe Geercken <uw...@web.de> wrote:
> Matt,
>
> got it resolved with the help of Mark.
>
> One more question: do you have a link to the "Variable Registry" you
> mentioned?
>
> Rgds,
>
> Uwe
>
> Gesendet: Donnerstag, 22. Juni 2017 um 21:47 Uhr
> Von: "Matt Burgess" <ma...@apache.org>
> An: users@nifi.apache.org
> Betreff: Re: Nifi 1.3.0 - Problem with schema.name and
> ConsumeKafkaRecord_0_10 processor
> Uwe,
>
> Since ConsumeKafkaRecord is a "source" processor, you won't be able to
> set schema.name as a FlowFile attribute. However you could set it as a
> property in a Variable Registry file and use that. Are your schemas
> dynamic based on the topic? If not, you likely don't need to use the
> schema.name attribute, you could hard-code the name in the reader
> (${schema.name} is just the default), or even the schema itself (if
> you use the Schema Text strategy and Schema Text set to your schema).
>
> If the schemas depend on the topic, I'm not sure a single
> ConsumeKafkaRecord would work in this case. Having said that, it might
> be cool as an improvement to make a "transient attribute" available,
> exposing "topic.name" as what might look like a flow file attribute to
> the EL evaluator, and whose value would be the topic name for the
> current incoming message.
>
> Regards,
> Matt
>
>
> On Thu, Jun 22, 2017 at 3:40 PM, Uwe Geercken <uw...@web.de> wrote:
>> Hello,
>>
>> besides my other problem with the ConsumeKafkaRecord_0_10 processor, I
>> have another question.
>>
>> Using the AvroSchemaRegistry 1.3.0, I can define a schema and reference it
>> e.g. in the CSVReader controller using the 'Schema Name' property and by
>> setting this property to ${schema.name}.
>>
>> But how can I set this attribute ${schema.name}? Usually I would do so
>> using the UpdateAttribute Processor but the ConsumeKafkaRecord_0_10
>> processor is the first one in the flow and it does not allow incomming
>> connections.
>>
>> Thanks for any comments or help.
>>
>> Rgds,
>>
>> Uwe

Re: Nifi 1.3.0 - Problem with schema.name and ConsumeKafkaRecord_0_10 processor

Posted by Matt Burgess <ma...@apache.org>.
Uwe,

Since ConsumeKafkaRecord is a "source" processor, you won't be able to
set schema.name as a FlowFile attribute. However you could set it as a
property in a Variable Registry file and use that.  Are your schemas
dynamic based on the topic? If not, you likely don't need to use the
schema.name attribute, you could hard-code the name in the reader
(${schema.name} is just the default), or even the schema itself (if
you use the Schema Text strategy and Schema Text set to your schema).

If the schemas depend on the topic, I'm not sure a single
ConsumeKafkaRecord would work in this case. Having said that, it might
be cool as an improvement to make a "transient attribute" available,
exposing "topic.name" as what might look like a flow file attribute to
the EL evaluator, and whose value would be the topic name for the
current incoming message.

Regards,
Matt


On Thu, Jun 22, 2017 at 3:40 PM, Uwe Geercken <uw...@web.de> wrote:
> Hello,
>
> besides my other problem with the ConsumeKafkaRecord_0_10 processor, I have another question.
>
> Using the AvroSchemaRegistry 1.3.0, I can define a schema and reference it e.g. in the CSVReader controller using the 'Schema Name' property and by setting this property to ${schema.name}.
>
> But how can I set this attribute ${schema.name}? Usually I would do so using the UpdateAttribute Processor but the ConsumeKafkaRecord_0_10 processor is the first one in the flow and it does not allow incomming connections.
>
> Thanks for any comments or help.
>
> Rgds,
>
> Uwe