You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by Anshuman Ghosh <an...@gmail.com> on 2017/05/11 13:49:43 UTC

Partitioning from actual Data (FlowFile) in NiFi

Hello everyone,

It would be great if you can help me implementing this use-case

   - Is there any way (NiFi processor) to use *an attribute (field/ column)
   value for partitioning* when writing the final FlowFile to HDFS/ other
   storage.
   - Earlier we were using simple system date (
   */year=${now():format('yyyy')}/month=${now():format('MM')}/day=${now():format('dd')}/*)
   for this but that doesn't make sense when we consume old data from Kafka
   and want to partition on original date (a date field inside Kafka message)


Thank you!
​
______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*

Re: Partitioning from actual Data (FlowFile) in NiFi

Posted by Anshuman Ghosh <an...@gmail.com>.
Thank you so much for the reply!

We have one more question regarding the whole flow

   - This entire flow described above has been encapsulated inside a
   template for export and import purpose in different environment.
   - Let's say one version of a Flow is running presently in PROD.
   - After making some minor changes, a new version of the flow has been
   imported into PROD.
   - Before starting the new version of the flow, we need to stop the old
   version; *otherwise there will be a problem with Kafka consumption from
   the same topic*.
   - So if we stop the old version of the flow and then start the new
   version of the flow it will be fine!
   - However there will be some FlowFiles still in the queue to be
   processed (like for Merge Content). How to completely process them? How to
   automate flush/ process of all FlowFiles before actually stop and mark it
   as an old version?
   - Find the rough Flow outline below as well

*Flow outline*
1. Consume from Kafka
2. Evaluate JSON Path
3. Update attribute to get year, month, day from a Timestamp
4. Convert JSON to Avro
5. Merged content based on the date attribute
6. Write to GCS
7. Partitions refresh


Than
​king
 you
​ in advance​
!
​
______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*


On Mon, May 15, 2017 at 1:53 PM, Andrew Grande <ap...@gmail.com> wrote:

> Yes to compress. The output of the merge step is a larger piece of data, no
> more/older than configured by the merge step. It can produce partial
> smaller buckets if it were configured with max age attribute.
>
> Andrew
>
> On Mon, May 15, 2017, 5:28 AM Anshuman Ghosh <anshuman.ghosh2009@gmail.com
> >
> wrote:
>
> > Thank you so much Bryan :-)
> > It is working fine now as the following workflow
> >
> > *Consume from Kafka ==> *
> > *Evaluate JSON path (Timestamp) ==> *
> > *Update Attribute to get year, month and day; since we receive a 19 digit
> > long Timestamp value , we had to use the following trick
> >
> > (**${Click.RequestTimestamp:toString():substring(0,13):
> toNumber():format("yyyy",
> > "GMT")}**) ==> Convert JSON to Avro ==> *
> > *Merge Content on similar Attribute (Timestamp - Date) ==> *
> > *Write merged FlowFile onto Google Cloud Storage (GCS) buckets*
> >
> > ​Let me know whether it can be further improvised.
> > Also will it be okay to use a "*CompressContent*" processor right after
> > merge step?​
> >
> >
> > Than
> > ​king you in advance!​
> >
> > ​
> > ______________________
> >
> > *Kind Regards,*
> > *Anshuman Ghosh*
> > *Contact - +49 179 9090964*
> >
> >
> >
> > On Thu, May 11, 2017 at 4:44 PM, Joe Witt <jo...@gmail.com> wrote:
> >
> > > Cool.  Bryan offers a good approach now.  And this JIRA captures a
> > > really powerful way to do it going forward
> > > https://issues.apache.org/jira/browse/NIFI-3866
> > >
> > > Thanks
> > > Joe
> > >
> > > On Thu, May 11, 2017 at 10:41 AM, Bryan Bende <bb...@gmail.com>
> wrote:
> > > > If your data is JSON, then you could extract the date field from the
> > > > JSON before you convert to Avro by using EvaluateJsonPath.
> > > >
> > > > From there lets say you have an attribute called "time" with the unix
> > > > timestamp, you could use an UpdateAttribute processor to create
> > > > attributes for each part of the timestamp:
> > > >
> > > > time.year = ${time:format("yyyy", "GMT")}
> > > > time.month = ${time:format("MM", "GMT")}
> > > > time.day = ${time:format("dd", "GMT")}
> > > >
> > > > Then in PutHDFS you can do something similar to what you were already
> > > doing:
> > > >
> > > > /year=${time.year}/month=${time.month}/day=${time.day}/
> > > >
> > > > As Joe mentioned there is a bunch of new record reader/writer related
> > > > capabilities in 1.2.0, and there is a follow JIRA to add a "record
> > > > path" which would allow you to extract a value (like your date field)
> > > > from any data format.
> > > >
> > > > On Thu, May 11, 2017 at 10:04 AM, Anshuman Ghosh
> > > > <an...@gmail.com> wrote:
> > > >> Hello Joe,
> > > >>
> > > >> Regret for the inconvenience, I would keep that in mind going
> forward!
> > > >>
> > > >> Thank you for your suggestion :-)
> > > >> We have recently built NiFi from the master branch, so it should be
> > > similar
> > > >> to 1.2.0
> > > >> We receive data in JSON format and then convert to Avro before
> writing
> > > to
> > > >> HDFS.
> > > >> The date filed here is an Unix timestamp of 19 digit (bigint)
> > > >>
> > > >> It would be really great if you can help a bit on how we can achieve
> > the
> > > >> same with Avro here.
> > > >> Thanking you in advance!
> > > >>
> > > >>
> > > >> ______________________
> > > >>
> > > >> *Kind Regards,*
> > > >> *Anshuman Ghosh*
> > > >> *Contact - +49 179 9090964*
> > > >>
> > > >>
> > > >> On Thu, May 11, 2017 at 3:53 PM, Joe Witt <jo...@gmail.com>
> wrote:
> > > >>
> > > >>> Anshuman
> > > >>>
> > > >>> Hello.  Please avoid directly addressing specific developers and
> > > >>> instead just address the mailing list you need (dev or user).
> > > >>>
> > > >>> If your data is CSV, for example, you can use RouteText to
> > efficiently
> > > >>> partition the incoming sets by matching field/column values and in
> so
> > > >>> doing you'll now have the flowfile attribute you need for that
> group.
> > > >>> Then you can merge those together with MergeContent for like
> > > >>> attributes and when writing to HDFS you can use that value.
> > > >>>
> > > >>> With the next record reader/writer capabilities in Apache NiFI
> 1.2.0
> > > >>> we can now provide a record oriented PartitionRecord processor
> which
> > > >>> will then also let you easily do this pattern on all kinds of
> > > >>> formats/schemas in a nice/clean way.
> > > >>>
> > > >>> Joe
> > > >>>
> > > >>> On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
> > > >>> <an...@gmail.com> wrote:
> > > >>> > Hello everyone,
> > > >>> >
> > > >>> > It would be great if you can help me implementing this use-case
> > > >>> >
> > > >>> > Is there any way (NiFi processor) to use an attribute (field/
> > column)
> > > >>> value
> > > >>> > for partitioning when writing the final FlowFile to HDFS/ other
> > > storage.
> > > >>> > Earlier we were using simple system date
> > > >>> > (/year=${now():format('yyyy')}/month=${now():format('MM')}/
> > > >>> day=${now():format('dd')}/)
> > > >>> > for this but that doesn't make sense when we consume old data
> from
> > > Kafka
> > > >>> and
> > > >>> > want to partition on original date (a date field inside Kafka
> > > message)
> > > >>> >
> > > >>> >
> > > >>> > Thank you!
> > > >>> > ______________________
> > > >>> >
> > > >>> > Kind Regards,
> > > >>> > Anshuman Ghosh
> > > >>> > Contact - +49 179 9090964
> > > >>> >
> > > >>>
> > >
> >
>

Re: Partitioning from actual Data (FlowFile) in NiFi

Posted by Andrew Grande <ap...@gmail.com>.
Yes to compress. The output of the merge step is a larger piece of data, no
more/older than configured by the merge step. It can produce partial
smaller buckets if it were configured with max age attribute.

Andrew

On Mon, May 15, 2017, 5:28 AM Anshuman Ghosh <an...@gmail.com>
wrote:

> Thank you so much Bryan :-)
> It is working fine now as the following workflow
>
> *Consume from Kafka ==> *
> *Evaluate JSON path (Timestamp) ==> *
> *Update Attribute to get year, month and day; since we receive a 19 digit
> long Timestamp value , we had to use the following trick
>
> (**${Click.RequestTimestamp:toString():substring(0,13):toNumber():format("yyyy",
> "GMT")}**) ==> Convert JSON to Avro ==> *
> *Merge Content on similar Attribute (Timestamp - Date) ==> *
> *Write merged FlowFile onto Google Cloud Storage (GCS) buckets*
>
> ​Let me know whether it can be further improvised.
> Also will it be okay to use a "*CompressContent*" processor right after
> merge step?​
>
>
> Than
> ​king you in advance!​
>
> ​
> ______________________
>
> *Kind Regards,*
> *Anshuman Ghosh*
> *Contact - +49 179 9090964*
>
>
>
> On Thu, May 11, 2017 at 4:44 PM, Joe Witt <jo...@gmail.com> wrote:
>
> > Cool.  Bryan offers a good approach now.  And this JIRA captures a
> > really powerful way to do it going forward
> > https://issues.apache.org/jira/browse/NIFI-3866
> >
> > Thanks
> > Joe
> >
> > On Thu, May 11, 2017 at 10:41 AM, Bryan Bende <bb...@gmail.com> wrote:
> > > If your data is JSON, then you could extract the date field from the
> > > JSON before you convert to Avro by using EvaluateJsonPath.
> > >
> > > From there lets say you have an attribute called "time" with the unix
> > > timestamp, you could use an UpdateAttribute processor to create
> > > attributes for each part of the timestamp:
> > >
> > > time.year = ${time:format("yyyy", "GMT")}
> > > time.month = ${time:format("MM", "GMT")}
> > > time.day = ${time:format("dd", "GMT")}
> > >
> > > Then in PutHDFS you can do something similar to what you were already
> > doing:
> > >
> > > /year=${time.year}/month=${time.month}/day=${time.day}/
> > >
> > > As Joe mentioned there is a bunch of new record reader/writer related
> > > capabilities in 1.2.0, and there is a follow JIRA to add a "record
> > > path" which would allow you to extract a value (like your date field)
> > > from any data format.
> > >
> > > On Thu, May 11, 2017 at 10:04 AM, Anshuman Ghosh
> > > <an...@gmail.com> wrote:
> > >> Hello Joe,
> > >>
> > >> Regret for the inconvenience, I would keep that in mind going forward!
> > >>
> > >> Thank you for your suggestion :-)
> > >> We have recently built NiFi from the master branch, so it should be
> > similar
> > >> to 1.2.0
> > >> We receive data in JSON format and then convert to Avro before writing
> > to
> > >> HDFS.
> > >> The date filed here is an Unix timestamp of 19 digit (bigint)
> > >>
> > >> It would be really great if you can help a bit on how we can achieve
> the
> > >> same with Avro here.
> > >> Thanking you in advance!
> > >>
> > >>
> > >> ______________________
> > >>
> > >> *Kind Regards,*
> > >> *Anshuman Ghosh*
> > >> *Contact - +49 179 9090964*
> > >>
> > >>
> > >> On Thu, May 11, 2017 at 3:53 PM, Joe Witt <jo...@gmail.com> wrote:
> > >>
> > >>> Anshuman
> > >>>
> > >>> Hello.  Please avoid directly addressing specific developers and
> > >>> instead just address the mailing list you need (dev or user).
> > >>>
> > >>> If your data is CSV, for example, you can use RouteText to
> efficiently
> > >>> partition the incoming sets by matching field/column values and in so
> > >>> doing you'll now have the flowfile attribute you need for that group.
> > >>> Then you can merge those together with MergeContent for like
> > >>> attributes and when writing to HDFS you can use that value.
> > >>>
> > >>> With the next record reader/writer capabilities in Apache NiFI 1.2.0
> > >>> we can now provide a record oriented PartitionRecord processor which
> > >>> will then also let you easily do this pattern on all kinds of
> > >>> formats/schemas in a nice/clean way.
> > >>>
> > >>> Joe
> > >>>
> > >>> On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
> > >>> <an...@gmail.com> wrote:
> > >>> > Hello everyone,
> > >>> >
> > >>> > It would be great if you can help me implementing this use-case
> > >>> >
> > >>> > Is there any way (NiFi processor) to use an attribute (field/
> column)
> > >>> value
> > >>> > for partitioning when writing the final FlowFile to HDFS/ other
> > storage.
> > >>> > Earlier we were using simple system date
> > >>> > (/year=${now():format('yyyy')}/month=${now():format('MM')}/
> > >>> day=${now():format('dd')}/)
> > >>> > for this but that doesn't make sense when we consume old data from
> > Kafka
> > >>> and
> > >>> > want to partition on original date (a date field inside Kafka
> > message)
> > >>> >
> > >>> >
> > >>> > Thank you!
> > >>> > ______________________
> > >>> >
> > >>> > Kind Regards,
> > >>> > Anshuman Ghosh
> > >>> > Contact - +49 179 9090964
> > >>> >
> > >>>
> >
>

Re: Partitioning from actual Data (FlowFile) in NiFi

Posted by Anshuman Ghosh <an...@gmail.com>.
Thank you so much Bryan :-)
It is working fine now as the following workflow

*Consume from Kafka ==> *
*Evaluate JSON path (Timestamp) ==> *
*Update Attribute to get year, month and day; since we receive a 19 digit
long Timestamp value , we had to use the following trick
(**${Click.RequestTimestamp:toString():substring(0,13):toNumber():format("yyyy",
"GMT")}**) ==> Convert JSON to Avro ==> *
*Merge Content on similar Attribute (Timestamp - Date) ==> *
*Write merged FlowFile onto Google Cloud Storage (GCS) buckets*

​Let me know whether it can be further improvised.
Also will it be okay to use a "*CompressContent*" processor right after
merge step?​


Than
​king you in advance!​

​
______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*



On Thu, May 11, 2017 at 4:44 PM, Joe Witt <jo...@gmail.com> wrote:

> Cool.  Bryan offers a good approach now.  And this JIRA captures a
> really powerful way to do it going forward
> https://issues.apache.org/jira/browse/NIFI-3866
>
> Thanks
> Joe
>
> On Thu, May 11, 2017 at 10:41 AM, Bryan Bende <bb...@gmail.com> wrote:
> > If your data is JSON, then you could extract the date field from the
> > JSON before you convert to Avro by using EvaluateJsonPath.
> >
> > From there lets say you have an attribute called "time" with the unix
> > timestamp, you could use an UpdateAttribute processor to create
> > attributes for each part of the timestamp:
> >
> > time.year = ${time:format("yyyy", "GMT")}
> > time.month = ${time:format("MM", "GMT")}
> > time.day = ${time:format("dd", "GMT")}
> >
> > Then in PutHDFS you can do something similar to what you were already
> doing:
> >
> > /year=${time.year}/month=${time.month}/day=${time.day}/
> >
> > As Joe mentioned there is a bunch of new record reader/writer related
> > capabilities in 1.2.0, and there is a follow JIRA to add a "record
> > path" which would allow you to extract a value (like your date field)
> > from any data format.
> >
> > On Thu, May 11, 2017 at 10:04 AM, Anshuman Ghosh
> > <an...@gmail.com> wrote:
> >> Hello Joe,
> >>
> >> Regret for the inconvenience, I would keep that in mind going forward!
> >>
> >> Thank you for your suggestion :-)
> >> We have recently built NiFi from the master branch, so it should be
> similar
> >> to 1.2.0
> >> We receive data in JSON format and then convert to Avro before writing
> to
> >> HDFS.
> >> The date filed here is an Unix timestamp of 19 digit (bigint)
> >>
> >> It would be really great if you can help a bit on how we can achieve the
> >> same with Avro here.
> >> Thanking you in advance!
> >>
> >>
> >> ______________________
> >>
> >> *Kind Regards,*
> >> *Anshuman Ghosh*
> >> *Contact - +49 179 9090964*
> >>
> >>
> >> On Thu, May 11, 2017 at 3:53 PM, Joe Witt <jo...@gmail.com> wrote:
> >>
> >>> Anshuman
> >>>
> >>> Hello.  Please avoid directly addressing specific developers and
> >>> instead just address the mailing list you need (dev or user).
> >>>
> >>> If your data is CSV, for example, you can use RouteText to efficiently
> >>> partition the incoming sets by matching field/column values and in so
> >>> doing you'll now have the flowfile attribute you need for that group.
> >>> Then you can merge those together with MergeContent for like
> >>> attributes and when writing to HDFS you can use that value.
> >>>
> >>> With the next record reader/writer capabilities in Apache NiFI 1.2.0
> >>> we can now provide a record oriented PartitionRecord processor which
> >>> will then also let you easily do this pattern on all kinds of
> >>> formats/schemas in a nice/clean way.
> >>>
> >>> Joe
> >>>
> >>> On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
> >>> <an...@gmail.com> wrote:
> >>> > Hello everyone,
> >>> >
> >>> > It would be great if you can help me implementing this use-case
> >>> >
> >>> > Is there any way (NiFi processor) to use an attribute (field/ column)
> >>> value
> >>> > for partitioning when writing the final FlowFile to HDFS/ other
> storage.
> >>> > Earlier we were using simple system date
> >>> > (/year=${now():format('yyyy')}/month=${now():format('MM')}/
> >>> day=${now():format('dd')}/)
> >>> > for this but that doesn't make sense when we consume old data from
> Kafka
> >>> and
> >>> > want to partition on original date (a date field inside Kafka
> message)
> >>> >
> >>> >
> >>> > Thank you!
> >>> > ______________________
> >>> >
> >>> > Kind Regards,
> >>> > Anshuman Ghosh
> >>> > Contact - +49 179 9090964
> >>> >
> >>>
>

Re: Partitioning from actual Data (FlowFile) in NiFi

Posted by Joe Witt <jo...@gmail.com>.
Cool.  Bryan offers a good approach now.  And this JIRA captures a
really powerful way to do it going forward
https://issues.apache.org/jira/browse/NIFI-3866

Thanks
Joe

On Thu, May 11, 2017 at 10:41 AM, Bryan Bende <bb...@gmail.com> wrote:
> If your data is JSON, then you could extract the date field from the
> JSON before you convert to Avro by using EvaluateJsonPath.
>
> From there lets say you have an attribute called "time" with the unix
> timestamp, you could use an UpdateAttribute processor to create
> attributes for each part of the timestamp:
>
> time.year = ${time:format("yyyy", "GMT")}
> time.month = ${time:format("MM", "GMT")}
> time.day = ${time:format("dd", "GMT")}
>
> Then in PutHDFS you can do something similar to what you were already doing:
>
> /year=${time.year}/month=${time.month}/day=${time.day}/
>
> As Joe mentioned there is a bunch of new record reader/writer related
> capabilities in 1.2.0, and there is a follow JIRA to add a "record
> path" which would allow you to extract a value (like your date field)
> from any data format.
>
> On Thu, May 11, 2017 at 10:04 AM, Anshuman Ghosh
> <an...@gmail.com> wrote:
>> Hello Joe,
>>
>> Regret for the inconvenience, I would keep that in mind going forward!
>>
>> Thank you for your suggestion :-)
>> We have recently built NiFi from the master branch, so it should be similar
>> to 1.2.0
>> We receive data in JSON format and then convert to Avro before writing to
>> HDFS.
>> The date filed here is an Unix timestamp of 19 digit (bigint)
>>
>> It would be really great if you can help a bit on how we can achieve the
>> same with Avro here.
>> Thanking you in advance!
>>
>>
>> ______________________
>>
>> *Kind Regards,*
>> *Anshuman Ghosh*
>> *Contact - +49 179 9090964*
>>
>>
>> On Thu, May 11, 2017 at 3:53 PM, Joe Witt <jo...@gmail.com> wrote:
>>
>>> Anshuman
>>>
>>> Hello.  Please avoid directly addressing specific developers and
>>> instead just address the mailing list you need (dev or user).
>>>
>>> If your data is CSV, for example, you can use RouteText to efficiently
>>> partition the incoming sets by matching field/column values and in so
>>> doing you'll now have the flowfile attribute you need for that group.
>>> Then you can merge those together with MergeContent for like
>>> attributes and when writing to HDFS you can use that value.
>>>
>>> With the next record reader/writer capabilities in Apache NiFI 1.2.0
>>> we can now provide a record oriented PartitionRecord processor which
>>> will then also let you easily do this pattern on all kinds of
>>> formats/schemas in a nice/clean way.
>>>
>>> Joe
>>>
>>> On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
>>> <an...@gmail.com> wrote:
>>> > Hello everyone,
>>> >
>>> > It would be great if you can help me implementing this use-case
>>> >
>>> > Is there any way (NiFi processor) to use an attribute (field/ column)
>>> value
>>> > for partitioning when writing the final FlowFile to HDFS/ other storage.
>>> > Earlier we were using simple system date
>>> > (/year=${now():format('yyyy')}/month=${now():format('MM')}/
>>> day=${now():format('dd')}/)
>>> > for this but that doesn't make sense when we consume old data from Kafka
>>> and
>>> > want to partition on original date (a date field inside Kafka message)
>>> >
>>> >
>>> > Thank you!
>>> > ______________________
>>> >
>>> > Kind Regards,
>>> > Anshuman Ghosh
>>> > Contact - +49 179 9090964
>>> >
>>>

Re: Partitioning from actual Data (FlowFile) in NiFi

Posted by Bryan Bende <bb...@gmail.com>.
If your data is JSON, then you could extract the date field from the
JSON before you convert to Avro by using EvaluateJsonPath.

From there lets say you have an attribute called "time" with the unix
timestamp, you could use an UpdateAttribute processor to create
attributes for each part of the timestamp:

time.year = ${time:format("yyyy", "GMT")}
time.month = ${time:format("MM", "GMT")}
time.day = ${time:format("dd", "GMT")}

Then in PutHDFS you can do something similar to what you were already doing:

/year=${time.year}/month=${time.month}/day=${time.day}/

As Joe mentioned there is a bunch of new record reader/writer related
capabilities in 1.2.0, and there is a follow JIRA to add a "record
path" which would allow you to extract a value (like your date field)
from any data format.

On Thu, May 11, 2017 at 10:04 AM, Anshuman Ghosh
<an...@gmail.com> wrote:
> Hello Joe,
>
> Regret for the inconvenience, I would keep that in mind going forward!
>
> Thank you for your suggestion :-)
> We have recently built NiFi from the master branch, so it should be similar
> to 1.2.0
> We receive data in JSON format and then convert to Avro before writing to
> HDFS.
> The date filed here is an Unix timestamp of 19 digit (bigint)
>
> It would be really great if you can help a bit on how we can achieve the
> same with Avro here.
> Thanking you in advance!
>
>
> ______________________
>
> *Kind Regards,*
> *Anshuman Ghosh*
> *Contact - +49 179 9090964*
>
>
> On Thu, May 11, 2017 at 3:53 PM, Joe Witt <jo...@gmail.com> wrote:
>
>> Anshuman
>>
>> Hello.  Please avoid directly addressing specific developers and
>> instead just address the mailing list you need (dev or user).
>>
>> If your data is CSV, for example, you can use RouteText to efficiently
>> partition the incoming sets by matching field/column values and in so
>> doing you'll now have the flowfile attribute you need for that group.
>> Then you can merge those together with MergeContent for like
>> attributes and when writing to HDFS you can use that value.
>>
>> With the next record reader/writer capabilities in Apache NiFI 1.2.0
>> we can now provide a record oriented PartitionRecord processor which
>> will then also let you easily do this pattern on all kinds of
>> formats/schemas in a nice/clean way.
>>
>> Joe
>>
>> On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
>> <an...@gmail.com> wrote:
>> > Hello everyone,
>> >
>> > It would be great if you can help me implementing this use-case
>> >
>> > Is there any way (NiFi processor) to use an attribute (field/ column)
>> value
>> > for partitioning when writing the final FlowFile to HDFS/ other storage.
>> > Earlier we were using simple system date
>> > (/year=${now():format('yyyy')}/month=${now():format('MM')}/
>> day=${now():format('dd')}/)
>> > for this but that doesn't make sense when we consume old data from Kafka
>> and
>> > want to partition on original date (a date field inside Kafka message)
>> >
>> >
>> > Thank you!
>> > ______________________
>> >
>> > Kind Regards,
>> > Anshuman Ghosh
>> > Contact - +49 179 9090964
>> >
>>

Re: Partitioning from actual Data (FlowFile) in NiFi

Posted by Anshuman Ghosh <an...@gmail.com>.
Hello Joe,

Regret for the inconvenience, I would keep that in mind going forward!

Thank you for your suggestion :-)
We have recently built NiFi from the master branch, so it should be similar
to 1.2.0
We receive data in JSON format and then convert to Avro before writing to
HDFS.
The date filed here is an Unix timestamp of 19 digit (bigint)

It would be really great if you can help a bit on how we can achieve the
same with Avro here.
Thanking you in advance!

​
______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*


On Thu, May 11, 2017 at 3:53 PM, Joe Witt <jo...@gmail.com> wrote:

> Anshuman
>
> Hello.  Please avoid directly addressing specific developers and
> instead just address the mailing list you need (dev or user).
>
> If your data is CSV, for example, you can use RouteText to efficiently
> partition the incoming sets by matching field/column values and in so
> doing you'll now have the flowfile attribute you need for that group.
> Then you can merge those together with MergeContent for like
> attributes and when writing to HDFS you can use that value.
>
> With the next record reader/writer capabilities in Apache NiFI 1.2.0
> we can now provide a record oriented PartitionRecord processor which
> will then also let you easily do this pattern on all kinds of
> formats/schemas in a nice/clean way.
>
> Joe
>
> On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
> <an...@gmail.com> wrote:
> > Hello everyone,
> >
> > It would be great if you can help me implementing this use-case
> >
> > Is there any way (NiFi processor) to use an attribute (field/ column)
> value
> > for partitioning when writing the final FlowFile to HDFS/ other storage.
> > Earlier we were using simple system date
> > (/year=${now():format('yyyy')}/month=${now():format('MM')}/
> day=${now():format('dd')}/)
> > for this but that doesn't make sense when we consume old data from Kafka
> and
> > want to partition on original date (a date field inside Kafka message)
> >
> >
> > Thank you!
> > ______________________
> >
> > Kind Regards,
> > Anshuman Ghosh
> > Contact - +49 179 9090964
> >
>

Re: Partitioning from actual Data (FlowFile) in NiFi

Posted by Joe Witt <jo...@gmail.com>.
Anshuman

Hello.  Please avoid directly addressing specific developers and
instead just address the mailing list you need (dev or user).

If your data is CSV, for example, you can use RouteText to efficiently
partition the incoming sets by matching field/column values and in so
doing you'll now have the flowfile attribute you need for that group.
Then you can merge those together with MergeContent for like
attributes and when writing to HDFS you can use that value.

With the next record reader/writer capabilities in Apache NiFI 1.2.0
we can now provide a record oriented PartitionRecord processor which
will then also let you easily do this pattern on all kinds of
formats/schemas in a nice/clean way.

Joe

On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
<an...@gmail.com> wrote:
> Hello everyone,
>
> It would be great if you can help me implementing this use-case
>
> Is there any way (NiFi processor) to use an attribute (field/ column) value
> for partitioning when writing the final FlowFile to HDFS/ other storage.
> Earlier we were using simple system date
> (/year=${now():format('yyyy')}/month=${now():format('MM')}/day=${now():format('dd')}/)
> for this but that doesn't make sense when we consume old data from Kafka and
> want to partition on original date (a date field inside Kafka message)
>
>
> Thank you!
> ______________________
>
> Kind Regards,
> Anshuman Ghosh
> Contact - +49 179 9090964
>