Posted to user@flink.apache.org by Robert Schmidtke <ro...@gmail.com> on 2017/01/13 11:14:12 UTC

Terminology: Split, Group and Partition

Hi all,

I'm having some trouble grasping the meaning of, and the differences
between, the following concepts:

- Split
- Group
- Partition

Let me elaborate a bit on the problem I'm trying to solve here. In my tests
I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in standalone
mode. Each node has 64 GB of memory and 32 cores. I'm starting the JobManager
on one node, and a TaskManager on each node. I'm assigning 16 slots to each
TaskManager, so the overall parallelism is 80 (= 5 TMs x 16 slots).

The data I want to process resides in a local folder on each worker with
the same path (say /tmp/input). There can be arbitrarily many input files
in each worker's folder. I have written a custom input format that
round-robin assigns the files to each of the 16 local input splits (
https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java)
to obtain a total of 80 input splits that need processing. Each split reads
zero or more files, parsing the contents into records that are emitted
correctly. This works as expected.
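
To illustrate the assignment idea only (a minimal sketch in plain Java;
class and method names are hypothetical, the real logic lives in
SfsInputFormat), the round-robin distribution boils down to:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: distribute the files of a local folder round-robin
// over a fixed number of per-host splits (16 in the setup above).
public class RoundRobinFileAssignment {
    public static List<List<File>> assign(File inputDir, int splitsPerHost) {
        List<List<File>> splits = new ArrayList<>();
        for (int i = 0; i < splitsPerHost; i++) {
            splits.add(new ArrayList<File>());
        }
        File[] files = inputDir.listFiles();
        if (files != null) {
            for (int i = 0; i < files.length; i++) {
                // file i goes to split i mod splitsPerHost
                splits.get(i % splitsPerHost).add(files[i]);
            }
        }
        return splits; // some splits may be empty if there are few files
    }
}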

Now we're getting to the questions. How do these 80 input splits relate to
groups and partitions? My understanding of a partition is a subset of my
DataSet<X> that is local to each node, i.e. if I were to repartition the
data according to some scheme, the data would be shuffled across workers.
After reading all the data, I have 80 partitions, correct?

What is less clear to me is the concept of a group, i.e. the result of a
groupBy operation. The input files I have are produced on each worker by
some other process. I first want to do a pre-aggregation (I hope that's the
term) on each node before sending data over the network. The records I'm
processing contain a 'hostname' attribute, which is set to the hostname of
the worker that processes the data, because the DataSources are local. That
means the records produced by the worker on host1 always contain the
attribute hostname=host1, and similarly for the other 4 workers.

Now what happens if I do a groupBy("hostname")? How do the workers realize
that no network transfer is necessary? Is a group a logical abstraction, or
a physical one? (In my understanding a partition is physical because it's
local to exactly one worker.)

What I'd like to do next is a reduceGroup to merge multiple records into
one (some custom, yet straightforward, aggregation) and emit one record for
every few input records. Am I correct in assuming that the Iterable<X>
values passed to the reduce function all have the same hostname value? That
is, will the operation have a parallelism of 80, where the 16 operations on
each of the 5 hosts see the same hostname value? Because I have 16 splits
per host, the 16 reduces on host1 should all receive values with
hostname=host1, correct? And after the operation has finished, will the
reduced groups (now actual DataSets again) still be local to the workers?
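
For reference, the kind of aggregation I mean looks roughly like this (a
minimal sketch; the Record and Summary POJOs and the merge logic are
hypothetical stand-ins for my actual types):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.util.Collector;

public class HostAggregation {

    // Hypothetical input/output types standing in for my real records.
    public static class Record {
        public String hostname;
        public long value;
    }

    public static class Summary {
        public String hostname;
        public long count;
        public long sum;
    }

    // Merge all records of one group (one hostname) into a single summary.
    public static class HostAggregator
            implements GroupReduceFunction<Record, Summary> {
        @Override
        public void reduce(Iterable<Record> values, Collector<Summary> out) {
            Summary s = new Summary();
            for (Record r : values) {
                s.hostname = r.hostname; // identical within one group, if my assumption holds
                s.count++;
                s.sum += r.value;
            }
            out.collect(s);
        }
    }
}

I would then call it as
records.groupBy("hostname").reduceGroup(new HostAggregator()).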

This is quite a lot to work on I have to admit. I'm happy for any hints,
advice and feedback on this. If there's need for clarification I'd be happy
to provide more information.

Thanks a lot in advance!

Robert

-- 
My GPG Key ID: 336E2680

Re: Terminology: Split, Group and Partition

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Robert,

thanks for opening the ticket.

Regarding injecting grouping or partitioning information, semantic
annotations (forwarded fields) [1] are probably what you are looking for.
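
For example (just a sketch, assuming your records are Tuple2 with the
hostname in field f0; names are placeholders), a forwarded-field
annotation on the aggregation function tells the optimizer that f0 is
copied unchanged from input to output, so grouping/partitioning on that
field survives the T -> U step:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// f0 (the hostname) is forwarded unmodified to the output tuple.
@FunctionAnnotation.ForwardedFields("f0")
public class PreAggregate
        implements GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    @Override
    public void reduce(Iterable<Tuple2<String, Long>> values,
                       Collector<Tuple2<String, Long>> out) {
        String host = null;
        long sum = 0L;
        for (Tuple2<String, Long> v : values) {
            host = v.f0; // the grouping key, identical for the whole group
            sum += v.f1;
        }
        out.collect(new Tuple2<>(host, sum)); // f0 passed through unchanged
    }
}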

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#semantic-annotations

Re: Terminology: Split, Group and Partition

Posted by Robert Schmidtke <ro...@gmail.com>.
Hi Fabian,

I have opened a ticket for that, thanks.

I have another question: now that I have obtained the proper local
grouping, I did some aggregation of type [T] -> U, where one aggregated
object of type U contains information from zero or more Ts. The Us are
still tied to the hostname, and have the property hostname=hostX on the
workers they're produced on, just like before. Is it possible to specify
the grouping/partitioning for DataSets that are not DataSources, like you
suggested before? My guess is that the grouping information is lost when
going from T to U.

Best and thanks for the great help!
Robert

-- 
My GPG Key ID: 336E2680

Re: Terminology: Split, Group and Partition

Posted by Fabian Hueske <fh...@gmail.com>.
I think so far getExecutionPlan() has only been used for debugging purposes
and not in programs that are also executed.
You can open a JIRA issue if you think that this would be a valuable feature.

Thanks, Fabian

Re: Terminology: Split, Group and Partition

Posted by Robert Schmidtke <ro...@gmail.com>.
Just a side note, I'm guessing there's a bug here:
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68

It should say createProgramPlan("unnamed job", false);

Otherwise I'm getting an exception complaining that no new sinks have been
added after the last execution. So currently it is not possible for me to
first get the execution plan and then execute the program.
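
A minimal sketch of the pattern that fails for me when the job is
submitted through the CLI client (i.e. via ContextEnvironment; the job
body is just a placeholder):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class PlanThenExecute {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("a", "b", "c"); // placeholder job
        data.output(new DiscardingOutputFormat<String>());

        System.out.println(env.getExecutionPlan()); // consumes the registered sinks
        env.execute(); // fails: no new sinks since the last execution
    }
}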

Robert

-- 
My GPG Key ID: 336E2680

Re: Terminology: Split, Group and Partition

Posted by Robert Schmidtke <ro...@gmail.com>.
Hi Fabian,

thanks for the quick and comprehensive reply. I'll have a look at the
ExecutionPlan using your suggestion to check what actually gets computed,
and I'll use the properties as well. If I stumble across something else
I'll let you know.

Many thanks again!
Robert

-- 
My GPG Key ID: 336E2680

Re: Terminology: Split, Group and Partition

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Robert,

let me first describe what splits, groups, and partitions are.

* Partition: This is basically all data that goes through the same task
instance. If you have an operator with a parallelism of 80, you have 80
partitions. When you call sortPartition() you'll have 80 sorted streams; if
you call mapPartition() you iterate over all records of one partition.
* Split: Splits are a concept of InputFormats. An InputFormat can process
several splits. All splits that are processed by the same data source task
make up the partition of that task. So a split is a subset of a partition.
In your case where each task reads exactly one split, the split is
equivalent to the partition.
* Group: A group is based on the groupBy attribute and hence is
data-driven; it does not depend on the parallelism. A groupReduce requires
a partitioning such that all records with the same grouping attribute are
sent to the same operator, i.e., all are part of the same partition.
Depending on the number of distinct grouping keys (and the hash function),
a partition can have zero, one, or more groups.

Now coming to your use case. You have 80 sources running on 5 machines. All
sources on the same machine produce records with the same grouping key
(hostname). You can actually give Flink a hint that the data returned by a
split is partitioned, grouped, or sorted in a specific way. This works as
follows:

// String is the hostname, Integer is the parallel id of the source task
DataSet<Tuple3<String, Integer, Long>> source = env.createInput(new YourFormat());
SplitDataProperties<Tuple3<String, Integer, Long>> splitProps =
    ((DataSource<Tuple3<String, Integer, Long>>) source).getSplitDataProperties();
splitProps.splitsGroupedBy(0, 1);
splitProps.splitsPartitionedBy(0, 1);

With this info, Flink knows that the data returned by our source is
partitioned and grouped. Now you can do groupBy(0,1).groupReduce(XXX) to
run a local groupReduce operation on each of the 80 tasks (hostname and
parallel index result in 80 keys) and locally reduce the data.
The next step would be another .groupBy(0).groupReduce(), which gives 5
groups (one per hostname) that are distributed across your tasks.
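
Put together, and continuing the snippet above, the two-phase aggregation
could look like this (a sketch; LocalAgg and GlobalAgg are placeholders
for your own GroupReduceFunction implementations):

// Phase 1: reduce locally per (hostname, parallel index) -- 80 keys,
// no shuffle because of the split properties declared above.
DataSet<Tuple3<String, Integer, Long>> preAggregated =
    source.groupBy(0, 1).reduceGroup(new LocalAgg());

// Phase 2: reduce globally per hostname -- one group per host.
DataSet<Tuple2<String, Long>> result =
    preAggregated.groupBy(0).reduceGroup(new GlobalAgg());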

However, you have to be careful with the SplitDataProperties. If you get
them wrong, the optimizer makes false assumptions and the resulting plan
might not compute what you are looking for.
I'd recommend reading the JavaDocs and playing a bit with this feature to
see how it behaves. ExecutionEnvironment.getExecutionPlan() can help to
figure out what is happening.

Best,
Fabian

