You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2016/09/28 23:15:50 UTC

Iterations vs. combo source/sink

Hi all,

I’ve got a very specialized DB (runs in the JVM) that I need to use to both keep track of state and generate new records to be processed by my Flink streaming workflow. Some of the workflow results are updates to be applied to the DB.

And the DB needs to be partitioned.

My initial approach is to wrap it in a regular operator, and have subsequent streams be inputs for updating state. So now I’ve got an IterativeDataStream, which should work.

But I imagine I could also wrap this DB in a source and a sink, yes? Though I’m not sure how I could partition it as a source, in that case.

If it is feasible to have a partitioned source/sink, are there general pros/cons to either approach?

Thanks,

— Ken


Re: Iterations vs. combo source/sink

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ken,

you can let a class implement both the SourceFunction and the SinkFunction.
However when running a job, the source and the sink will be distinct
instances. Thus, there is no way that they share instance variables.

What you could do is to write the updated and newly discovered URLs to a
message queue like Kafka from which you read with your source function.
That way you’ll have at least once processing guarantees. But then again,
you’ll use external infrastructure.

If you really want to avoid this, then I guess you have to use iterations
and do without the processing guarantees at the moment.

Cheers,
Till
​

On Sat, Oct 1, 2016 at 1:49 AM, Ken Krugler <kk...@transpac.com>
wrote:

> Hi Fabian,
>
> Thanks for responding. Comments and questions inline below.
>
> Regards,
>
> — Ken
>
>
> On Sep 29, 2016, at 6:10am, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Ken,
>
> you can certainly have partitioned sources and sinks. You can control the
> parallelism by calling .setParallelism() method.
>
>
> So I assume I’d implement the ParallelSourceFunction interface.
>
> If you need a partitioned sink, you can call .keyBy() to hash partition.
>
> I did not completely understand the requirements of your program. Can you
> maybe provide pseudo code for how the program should look like.
>
>
> Just for grins, I’m looking at re-implementing the Bixo web crawler (built
> on top of Cascading/Hadoop MR) as a continuous crawler on top of Flink.
>
> The main issue is the “crawl DB” that has to maintain the state of every
> URL ever seen, and also provide a fast way to generate the “best” URLs to
> be fetched. The logic of figuring out the best URL is complex, depending on
> factors like the anticipated value of the page, refetch rates for pages
> that have already been seen, number of unique URLs per domain vs. the
> domain “rank”, etc.
>
> And it has to scale to something like 30B+ URLs with a small (e.g. 10
> moderately big servers) cluster, so it needs to be very efficient in terms
> of memory/CPU usage.
>
> An additional goal is to not require additional external infrastructure.
> That simplifies the operational overhead of running a continuous crawl.
>
> So this “crawl DB” has to act as both a source (of the best URLs to fetch)
> and as a sink (for updates to fetched URLs, and as new URLs are
> discovered/injected). The state is a mix of in-memory and spilled to disk
> data.
>
> Given what you mention below about iterative data flows not being fault
> tolerant, it seems like a combo source/sink (if possible) would be best.
>
> Any guidance as to how to implement such a thing? I don’t know enough yet
> about Flink to determine if I can essentially have one task that’s acting
> as both the source & sink.
>
> Some general comments:
> - Flink's fault tolerance mechanism does not work with iterative data
> flows yet. This is work in progress see: FLINK-3257 [1]
>
>
> OK, good to know.
>
> - Flink's fault tolerance mechanism does only work if you expose all!
> internal operator state. So you would need to put your Java DB in Flink
> state to have a recoverable job.
>
>
> Yes.
>
> - Is the DB essential in your application? Could you use Flink's
> key-partitioned state interface instead? That would help to make your job
> fault-tolerant.
>
>
> Yes, as per above.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-3257
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface
>
> 2016-09-29 1:15 GMT+02:00 Ken Krugler <kk...@transpac.com>:
>
>> Hi all,
>>
>> I’ve got a very specialized DB (runs in the JVM) that I need to use to
>> both keep track of state and generate new records to be processed by my
>> Flink streaming workflow. Some of the workflow results are updates to be
>> applied to the DB.
>>
>> And the DB needs to be partitioned.
>>
>> My initial approach is to wrap it in a regular operator, and have
>> subsequent streams be inputs for updating state. So now I’ve got an
>> IterativeDataStream, which should work.
>>
>> But I imagine I could also wrap this DB in a source and a sink, yes?
>> Though I’m not sure how I could partition it as a source, in that case.
>>
>> If it is feasible to have a partitioned source/sink, are there general
>> pros/cons to either approach?
>>
>> Thanks,
>>
>> — Ken
>>
>>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
>

Re: Iterations vs. combo source/sink

