Posted to dev@nifi.apache.org by yejug <ms...@gmail.com> on 2015/10/13 11:22:40 UTC

Ingest Original data from External system by data's dependent condition

Hi

I'm new to NiFi and am trying to implement a simple workflow:

1) GetKafka -> original encoded message (Avro/MessagePack/some other
format...)
2) ConvertFromXXXToJson -> JSON message
3) EvaluateJsonPath -> read some fields from the decoded JSON and put them
into FlowFile attributes (in my case I simply read the timestamp field)
4) MergeContent -> merge the original messages, using the timestamp
attribute as the "correlation" attribute
5) PutHDFS -> save the batch of original messages into a directory on
HDFS
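
Spelled out with the two content-dependent settings (the property names
are taken from EvaluateJsonPath and MergeContent as I understand them, so
treat this as an assumption rather than a verified configuration):

GetKafka -> ConvertFromXXXToJson -> EvaluateJsonPath -> MergeContent -> PutHDFS

EvaluateJsonPath
  Destination = flowfile-attribute
  timestamp   = $.timestamp    (dynamic property: attribute name -> JSON path)

MergeContent
  Correlation Attribute Name = timestamp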

The problem is: after step 2) I lose the original messages read from
Kafka, and only the parsed JSONs are available.
Is it possible, via the standard set of NiFi processors/services, to
access the original messages at step 4) without developing custom
ConvertXXXToJson processors (in the simplest case, copy-pasting the
existing ones) that transmit the input FlowFiles to a new custom
relationship?




Re: Ingest Original data from External system by data's dependent condition

Posted by Bryan Bende <bb...@gmail.com>.
FYI, in case someone wants to work on it, the ticket for extracting from Avro
is: https://issues.apache.org/jira/browse/NIFI-962



Re: Ingest Original data from External system by data's dependent condition

Posted by yejug <ms...@gmail.com>.
Hi guys

Thanks again for the attention and discussion about my problem. It seems to
be a very common problem, and having this functionality would simplify more
than one NiFi flow =).

In my humble opinion it would be nice for NiFi to have both approaches, the
one proposed by Oleg and the one proposed by Bryan.

I took a cursory glance at Bryan's solution and it reminds me of Java's
CountDownLatch =). In my case it would be possible to solve the problem
with this mechanism, but it looks like a workaround =(. In general, "pause
the flow until a condition is met" and "store a file temporarily" are tasks
from different families, so it would be cleaner to have different
mechanisms for these different tasks (I think ;)).

PS: Currently NiFi reminds me of the Servlet API with only a "request
scope"; Oleg's approach adds something like a "session scope".

Thanks




Re: Ingest Original data from External system by data's dependent condition

Posted by Joe Witt <jo...@gmail.com>.
+1 Bryan/Oleg regarding ClaimCheck and NIFI-190.  Totally forgot about
that path but this does offer a great way to tackle this.


Re: Ingest Original data from External system by data's dependent condition

Posted by Bryan Bende <bb...@gmail.com>.
We do have an idea that we called HoldFile that hasn't been fully
implemented yet, but has come up a few times:
https://issues.apache.org/jira/browse/NIFI-190

The idea was basically for a processor to "hold" a FlowFile until it was
signaled by another processor to release it.
Seems like this is similar to the ClaimCheck idea and could play into the
scenarios being discussed... hold format A, convert to format B, add some
attributes to B, then release A, transferring those attributes to A.
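
As a rough plain-Java sketch of that hold/release behavior (the names are
invented for illustration; this is not the NiFi API): format A is parked
under a correlation key, and the signal that releases it carries along the
attributes extracted from the converted copy B.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java sketch of the HoldFile idea (invented names, not NiFi API).
public class HoldFileSketch {

    // A stand-in for a FlowFile: opaque content plus attributes.
    static final class Item {
        final byte[] content;
        final Map<String, String> attributes = new HashMap<>();
        Item(byte[] content) { this.content = content; }
    }

    private final Map<String, Item> held = new ConcurrentHashMap<>();

    // Park the original (format A) under a correlation key.
    public void hold(String key, Item original) {
        held.put(key, original);
    }

    // Signal from downstream: release the original, transferring onto it
    // the attributes extracted from the converted copy (format B).
    public Item release(String key, Map<String, String> extracted) {
        Item original = held.remove(key);
        if (original != null) {
            original.attributes.putAll(extracted);
        }
        return original;
    }
}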



Re: Ingest Original data from External system by data's dependent condition

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Great points Joe!

One point I want to add to the discussion...

I am still learning the internals of NiFi, but the use case at the core of
this thread is actually a very common EIP problem, and while an Aggregator
(Merger) receiving from multiple inbound sources is one approach, it is not
the only one.
Another pattern that would probably fit better here is the ClaimCheck in
combination with a MessageStore.
The way it would work is like this:
- The original FlowFile (Message) is stored in the MessageStore under a
given key (the ClaimCheck), which becomes an attribute to be passed
downstream.
- Somewhere downstream, whenever you are ready for aggregation, use the
ClaimCheck to access the MessageStore and get the original Message back, to
perform the aggregation or whatever else.
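
For illustration only, a minimal plain-Java sketch of such a store (the
names are invented; in NiFi terms the ClaimCheck would travel as a FlowFile
attribute while the payload waits in the store):

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Minimal ClaimCheck / MessageStore sketch (invented names, not NiFi API).
public class MessageStore {

    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    // Check in the original message; the returned key is the ClaimCheck
    // that travels downstream.
    public String checkIn(byte[] originalMessage) {
        String claimCheck = UUID.randomUUID().toString();
        store.put(claimCheck, originalMessage);
        return claimCheck;
    }

    // Anywhere downstream, redeem the ClaimCheck to get the original back.
    public byte[] checkOut(String claimCheck) {
        return store.remove(claimCheck);
    }
}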

The general benefit is that accessing the original message may be required
not only for aggregation but for a variety of use cases; the ClaimCheck
gives access to the original message to anyone who holds it.

So I want to use this as an opportunity to ask the wider NiFi group (since
I am still learning it myself): is such a pattern supported? I know there
is a ContentRepository, so I am assuming it wouldn't be that difficult.

Cheers
Oleg


Re: Ingest Original data from External system by data's dependent condition

Posted by Joe Witt <jo...@gmail.com>.
Lots of details passing by here, but...

Given formats A, B, ..., Z coming in, the following capabilities are
generally desired:
1) Extract attributes of each event
2) Make routing decisions on each event based on those extracted attributes
3) Deliver raw/unmodified data to some endpoint (like HDFS)
4) Convert/transform data to some normalized format (and possibly schema too)
5) Deliver converted data to some endpoint

Steps #1 and #4 naturally involve custom work for formats that are not
something we can readily support out of the box, such as XML, JSON, Avro,
etc. Even the workaround suggested really only works when the original
format is known well enough that we could support it, which means we
likely would not have needed the workaround anyway. So the issue remains
that custom work is required for the #1 and #4 cases. Now, if you have
packed formats that you think we could support, please let us know and we
can see about some mechanism for dealing with those formats generically -
it would be a power-user tool of course, but avoiding custom work is great
when it is achievable with the right user experience/capability mix.

Thanks
Joe


Re: Ingest Original data from External system by data's dependent condition

Posted by yejug <ms...@gmail.com>.
Ok,

Thank you guys for the assistance.

It looks like Joe's suggestion is the more appropriate one for me, but
there is one BUT: in the 'ExtractXYZAttributes' case we must implement the
parsing of the encoded message implicitly, and we cannot reuse this logic,
e.g. if we later want to do an actual XXX -> JSON conversion.

With 99.9% certainty, in my case there will be more inputs besides Avro
(at minimum msgpack and some custom binary formats), which must be parsed
as well as stored in the original input format.

So I think that, apart from ConvertXXXToJson + Andrew's workaround, there
are no more alternatives for me now.

Thanks again




Re: Ingest Original data from External system by data's dependent condition

Posted by Andrew Grande <ag...@hortonworks.com>.
A typical pattern/workaround for this situation has been to copy, e.g., the JSON _in full_ into an attribute, leaving the payload in its binary format. But, as you can imagine, it's not ideal, as FlowFile memory and disk pressure rise significantly and duplicate what is already in the content repo.
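
For example, one stock-processor way to copy an entire payload into an
attribute is ExtractText with a catch-all regex (the property name
json.body is made up; as I recall the processor's behavior, each dynamic
property's regex capture groups become attributes):

ExtractText
  Enable DOTALL Mode = true
  json.body          = (.*)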

Andrew





Re: Ingest Original data from External system by data's dependent condition

Posted by Joe Witt <jo...@gmail.com>.
Hello

Is the only reason for converting from AVRO or whatever to JSON so
that you can extract attributes?

I recommend not converting the data simply so that you can do that.  I
recommend building processors that extract attributes from the raw data.
I believe we have JIRAs targeted for the next release to do this for
Avro just like JSON.  If you have other custom formats in mind, I
recommend building 'ExtractXYZAttributes' processors.
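
As a bare-bones sketch of what such an ExtractAvroAttributes could look
like (my own illustration against the NiFi and Avro APIs, not an existing
processor): it reads the first record of an Avro datafile, copies its
timestamp field to an attribute, and leaves the content untouched.

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;

// Sketch of an ExtractAvroAttributes processor: pull a field out of the
// raw Avro content into an attribute without converting the content.
public class ExtractAvroAttributes extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success").build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session)
            throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final AtomicReference<String> timestamp = new AtomicReference<>();
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream in) throws IOException {
                // Read only the first record; a real processor would make
                // the field names configurable and handle schema variants.
                try (DataFileStream<GenericRecord> records =
                        new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
                    if (records.hasNext()) {
                        timestamp.set(String.valueOf(records.next().get("timestamp")));
                    }
                }
            }
        });

        if (timestamp.get() != null) {
            flowFile = session.putAttribute(flowFile, "timestamp", timestamp.get());
        }
        session.transfer(flowFile, REL_SUCCESS);
    }
}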

There is no mechanism in play today where we convert from format A to B
and keep the original A hanging around on the resulting B object. You can
of course do this by creating archive/container formats that hold both,
but that is also not recommended.

Does this make sense?

Thanks
Joe

On Tue, Oct 13, 2015 at 9:06 AM, Oleg Zhurakousky
<oz...@hortonworks.com> wrote:
> Sorry, I meant to say that you have to enrich the original file with a correlation attribute, otherwise there is nothing to correlate on.
> I am not sure if NiFi has any implementation of ContentEnricher (EIP), perhaps UpdateAttribute will do the trick.
>
> Oleg
>
>> On Oct 13, 2015, at 8:21 AM, yejug <ms...@gmail.com> wrote:
>>
>> Hi Oleg
>>
>> THanks for response, may be I missing something (I cannot find you image
>> =)), but you suggestion doesn;t appropriate.
>>
>> There into MergeContent processor brings two types of flowFiles :
>> 1) one is flow file with original content (AVRO) but without populated
>> "correlation" attribute, directly from GetKafka
>> 2) and second type of flow file with parsed content (JSON) and populated
>> "correlation" attribute
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/Ingest-Original-data-from-External-system-by-data-s-dependent-condition-tp3093p3096.html
>> Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.
>>
>

Re: Ingest Original data from External system by data's dependent condition

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Sorry, I meant to say that you have to enrich the original file with a correlation attribute; otherwise there is nothing to correlate on.
I am not sure if NiFi has any implementation of the ContentEnricher (EIP); perhaps UpdateAttribute will do the trick.
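
For reference, UpdateAttribute turns each dynamic property into an
attribute and supports Expression Language, so an enrichment could look
like this (the property name is invented, and it can only draw on values
that are already attributes, not on fields inside the content):

UpdateAttribute
  correlation.key = ${kafka.topic}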

Oleg
 


Re: Ingest Original data from External system by data's dependent condition

Posted by yejug <ms...@gmail.com>.
Hi Oleg

Thanks for the response. Maybe I am missing something (I cannot find your
image =)), but your suggestion doesn't fit.

In that setup, two types of FlowFiles arrive at the MergeContent processor:
1) FlowFiles with the original content (Avro) but without the populated
"correlation" attribute, coming directly from GetKafka
2) FlowFiles with the parsed content (JSON) and the populated
"correlation" attribute





Re: Ingest Original data from External system by data's dependent condition

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
You can simply have another connection from GetKafka to MergeContent, essentially allowing MergeContent to receive the original message so it can be merged.
See the attached image for an example:
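
[The attached image is not available in the plain-text archive; as a
reconstruction from the description, the wiring is roughly:]

GetKafka ----> ConvertFromXXXToJson ----> EvaluateJsonPath ----> MergeContent
    |                                                                 ^
    +----------------------- second connection -----------------------+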

Oleg




