Posted to dev@nifi.apache.org by rey26 <re...@gmail.com> on 2018/05/30 20:35:28 UTC

Publishing Kafka Topics via Apache Nifi cluster.

Hello Team,

We have an Apache NiFi cluster with 3 nodes and a 3-node Kafka cluster. We are
receiving files that contain transactions in order (A-type first and then
B-type).
These events are in order but may arrive in different files. For example, the
A-event for id 111 can be present in file1 and the B-event can come in the
immediately following file2 [B will always come after A-type for any ID]. We
want the data to be published in the same order as it is received.

We developed a flow using ListSFTP + FetchFTP + PublishKafka, in that
order. We have also partitioned the Kafka topic [9 partitions] on the
basis of a key column, and the same key is used in the PublishKafka processor.

All the events for a key are published to the same partition, but within the
partition they are out of order.
For example, B-type events are arriving before A-type events in the Kafka
topic TEST.

Now I have some questions regarding the above.

My understanding is that the ListSFTP + FetchFTP pattern improves load
balancing, but does it ensure ordering?
Could File1 go to Node1 and File2 go to Node2, and Node2 then publish its
records to the same Kafka partition before Node1?
Is there any way to guarantee the load order of files in Apache NiFi in cluster
mode while keeping performance in mind?

Since each task in the PublishKafka processor is one publisher, if we run
PublishKafka only on the primary node and pass only one broker id, will that
do the trick?



--
Sent from: http://apache-nifi-developer-list.39713.n7.nabble.com/

Re: Publishing Kafka Topics via Apache Nifi cluster.

Posted by Andrew Psaltis <ps...@gmail.com>.
Pierre,
Mentioning that JIRA is very timely. I was just talking with someone yesterday
about NiFi and Kafka, and this exact scenario came up; it was something they
were keenly interested in being able to do: route data for, say, different
stocks to different NiFi nodes via an RPG. Happy to help out here with
finding time to work on that JIRA.

On Thu, May 31, 2018 at 10:55 AM, Pierre Villard <
pierre.villard.fr@gmail.com> wrote:

> As an additional note to what Bryan said, there is a JIRA [1] that could
> help in this case.
> I'm trying to find time to work on it... but no luck so far :)
>
> [1] https://issues.apache.org/jira/browse/NIFI-4026

Re: Publishing Kafka Topics via Apache Nifi cluster.

Posted by Pierre Villard <pi...@gmail.com>.
As an additional note to what Bryan said, there is a JIRA [1] that could
help in this case.
I'm trying to find time to work on it... but no luck so far :)

[1] https://issues.apache.org/jira/browse/NIFI-4026



2018-05-31 15:43 GMT+02:00 Bryan Bende <bb...@gmail.com>:

> Hello,
>
> If I'm understanding the situation correctly, you want ordering within
> a key, but not necessarily total ordering across all your data?
>
> I'm making this assumption since you said you have 9 partitions on
> your Kafka topic and you are partitioning by key, so the data for each
> key is in order per partition.
>
> The list + fetch pattern with redistribution doesn't have a way to
> control how the data is distributed, it is just round-robin and you
> can control the batch size, but you can't partition the data to nodes
> based on a key.
>
> There is an EnforceOrder processor [1] which was made to help with
> this kind of scenario, I believe specifically for CDC scenarios where
> the event log has to be processed in order. I haven't used it myself
> so maybe others can help here, but I believe you would use your "key"
> as the "Group Identifier" and then somehow you need to get an integer
> value on each flow file that represents the order within the group. So
> for example your A-event flow file would need some kind of attribute
> like "order = 1" and then the B-event flow file would need an
> attribute like "order = 2". You might be able to assign this order
> using an UpdateAttribute processor right after the ListSFTP, but you
> have to do it per key somehow.
>
> Another option is to just run the whole flow on primary node without
> doing the site-to-site redistribution, but then you lose out on
> parallel processing, and even on a single node I believe there are
> cases where ordering is not guaranteed.
>
> Thanks,
>
> Bryan
>
> [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.EnforceOrder/index.html

Re: Publishing Kafka Topics via Apache Nifi cluster.

Posted by Bryan Bende <bb...@gmail.com>.
I was looking at EnforceOrder again and I'm not sure that will
actually help here since I don't think it works across a cluster, but
maybe others know more.

I think you can only ever have 1 concurrent task for your PublishKafka
processor. Even if you run everything on primary node, if you have 2
concurrent tasks it is going to take 2 flow files from the queue and
start publishing them to Kafka at the same time which will break the
ordering.
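
The effect of a second concurrent task can be illustrated with a small simulation. This is only a sketch under stated assumptions: the file names and durations are made up, and each publish is treated as landing in Kafka at the moment its task finishes, which is a simplification of what PublishKafka actually does.

```python
import heapq

def publish_order(files, tasks):
    """Simulate `tasks` workers pulling files from a FIFO queue.

    files: list of (name, duration) in queue order (durations are hypothetical).
    Returns the names in the order their publishes complete.
    """
    free_at = [0] * tasks              # time at which each worker becomes free
    heapq.heapify(free_at)
    finished = []
    for name, duration in files:
        start = heapq.heappop(free_at)  # the earliest free worker takes the next file
        end = start + duration
        heapq.heappush(free_at, end)
        finished.append((end, name))
    return [name for _, name in sorted(finished)]

# A large file holding the A-event is queued ahead of a small file holding the B-event.
queue = [("file1 (A-event)", 5), ("file2 (B-event)", 1)]

print(publish_order(queue, tasks=1))
print(publish_order(queue, tasks=2))
```

With one task the files complete in queue order; with two tasks the smaller file2 can finish first, so the B-event reaches the partition before the A-event.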

One thing you could try is to use ListenHttp and PostHttp instead of
site-to-site, this would let you customize the routing.

You would have a ListenHttp running on each node to receive the
listings, then you would have ListSFTP (primary node only) -> (some
processor that creates your key attribute) -> RouteOnAttribute (routes
on the key) -> 3 PostHttp processors (1 for each node of your
cluster).

This way you always route flow files with the same key to the same node.
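
The routing decision itself can be sketched as follows. This is plain Python rather than NiFi configuration, and the node names, the sample events and the choice of CRC32 as the hash are all assumptions made for illustration; the only point is that a deterministic function of the key always picks the same node.

```python
import zlib

NODES = ["node1", "node2", "node3"]  # hypothetical node names

def node_for_key(key, nodes=NODES):
    """Pick a node deterministically from the key (CRC32 is stable across runs)."""
    return nodes[zlib.crc32(key.encode("utf-8")) % len(nodes)]

# Sample events as (key, event type), in arrival order.
events = [("111", "A"), ("222", "A"), ("111", "B"), ("222", "B")]

routed = {}
for key, event_type in events:
    routed.setdefault(node_for_key(key), []).append((key, event_type))

for node, items in routed.items():
    print(node, items)
```

Because both events for a key land on one node in arrival order, a single-task publisher on that node can preserve the per-key order.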

You might have to consider what you would want to do if a node went
down and one of the PostHttp processors can't deliver the data: do you
let it queue up until the node comes back, or try to send to one of
the other nodes?


On Thu, May 31, 2018 at 4:18 PM, rey26 <re...@gmail.com> wrote:
> Hello Bryan,
>
> I have read about the EnforceOrder processor. My files have a timestamp part
> that is unique, and the files are actually created in the order in which the
> events happen.
> I can read the flow file, extract this timestamp, convert it to a Unix
> timestamp, and set it as a NiFi attribute.
> But once all the flow files reach the queue before PublishKafka, the actual
> problem/confusion arises.
>
> Say I have 12 files, sorted by timestamp, sitting in the queue just
> before PublishKafka.
>
> As I understand it, each PublishKafka task is a separate
> publisher. So if my cluster has 3 nodes with 2 threads each,
> I believe NiFi will spawn 6 publishers. [Correct me if I am wrong.]
>
> If each publisher gets 2 files, there is no way to enforce order:
> publisher 1 may publish its records before publisher 2.
>
> I believe running on the primary node is the only way for these scenarios so
> far, until some kind of global ordering concept is introduced in NiFi.

Re: Publishing Kafka Topics via Apache Nifi cluster.

Posted by rey26 <re...@gmail.com>.
Hello Bryan,

I have read about the EnforceOrder processor. My files have a timestamp part
that is unique, and the files are actually created in the order in which the
events happen.
I can read the flow file, extract this timestamp, convert it to a Unix
timestamp, and set it as a NiFi attribute.
But once all the flow files reach the queue before PublishKafka, the actual
problem/confusion arises.
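
The timestamp conversion described above might look roughly like this. The thread does not give the timestamp format, so the `%Y%m%d%H%M%S` pattern, the UTC assumption and the sample values are hypothetical.

```python
from datetime import datetime, timezone

def epoch_millis(stamp, fmt="%Y%m%d%H%M%S"):
    """Parse a timestamp string (assumed UTC) into milliseconds since the Unix epoch."""
    parsed = datetime.strptime(stamp, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)

# Hypothetical timestamps extracted from two files, listed out of order.
stamps = ["20180530203528", "20180530203500"]
ordered = sorted(stamps, key=epoch_millis)
print(ordered)
```

An integer like this can be compared and sorted directly, which is what makes it usable as an ordering attribute.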

Say I have 12 files, sorted by timestamp, sitting in the queue just
before PublishKafka.

As I understand it, each PublishKafka task is a separate
publisher. So if my cluster has 3 nodes with 2 threads each,
I believe NiFi will spawn 6 publishers. [Correct me if I am wrong.]

If each publisher gets 2 files, there is no way to enforce order:
publisher 1 may publish its records before publisher 2.

I believe running on the primary node is the only way for these scenarios so
far, until some kind of global ordering concept is introduced in NiFi.






Re: Publishing Kafka Topics via Apache Nifi cluster.

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

If I'm understanding the situation correctly, you want ordering within
a key, but not necessarily total ordering across all your data?

I'm making this assumption since you said you have 9 partitions on
your Kafka topic and you are partitioning by key, so the data for each
key is in order per partition.

The list + fetch pattern with redistribution doesn't have a way to
control how the data is distributed, it is just round-robin and you
can control the batch size, but you can't partition the data to nodes
based on a key.

There is an EnforceOrder processor [1] which was made to help with
this kind of scenario, I believe specifically for CDC scenarios where
the event log has to be processed in order. I haven't used it myself
so maybe others can help here, but I believe you would use your "key"
as the "Group Identifier" and then somehow you need to get an integer
value on each flow file that represents the order within the group. So
for example your A-event flow file would need some kind of attribute
like "order = 1" and then the B-event flow file would need an
attribute like "order = 2". You might be able to assign this order
using an UpdateAttribute processor right after the ListSFTP, but you
have to do it per key somehow.
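
One way to picture the "order per key" numbering is a counter kept per key. This is a sketch only: the attribute names `key` and `order` are placeholders, and it assumes every event passes through one place in arrival order (for example, a step running on a single node), which the thread does not confirm is achievable in this flow.

```python
from collections import defaultdict

def assign_order(events):
    """Attach a 1-based sequence number per key, following arrival order."""
    counters = defaultdict(int)
    tagged = []
    for key, payload in events:
        counters[key] += 1
        tagged.append({"key": key, "order": counters[key], "payload": payload})
    return tagged

# Sample events as (key, payload), in arrival order.
tagged = assign_order([("111", "A"), ("222", "A"), ("111", "B")])
for item in tagged:
    print(item)
```

Here the A-event for key 111 gets order 1 and the later B-event for the same key gets order 2, independently of the numbering for key 222.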

Another option is to just run the whole flow on primary node without
doing the site-to-site redistribution, but then you lose out on
parallel processing, and even on a single node I believe there are
cases where ordering is not guaranteed.

Thanks,

Bryan

[1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.EnforceOrder/index.html