Posted by Ken Krugler <kk...@transpac.com>.
Hi Fabian,

Thanks for responding. Comments and questions inline below.

Regards,

— Ken


> On Sep 29, 2016, at 6:10am, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Ken,
> 
> you can certainly have partitioned sources and sinks. You can control the parallelism by calling .setParallelism() method.

So I assume I’d implement the ParallelSourceFunction interface.

> If you need a partitioned sink, you can call .keyBy() to hash partition.
> 
> I did not completely understand the requirements of your program. Can you maybe provide pseudo code for how the program should look like.

Just for grins, I’m looking at re-implementing the Bixo web crawler (built on top of Cascading/Hadoop MR) as a continuous crawler on top of Flink.

The main issue is the “crawl DB” that has to maintain the state of every URL ever seen, and also provide a fast way to generate the “best” URLs to be fetched. The logic of figuring out the best URL is complex, depending on factors like the anticipated value of the page, refetch rates for pages that have already been seen, number of unique URLs per domain vs. the domain “rank”, etc.

And it has to scale to something like 30B+ URLs with a small (e.g. 10 moderately big servers) cluster, so it needs to be very efficient in terms of memory/CPU usage.

An additional goal is to not require additional external infrastructure. That simplifies the operational overhead of running a continuous crawl.

So this “crawl DB” has to act as both a source (of the best URLs to fetch) and as a sink (for updates to fetched URLs, and as new URLs are discovered/injected). The state is a mix of in-memory and spilled to disk data.

Given what you mention below about iterative data flows not being fault tolerant, it seems like a combo source/sink (if possible) would be best.

Any guidance as to how to implement such a thing? I don’t know enough yet about Flink to determine if I can essentially have one task that’s acting as both the source & sink.

> Some general comments:
> - Flink's fault tolerance mechanism does not work with iterative data flows yet. This is work in progress see: FLINK-3257 [1]

OK, good to know.

> - Flink's fault tolerance mechanism does only work if you expose all! internal operator state. So you would need to put your Java DB in Flink state to have a recoverable job.

Yes.

> - Is the DB essential in your application? Could you use Flink's key-partitioned state interface instead? That would help to make your job fault-tolerant.

Yes, as per above.


> [1] https://issues.apache.org/jira/browse/FLINK-3257 <https://issues.apache.org/jira/browse/FLINK-3257>
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface>
> 
> 2016-09-29 1:15 GMT+02:00 Ken Krugler <kkrugler_lists@transpac.com <ma...@transpac.com>>:
> Hi all,
> 
> I’ve got a very specialized DB (runs in the JVM) that I need to use to both keep track of state and generate new records to be processed by my Flink streaming workflow. Some of the workflow results are updates to be applied to the DB.
> 
> And the DB needs to be partitioned.
> 
> My initial approach is to wrap it in a regular operator, and have subsequent streams be inputs for updating state. So now I’ve got an IterativeDataStream, which should work.
> 
> But I imagine I could also wrap this DB in a source and a sink, yes? Though I’m not sure how I could partition it as a source, in that case.
> 
> If it is feasible to have a partitioned source/sink, are there general pros/cons to either approach?
> 
> Thanks,
> 
> — Ken
> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr




Re: Iterations vs. combo source/sink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ken,

you can certainly have partitioned sources and sinks. You can control the
parallelism by calling .setParallelism() method.
If you need a partitioned sink, you can call .keyBy() to hash partition.

I did not completely understand the requirements of your program. Can you
maybe provide pseudo code for how the program should look like.

Some general comments:
- Flink's fault tolerance mechanism does not work with iterative data flows
yet. This is work in progress see: FLINK-3257 [1]
- Flink's fault tolerance mechanism does only work if you expose all!
internal operator state. So you would need to put your Java DB in Flink
state to have a recoverable job.
- Is the DB essential in your application? Could you use Flink's
key-partitioned state interface instead? That would help to make your job
fault-tolerant.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-3257
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface

2016-09-29 1:15 GMT+02:00 Ken Krugler <kk...@transpac.com>:

> Hi all,
>
> I’ve got a very specialized DB (runs in the JVM) that I need to use to
> both keep track of state and generate new records to be processed by my
> Flink streaming workflow. Some of the workflow results are updates to be
> applied to the DB.
>
> And the DB needs to be partitioned.
>
> My initial approach is to wrap it in a regular operator, and have
> subsequent streams be inputs for updating state. So now I’ve got an
> IterativeDataStream, which should work.
>
> But I imagine I could also wrap this DB in a source and a sink, yes?
> Though I’m not sure how I could partition it as a source, in that case.
>
> If it is feasible to have a partitioned source/sink, are there general
> pros/cons to either approach?
>
> Thanks,
>
> — Ken
>
>