Posted to user@flink.apache.org by Junguk Cho <jm...@gmail.com> on 2017/09/06 19:54:59 UTC

Question about Flink internals

Hi, All.

I am new to Flink.
I just installed Flink on a cluster and started reading the documentation to
understand Flink internals.
After reading some of the documents, I have a few questions.
I have some experience with Storm and Heron, so I am relating their
mechanisms to my questions to better understand Flink.

1. Can I specify worker parallelism explicitly like Storm?

2. Record in Flink
Can I think of a "record" in Flink as almost the same as a Tuple in Storm?
A Tuple in Storm carries "real data" plus metadata (e.g., stream type,
source id, and so on).

3. How does partitioning (e.g., shuffle, map) work internally?
Storm keeps (worker id) : (TCP info of next workers) tables, so after the
partitioning function executes, a Tuple is forwarded to its next hop based
on those tables.
Is it the same in Flink?

4. How does Flink detect faults in the case of worker death or machine
failure?
Based on the documents, the JobManager checks the liveness of TaskManagers
with heartbeat messages.
In Storm, the supervisor (I think it is similar to the TaskManager) first
detects a dead worker based on heartbeats and locally re-runs it. For a
machine failure, Nimbus (I think it is similar to the JobManager) detects
the failure based on the supervisor's heartbeat and re-schedules all
assigned workers to other machines.
How does Flink work?

5. For exactly-once delivery, Flink uses a checkpointing and record-replay
mechanism.
It needs a message queue (e.g., Kafka) for record replay.
Kafka uses TCP to send and receive data, so I wonder: if a data source does
not use TCP (e.g., IoT sensors), what is the general solution for record
replay?
For example, source workers could be directly connected to several inputs
(e.g., IoT sensors), although I think that is not a normal deployment.

6. Flink supports cycles.
However, based on the documents, cyclic tasks act as a regular dataflow
source and sink respectively, yet they are collocated in the same physical
instance to share an in-memory buffer and thus implement the loopback
stream transparently.
So, what if the number of workers that form a cycle is high? It would be
hard to place them all on the same physical machine.

Thanks,
Junguk

Re: Question about Flink internals

Posted by Junguk Cho <jm...@gmail.com>.
Hi, Timo.

Thank you for the detailed replies.
They helped me understand Flink a lot.

However, there are a couple of points I would like to follow up on.

2. From a user's perspective you can only see the "real data". Internally,
there are different types of records that flow through the topology (namely
watermarks, checkpoint barriers, latency markers, and records with or
without timestamp metadata).
-> I understood that there are several types of records. I was wondering
about the "record" class and its members, i.e., the Flink counterpart of
Storm's Tuple (
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
)


6. I don't know about the internals of the iteration feature, but you might
be right. Cyclic dataflows are not fully supported yet. E.g., they are also
not participating in Flink's checkpointing mechanism.
-> Based on Section 3.4 of this paper (
http://www.vldb.org/pvldb/vol10/p1718-carbone.pdf), it seems that Flink
supports checkpointing for cyclic dataflows. However, there is this
limitation: "Cycled tasks act as regular dataflow source and sink
respectively, yet they are collocated in the same physical instance to
share an in-memory buffer and thus, implement loopback stream
transparently."

In general, I would recommend importing Flink into your IDE and setting a
breakpoint in an example (e.g., within a mapper before a keyBy), then
running it in debug mode. You can step through the layers to see more about
the internals. This should answer most of your questions; otherwise feel
free to ask again.
-> I will try this. Thanks a lot.

Thanks,
Junguk

Re: Question about Flink internals

Posted by Timo Walther <tw...@apache.org>.
Hi Junguk,

I will try to answer your questions, but I am also looping in Ufuk, who
might know more about the network internals:

1. Yes, every operator/operator chain has a "setParallelism()" method to
specify the parallelism. The overall parallelism of the job can be set
when submitting a job. The parallelism per TaskManager is determined by
the number of slots.
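
Something like this (an untested minimal sketch with a made-up pipeline;
only the environment calls and setParallelism() are the actual API):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default parallelism for all operators of this job

        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        lines.map(String::toUpperCase)
             .setParallelism(2) // overrides the job default for this operator only
             .print();

        env.execute("parallelism example");
    }
}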

2. From a user's perspective you can only see the "real data". 
Internally, there are different types of records that flow through the 
topology (namely watermarks, checkpoint barriers, latency markers, and 
records with or without timestamp metadata).
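
To make that concrete, here is a simplified, purely illustrative model of
those element types (the real classes are StreamRecord, Watermark,
LatencyMarker, and CheckpointBarrier in the flink-streaming-java and
flink-runtime modules; this is not Flink's actual code):

// Everything flowing through the topology is conceptually a stream element.
abstract class StreamElement {}

// A user record, optionally carrying an event timestamp.
class StreamRecord<T> extends StreamElement {
    final T value;        // the "real data" the user sees
    final long timestamp; // event-time metadata, only valid if hasTimestamp
    final boolean hasTimestamp;

    StreamRecord(T value) {
        this(value, Long.MIN_VALUE, false);
    }

    StreamRecord(T value, long timestamp) {
        this(value, timestamp, true);
    }

    private StreamRecord(T value, long timestamp, boolean hasTimestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = hasTimestamp;
    }
}

// Flows with the data to signal event-time progress.
class Watermark extends StreamElement {
    final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
}

// Injected at the sources to align distributed snapshots (checkpoints).
class CheckpointBarrier extends StreamElement {
    final long checkpointId;
    CheckpointBarrier(long checkpointId) { this.checkpointId = checkpointId; }
}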

3. See my last comment.

4. Flink also uses heartbeat messages between the JobManager and
TaskManagers. In case of a failure, the JobManager restores the entire
topology to the last successful checkpoint. See [1] for more
explanation. In the future it is planned to recover in a more fine-grained
way.
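
The configuration side of this looks roughly as follows (an untested sketch
with arbitrary values):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RecoveryConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // draw a distributed snapshot of the whole topology every 10 seconds
        env.enableCheckpointing(10_000);
        // on failure: restart the job up to 3 times, waiting 10 seconds
        // between attempts, each time restoring all tasks from the last
        // successful checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000));
    }
}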

5. Source workers should not be directly connected but fed through systems
like Kafka or Pravega. Not only for replaying in case of failures but
also for using them as the single source of truth in case your processing
logic needs to be adapted. E.g., if you had a bug in your application and
the state that you have built is invalid, you want to be able to correct
your mistake and rebuild the state in a batch. The folks from Drivetribe
showed a very nice architecture [2]. I don't know if replaying your IoT
devices would make sense; in theory you could implement your own
connector that implements a similar logic to Flink's Kafka consumer.
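
As a rough sketch of that idea: a custom source can make a non-replayable
input replayable by reading it through a durable buffer and checkpointing
its read position, loosely following the offset handling of Flink's Kafka
consumer. SensorBuffer below is hypothetical; only the SourceFunction and
ListCheckpointed interfaces are the real API:

import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ReplayableSensorSource
        implements SourceFunction<String>, ListCheckpointed<Long> {

    private volatile boolean running = true;
    private long offset = 0; // next position to read from the buffer

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        SensorBuffer buffer = new SensorBuffer();
        while (running) {
            String reading = buffer.readAt(offset);
            // emit the record and advance the offset atomically with respect
            // to checkpoints, so nothing is lost or duplicated on restore
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(reading);
                offset++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // the read position becomes part of the distributed snapshot
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long restored : state) {
            offset = restored; // resume (replay) from the checkpointed position
        }
    }

    // hypothetical stand-in for a durable buffer in front of the sensors
    private static class SensorBuffer {
        String readAt(long position) {
            return "sensor-reading-" + position;
        }
    }
}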

6. I don't know about the internals of the iteration feature, but you might
be right. Cyclic dataflows are not fully supported yet. E.g., they are also
not participating in Flink's checkpointing mechanism.

In general, I would recommend importing Flink into your IDE and setting a
breakpoint in an example (e.g., within a mapper before a keyBy), then
running it in debug mode. You can step through the layers to see more about
the internals. This should answer most of your questions; otherwise feel
free to ask again.
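
A made-up minimal job to try that on (any mapper before a keyBy will do):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DebugExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "a")
           .map(new MapFunction<String, Tuple2<String, Integer>>() {
               @Override
               public Tuple2<String, Integer> map(String s) {
                   return new Tuple2<>(s, 1); // <-- set the breakpoint here
               }
           })
           .keyBy(0) // step into this to see the partitioning/network layers
           .sum(1)
           .print();

        env.execute("debug example");
    }
}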

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
[2] https://data-artisans.com/blog/drivetribe-cqrs-apache-flink
