Posted to user@flink.apache.org by David Dreyfus <dd...@gmail.com> on 2017/10/26 02:13:33 UTC

Tasks, slots, and partitioned joins

Hello -

I have a large number of pairs of files. For the purpose of discussion:
/source1/{1..10000} and /source2/{1..10000}.

I want to join the files pair-wise: /source1/1 joined to /source2/1,
/source1/2 joined to /source2/2, and so on.
I then want to union the results of the pair-wise joins and perform an
aggregate.

I create a simple Flink job that has four sources, two joins, and two sinks
to produce intermediate results. This represents two unrelated chains.

I notice that when running this job with parallelism = 1 on a standalone
machine with one task manager and 3 slots, only one slot gets used. 

My concern is that when I scale up to a YARN cluster, Flink will continue to
use one slot on one machine instead of using all slots on all machines.

Prior reading suggests that all data source subtasks are added to a default
resource group. Downstream tasks (joins and sinks) want to be colocated with
the data sources. As a result, all of my tasks execute in one slot.

Flink Stream (DataStream) offers the slotSharingGroup() function. This
doesn't seem available to the DataSet user.

*Q1:* How do I force Flink to distribute work evenly across task managers
and the slots allocated to them? If this shouldn't be a concern, please
elaborate. 

When I scale up the number of unrelated chains, I notice that Flink seems to
start all of them at the same time, which results in thrashing and failures -
lots of I/O and errors about hash buffers.

*Q2:* Is there any method for controlling the scheduling of tasks so that
some finish before others start? My workaround is to execute multiple,
sequential batches with results going into an intermediate directory, and
then a final job that aggregates the results. I would certainly prefer one
job that might avoid the intermediate write.

If I treat /source1 as one data source and /source2 as the second and then
join the two, Flink will shuffle and partition the files on the join key.
The /source1 and /source2 files already represent this partitioning. They
are reused multiple times; thus, I shuffle and save the results, creating
/source1 and /source2.

*Q3:* Does Flink have a method by which I can mark individual files (or
directories) as belonging to a particular partition so that when I try to
join them, the unnecessary shuffle and repartition is avoided?

Thank you,
David



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Tasks, slots, and partitioned joins

Posted by Fabian Hueske <fh...@gmail.com>.
Hi David,

please find my answers below:

1. For high utilization, all slots should be filled. Each slot processes a
slice of the program on a slice of the data. In case of partitioning or
changed parallelism, the data is shuffled accordingly.
2. That's a good question. I think the default logic is to go round-robin
on the TMs as you suggested, but I'm not 100% sure. There are a couple of
exceptions and special cases, IIRC.
3. No, I would still use Flink's join operator to do the join. When you
read both files with the same split, you'd have a single source for both
inputs. You could do something like:

                /-- Filter "/source1" --\
Source -<                                  >-Join-->
                \-- Filter "/source2" --/

If all operators have the same parallelism and the source has the right
split properties configured, all data should stay local and the join would
work without partitioning the data.
You could go even further if the data in the files is sorted on the join
key. Then you could read both files in a zig-zag fashion and declare sorted
split properties. In theory, the join would then happen without a sort (I
haven't tried this, though).
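In DataSet API terms, a rough sketch of that plan could look as follows. Note that PairedSplitInputFormat and the Tuple3 layout (f0 = origin path, f1 = join key, f2 = value) are placeholders of mine, not real Flink classes:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// One source whose splits cover both files of a pair; each record is
// tagged with the path of the file it came from (field f0).
DataSet<Tuple3<String, Long, String>> all =
        env.createInput(new PairedSplitInputFormat());

// Split the stream again by origin, as in the diagram above.
DataSet<Tuple3<String, Long, String>> left =
        all.filter(t -> t.f0.startsWith("/source1"));
DataSet<Tuple3<String, Long, String>> right =
        all.filter(t -> t.f0.startsWith("/source2"));

// With equal parallelism and correct split properties, this join
// should not need a repartition.
left.join(right).where(1).equalTo(1);
```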

4.a Yes that is true. FileInputFormat has a flag to prevent files from
being split up into multiple splits.
4.b You might be able to hack this with a custom InputSplitAssigner. The
SplitDataProperties partitioning methods have a partitioner ID field. IIRC,
this is used to determine equal partitioning for joins.
However, as I said, you need to make sure that the files with the same keys
are read by the same subtask. You could do that with a custom
InputSplitAssigner.
My proposal to read both files with the same key in the same input split
(with a single source) tries to work around this issue by forcing the data
of both files into the same subtask.
4.c. The concept of a partition is a bit different in Flink and not bound
to InputSplits. All data arriving at a parallel instance of an operator is
considered to be in the same partition.
So both FlatMap and MapPartition call open() just once. In MapPartition,
the mapPartition() method is also called just once, while flatMap() is
called once for each record.
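To make 4.c concrete, here is a minimal sketch (not tied to your job) of where the methods are invoked; the class names and String types are arbitrary:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class Example {

    public static class PerRecord extends RichFlatMapFunction<String, String> {
        @Override
        public void open(Configuration parameters) {
            // called once per parallel instance, NOT once per file or partition
        }
        @Override
        public void flatMap(String value, Collector<String> out) {
            out.collect(value); // called once per record
        }
    }

    public static class PerPartition extends RichMapPartitionFunction<String, String> {
        @Override
        public void open(Configuration parameters) {
            // also called once per parallel instance
        }
        @Override
        public void mapPartition(Iterable<String> values, Collector<String> out) {
            // called once, iterating over all records of this parallel instance
            for (String v : values) {
                out.collect(v);
            }
        }
    }
}
```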

Cheers, Fabian




2017-10-26 15:04 GMT+02:00 David Dreyfus <dd...@gmail.com>:


Re: Tasks, slots, and partitioned joins

Posted by David Dreyfus <dd...@gmail.com>.
Hi Fabian,

Thank you for the great, detailed answers. 
1. So, each parallel slice of the DAG is placed into one slot. The key to
high utilization is many slices of the source data (or the various methods
of repartitioning it). Yes?
2. In batch processing, are slots filled round-robin on task managers, or do
I need to tune the number of slots to load the cluster evenly?
3. Are you suggesting that I perform the join in my custom data source?
4. Looking at this sample from
org.apache.flink.optimizer.PropertyDataSourceTest

  DataSource<Tuple2<Long, String>> data =
    env.readCsvFile("/some/path").types(Long.class, String.class); 
 
  data.getSplitDataProperties() 
    .splitsPartitionedBy(0); 

4.a Does this code assume that one split == one file from /some/path? If
readCsvFile splits each file, the guarantee that all keys in each part of
the file share the same partition would be violated, right?
4.b Is there a way to mark a partition number so that sources that share
partition numbers are read in parallel and joined? If I have 10,000 pairs, I
want partition 1 read from the sources at the same time.
4.c Does a downstream flatmap function get an open() call for each new
partition? Or, do I chain MapPartition directly to the datasource?

Thank you,
David




Re: Tasks, slots, and partitioned joins

Posted by Fabian Hueske <fh...@gmail.com>.
Hi David,

Flink's DataSet API schedules one slice of a program to a task slot. A
program slice is one parallel instance of each operator of the program.
When all operators of your program run with a parallelism of 1, you end up
with only one slice that runs in a single slot.
Flink's DataSet API leverages data parallelism (running parallel instances
of the same operator on different workers, each working on a different data
partition) rather than task parallelism (running different operators on
different workers).
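As a minimal illustration (paths are made up), it is the parallelism you set that creates multiple slices and thereby fills multiple slots:

```java
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // three slices -> up to three slots occupied

// Each operator now has 3 parallel instances; slice i (one instance of
// every operator in the chain) is scheduled into one slot.
env.readTextFile("/source1")
   .writeAsText("/tmp/out");

env.execute();
```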

Regarding your task, I would implement a custom InputFormat which extends
FileInputFormat. The FileInputFormat.open() [1] method is called with a
FileInputSplit [2] which contains the file path. You can put the path aside
and add it as an additional field when emitting records in the nextRecord()
method.
This way, you only need two sources (one for /source1 and one for /source2)
and can join the records on a composite key of filename and join key. This
should balance the load evenly over a larger number of keys.
However, you would lose the advantage of pre-partitioned files because all
data of source1 would be joined with all data of source2.
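A hedged sketch of that suggestion, using line-oriented input: the class name and the Tuple2<path, line> layout are mine, not part of Flink.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileInputSplit;

// Tags every line with the path of the file it was read from.
public class PathTaggingTextFormat extends DelimitedInputFormat<Tuple2<String, String>> {

    private String currentPath;

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        currentPath = split.getPath().toString(); // put the path aside
    }

    @Override
    public Tuple2<String, String> readRecord(Tuple2<String, String> reuse,
                                             byte[] bytes, int offset, int numBytes) {
        String line = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        return Tuple2.of(currentPath, line); // emit the path as an extra field
    }
}
```

With one such source per directory, the join would then go over the composite key described above, i.e. the file name within the directory (the pair index) plus the actual join key, once those are extracted into their own fields.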

There is a low-level interface to leverage pre-partitioned files. With
SplitDataProperties [3] you can specify that the data produced by a
DataSource [4] is partitioned by InputSplit.
If you implement the source in a way that a single split contains the
information to read both files, you can avoid an additional shuffle and
join locally. This is a manual, low-level optimization where you need to
know what you are doing. I'm not sure it is documented anywhere except the
Java Docs.

Hope this helps,
Fabian

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L684
[2]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java#L34
[3]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java#L101
[4]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java#L117





